diff options
| author | Arnaud Cogoluègnes <acogoluegnes@gmail.com> | 2021-09-14 18:02:22 +0200 |
|---|---|---|
| committer | Arnaud Cogoluègnes <acogoluegnes@gmail.com> | 2021-09-14 18:02:22 +0200 |
| commit | 04a06535716ba3dc7fb19b20de6766fa89881242 (patch) | |
| tree | 7a491a70923f29fbe7b1ab27cdf4b52c446804fd | |
| parent | 9725595c002865bf28d3b182658656096152d5b1 (diff) | |
| download | rabbitmq-server-git-04a06535716ba3dc7fb19b20de6766fa89881242.tar.gz | |
Sort stream partitions using binding parameter
If present. To make the partition order stable.
4 files changed, 109 insertions, 5 deletions
diff --git a/deps/rabbitmq_stream/src/rabbit_stream_manager.erl b/deps/rabbitmq_stream/src/rabbit_stream_manager.erl index 0be72d714f..4e202ed5b7 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_manager.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_manager.erl @@ -386,6 +386,9 @@ handle_call({partitions, VirtualHost, SuperStream}, _From, State) -> %% FIXME make sure queue is a stream %% TODO bindings could be sorted by partition number, by using a binding argument %% this would make the spreading of messages stable + UnorderedBindings = rabbit_binding:list_for_source(ExchangeName), + OrderedBindings = + rabbit_stream_utils:sort_partitions(UnorderedBindings), {ok, lists:foldl(fun (#binding{destination = #resource{kind = queue, name = Q}}, @@ -394,7 +397,7 @@ handle_call({partitions, VirtualHost, SuperStream}, _From, State) -> (_Binding, Acc) -> Acc end, - [], rabbit_binding:list_for_source(ExchangeName))} + [], OrderedBindings)} catch exit:Error -> rabbit_log:error("Error while looking up exchange ~p, ~p", diff --git a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl index 303bf22cb1..5b4acc5f3e 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl @@ -174,7 +174,8 @@ callback_mode() -> terminate(Reason, State, #statem_data{transport = Transport, connection = #stream_connection{socket = Socket}, - connection_state = ConnectionState} = StatemData) -> + connection_state = ConnectionState} = + StatemData) -> close(Transport, Socket, ConnectionState), rabbit_networking:unregister_non_amqp_connection(self()), notify_connection_closed(StatemData), @@ -1053,7 +1054,8 @@ close_sent(enter, _OldState, StateTimeout}}) -> {keep_state_and_data, {state_timeout, StateTimeout, close}}; close_sent(state_timeout, close, #statem_data{}) -> - rabbit_log_connection:warning("Closing connection because of timeout in state '~s' likely due to lack of client action.", + rabbit_log_connection:warning("Closing connection because of timeout in state " + "'~s' likely due to lack of client action.", [?FUNCTION_NAME]), stop; close_sent(info, {tcp, S, Data}, @@ -1081,7 +1083,8 @@ close_sent(info, {tcp_closed, S}, _StatemData) -> [S, self()]), stop; close_sent(info, {tcp_error, S, Reason}, #statem_data{}) -> - rabbit_log_connection:error("Stream protocol connection socket error: ~p [~w] [~w]", + rabbit_log_connection:error("Stream protocol connection socket error: ~p [~w] " + "[~w]", [Reason, S, self()]), stop; close_sent(info, {resource_alarm, IsThereAlarm}, diff --git a/deps/rabbitmq_stream/src/rabbit_stream_utils.erl b/deps/rabbitmq_stream/src/rabbit_stream_utils.erl index 1e44dc10ce..92d0bff8af 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_utils.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_utils.erl @@ -25,10 +25,13 @@ check_configure_permitted/3, check_write_permitted/3, check_read_permitted/3, - extract_stream_list/2]). + extract_stream_list/2, + sort_partitions/1]). -define(MAX_PERMISSION_CACHE_SIZE, 12). +-include_lib("rabbit_common/include/rabbit.hrl"). + enforce_correct_stream_name(Name) -> % from rabbit_channel StrippedName = @@ -213,3 +216,23 @@ extract_stream_list(<<>>, Streams) -> extract_stream_list(<<Length:16, Stream:Length/binary, Rest/binary>>, Streams) -> extract_stream_list(Rest, [Stream | Streams]). + +-spec sort_partitions([#binding{}]) -> [#binding{}]. +sort_partitions(Partitions) -> + lists:sort(fun(#binding{args = Args1}, #binding{args = Args2}) -> + Arg1 = + rabbit_misc:table_lookup(Args1, + <<"x-stream-partition-order">>), + Arg2 = + rabbit_misc:table_lookup(Args2, + <<"x-stream-partition-order">>), + case {Arg1, Arg2} of + {{_, Order1}, {_, Order2}} -> + rabbit_data_coercion:to_integer(Order1) + =< rabbit_data_coercion:to_integer(Order2); + {undefined, {_, _Order2}} -> false; + {{_, _Order1}, undefined} -> true; + _ -> true + end + end, + Partitions). diff --git a/deps/rabbitmq_stream/test/rabbit_stream_utils_SUITE.erl b/deps/rabbitmq_stream/test/rabbit_stream_utils_SUITE.erl new file mode 100644 index 0000000000..49704a8672 --- /dev/null +++ b/deps/rabbitmq_stream/test/rabbit_stream_utils_SUITE.erl @@ -0,0 +1,75 @@ +-module(rabbit_stream_utils_SUITE). + +-compile(nowarn_export_all). +-compile(export_all). + +-include_lib("eunit/include/eunit.hrl"). +-include_lib("rabbit_common/include/rabbit.hrl"). + +%%%=================================================================== +%%% Common Test callbacks +%%%=================================================================== + +all() -> + [{group, tests}]. + +suite() -> + [{timetrap, {seconds, 30}}]. + +groups() -> + [{tests, [], [sort_partitions]}]. + +init_per_suite(Config) -> + Config. + +end_per_suite(_Config) -> + ok. + +group(_GroupName) -> + []. + +init_per_group(_GroupName, Config) -> + Config. + +end_per_group(_GroupName, _Config) -> + ok. + +init_per_testcase(_TestCase, Config) -> + Config. + +end_per_testcase(_TestCase, _Config) -> + ok. + +%%%=================================================================== +%%% Test cases +%%%=================================================================== + +sort_partitions(_Config) -> + [] = rabbit_stream_utils:sort_partitions([]), + %[binding(<<"a">>, 1), binding(<<"b">>, 1), + % binding(<<"c">>, 2)] + ?assertEqual([<<"a">>, <<"b">>, <<"c">>], + [S + || #binding{destination = #resource{name = S}} + <- rabbit_stream_utils:sort_partitions([binding(<<"c">>, + 2), + binding(<<"b">>, + 1), + binding(<<"a">>, + 0)])]), + ?assertEqual([<<"a">>, <<"c">>, <<"not-order-field">>], + [S + || #binding{destination = #resource{name = S}} + <- rabbit_stream_utils:sort_partitions([binding(<<"c">>, + 10), + binding(<<"not-order-field">>), + binding(<<"a">>, + 0)])]), + ok. + +binding(Destination, Order) -> + #binding{destination = #resource{name = Destination}, + args = [{<<"x-stream-partition-order">>, signedint, Order}]}. + +binding(Destination) -> + #binding{destination = #resource{name = Destination}, args = []}. |
