summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorArnaud Cogoluègnes <acogoluegnes@gmail.com>2021-09-14 18:02:22 +0200
committerArnaud Cogoluègnes <acogoluegnes@gmail.com>2021-09-14 18:02:22 +0200
commit04a06535716ba3dc7fb19b20de6766fa89881242 (patch)
tree7a491a70923f29fbe7b1ab27cdf4b52c446804fd
parent9725595c002865bf28d3b182658656096152d5b1 (diff)
downloadrabbitmq-server-git-04a06535716ba3dc7fb19b20de6766fa89881242.tar.gz
Sort stream partitions using binding parameter
If present. To make the partition order stable.
-rw-r--r--deps/rabbitmq_stream/src/rabbit_stream_manager.erl5
-rw-r--r--deps/rabbitmq_stream/src/rabbit_stream_reader.erl9
-rw-r--r--deps/rabbitmq_stream/src/rabbit_stream_utils.erl25
-rw-r--r--deps/rabbitmq_stream/test/rabbit_stream_utils_SUITE.erl75
4 files changed, 109 insertions, 5 deletions
diff --git a/deps/rabbitmq_stream/src/rabbit_stream_manager.erl b/deps/rabbitmq_stream/src/rabbit_stream_manager.erl
index 0be72d714f..4e202ed5b7 100644
--- a/deps/rabbitmq_stream/src/rabbit_stream_manager.erl
+++ b/deps/rabbitmq_stream/src/rabbit_stream_manager.erl
@@ -386,6 +386,9 @@ handle_call({partitions, VirtualHost, SuperStream}, _From, State) ->
%% FIXME make sure queue is a stream
%% TODO bindings could be sorted by partition number, by using a binding argument
%% this would make the spreading of messages stable
+ UnorderedBindings = rabbit_binding:list_for_source(ExchangeName),
+ OrderedBindings =
+ rabbit_stream_utils:sort_partitions(UnorderedBindings),
{ok,
lists:foldl(fun (#binding{destination =
#resource{kind = queue, name = Q}},
@@ -394,7 +397,7 @@ handle_call({partitions, VirtualHost, SuperStream}, _From, State) ->
(_Binding, Acc) ->
Acc
end,
- [], rabbit_binding:list_for_source(ExchangeName))}
+ [], OrderedBindings)}
catch
exit:Error ->
rabbit_log:error("Error while looking up exchange ~p, ~p",
diff --git a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl
index 303bf22cb1..5b4acc5f3e 100644
--- a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl
+++ b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl
@@ -174,7 +174,8 @@ callback_mode() ->
terminate(Reason, State,
#statem_data{transport = Transport,
connection = #stream_connection{socket = Socket},
- connection_state = ConnectionState} = StatemData) ->
+ connection_state = ConnectionState} =
+ StatemData) ->
close(Transport, Socket, ConnectionState),
rabbit_networking:unregister_non_amqp_connection(self()),
notify_connection_closed(StatemData),
@@ -1053,7 +1054,8 @@ close_sent(enter, _OldState,
StateTimeout}}) ->
{keep_state_and_data, {state_timeout, StateTimeout, close}};
close_sent(state_timeout, close, #statem_data{}) ->
- rabbit_log_connection:warning("Closing connection because of timeout in state '~s' likely due to lack of client action.",
+ rabbit_log_connection:warning("Closing connection because of timeout in state "
+ "'~s' likely due to lack of client action.",
[?FUNCTION_NAME]),
stop;
close_sent(info, {tcp, S, Data},
@@ -1081,7 +1083,8 @@ close_sent(info, {tcp_closed, S}, _StatemData) ->
[S, self()]),
stop;
close_sent(info, {tcp_error, S, Reason}, #statem_data{}) ->
- rabbit_log_connection:error("Stream protocol connection socket error: ~p [~w] [~w]",
+ rabbit_log_connection:error("Stream protocol connection socket error: ~p [~w] "
+ "[~w]",
[Reason, S, self()]),
stop;
close_sent(info, {resource_alarm, IsThereAlarm},
diff --git a/deps/rabbitmq_stream/src/rabbit_stream_utils.erl b/deps/rabbitmq_stream/src/rabbit_stream_utils.erl
index 1e44dc10ce..92d0bff8af 100644
--- a/deps/rabbitmq_stream/src/rabbit_stream_utils.erl
+++ b/deps/rabbitmq_stream/src/rabbit_stream_utils.erl
@@ -25,10 +25,13 @@
check_configure_permitted/3,
check_write_permitted/3,
check_read_permitted/3,
- extract_stream_list/2]).
+ extract_stream_list/2,
+ sort_partitions/1]).
-define(MAX_PERMISSION_CACHE_SIZE, 12).
+-include_lib("rabbit_common/include/rabbit.hrl").
+
enforce_correct_stream_name(Name) ->
% from rabbit_channel
StrippedName =
@@ -213,3 +216,23 @@ extract_stream_list(<<>>, Streams) ->
extract_stream_list(<<Length:16, Stream:Length/binary, Rest/binary>>,
Streams) ->
extract_stream_list(Rest, [Stream | Streams]).
+
+-spec sort_partitions([#binding{}]) -> [#binding{}].
+sort_partitions(Partitions) ->
+ lists:sort(fun(#binding{args = Args1}, #binding{args = Args2}) ->
+ Arg1 =
+ rabbit_misc:table_lookup(Args1,
+ <<"x-stream-partition-order">>),
+ Arg2 =
+ rabbit_misc:table_lookup(Args2,
+ <<"x-stream-partition-order">>),
+ case {Arg1, Arg2} of
+ {{_, Order1}, {_, Order2}} ->
+ rabbit_data_coercion:to_integer(Order1)
+ =< rabbit_data_coercion:to_integer(Order2);
+ {undefined, {_, _Order2}} -> false;
+ {{_, _Order1}, undefined} -> true;
+ _ -> true
+ end
+ end,
+ Partitions).
diff --git a/deps/rabbitmq_stream/test/rabbit_stream_utils_SUITE.erl b/deps/rabbitmq_stream/test/rabbit_stream_utils_SUITE.erl
new file mode 100644
index 0000000000..49704a8672
--- /dev/null
+++ b/deps/rabbitmq_stream/test/rabbit_stream_utils_SUITE.erl
@@ -0,0 +1,75 @@
+-module(rabbit_stream_utils_SUITE).
+
+-compile(nowarn_export_all).
+-compile(export_all).
+
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("rabbit_common/include/rabbit.hrl").
+
+%%%===================================================================
+%%% Common Test callbacks
+%%%===================================================================
+
+all() ->
+ [{group, tests}].
+
+suite() ->
+ [{timetrap, {seconds, 30}}].
+
+groups() ->
+ [{tests, [], [sort_partitions]}].
+
+init_per_suite(Config) ->
+ Config.
+
+end_per_suite(_Config) ->
+ ok.
+
+group(_GroupName) ->
+ [].
+
+init_per_group(_GroupName, Config) ->
+ Config.
+
+end_per_group(_GroupName, _Config) ->
+ ok.
+
+init_per_testcase(_TestCase, Config) ->
+ Config.
+
+end_per_testcase(_TestCase, _Config) ->
+ ok.
+
+%%%===================================================================
+%%% Test cases
+%%%===================================================================
+
+sort_partitions(_Config) ->
+ [] = rabbit_stream_utils:sort_partitions([]),
+ %[binding(<<"a">>, 1), binding(<<"b">>, 1),
+ % binding(<<"c">>, 2)]
+ ?assertEqual([<<"a">>, <<"b">>, <<"c">>],
+ [S
+ || #binding{destination = #resource{name = S}}
+ <- rabbit_stream_utils:sort_partitions([binding(<<"c">>,
+ 2),
+ binding(<<"b">>,
+ 1),
+ binding(<<"a">>,
+ 0)])]),
+ ?assertEqual([<<"a">>, <<"c">>, <<"not-order-field">>],
+ [S
+ || #binding{destination = #resource{name = S}}
+ <- rabbit_stream_utils:sort_partitions([binding(<<"c">>,
+ 10),
+ binding(<<"not-order-field">>),
+ binding(<<"a">>,
+ 0)])]),
+ ok.
+
+binding(Destination, Order) ->
+ #binding{destination = #resource{name = Destination},
+ args = [{<<"x-stream-partition-order">>, signedint, Order}]}.
+
+binding(Destination) ->
+ #binding{destination = #resource{name = Destination}, args = []}.