diff options
-rw-r--r-- | src/rabbit_amqqueue.erl | 7 | ||||
-rw-r--r-- | src/rabbit_stream_queue.erl | 30 | ||||
-rw-r--r-- | test/rabbit_stream_queue_SUITE.erl | 40 |
3 files changed, 71 insertions, 6 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index fd7682fdeb..82b261045d 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -780,9 +780,10 @@ declare_args() -> {<<"x-queue-mode">>, fun check_queue_mode/2}, {<<"x-single-active-consumer">>, fun check_single_active_consumer_arg/2}, {<<"x-queue-type">>, fun check_queue_type/2}, - {<<"x-quorum-initial-group-size">>, fun check_default_quorum_initial_group_size_arg/2}, + {<<"x-quorum-initial-group-size">>, fun check_initial_cluster_size_arg/2}, {<<"x-max-age">>, fun check_max_age_arg/2}, - {<<"x-max-segment-size">>, fun check_non_neg_int_arg/2}]. + {<<"x-max-segment-size">>, fun check_non_neg_int_arg/2}, + {<<"x-initial-cluster-size">>, fun check_initial_cluster_size_arg/2}]. consume_args() -> [{<<"x-priority">>, fun check_int_arg/2}, {<<"x-cancel-on-ha-failover">>, fun check_bool_arg/2}]. @@ -829,7 +830,7 @@ check_single_active_consumer_arg({Type, Val}, Args) -> Error -> Error end. -check_default_quorum_initial_group_size_arg({Type, Val}, Args) -> +check_initial_cluster_size_arg({Type, Val}, Args) -> case check_non_neg_int_arg({Type, Val}, Args) of ok when Val == 0 -> {error, {value_zero, Val}}; ok -> ok; diff --git a/src/rabbit_stream_queue.erl b/src/rabbit_stream_queue.erl index ed44f1258a..7dd9ac27f6 100644 --- a/src/rabbit_stream_queue.erl +++ b/src/rabbit_stream_queue.erl @@ -495,7 +495,9 @@ make_stream_conf(Node, Q) -> MaxBytes = args_policy_lookup(<<"max-length-bytes">>, fun min/2, Q), MaxAge = max_age(args_policy_lookup(<<"max-age">>, fun max_age/2, Q)), MaxSegmentSize = args_policy_lookup(<<"max-segment-size">>, fun min/2, Q), - Replicas = rabbit_mnesia:cluster_nodes(all) -- [Node], + Replicas0 = rabbit_mnesia:cluster_nodes(all) -- [Node], + Arguments = amqqueue:get_arguments(Q), + Replicas = select_stream_nodes(get_initial_cluster_size(Arguments) - 1, Replicas0), Formatter = {?MODULE, format_osiris_event, [QName]}, Retention = lists:filter(fun({_, R}) -> R =/= undefined @@ -509,6 +511,23 @@ make_stream_conf(Node, Q) -> event_formatter => Formatter, epoch => 1}). +select_stream_nodes(Size, All) when length(All) =< Size -> + All; +select_stream_nodes(Size, All) -> + Node = node(), + case lists:member(Node, All) of + true -> + select_stream_nodes(Size - 1, lists:delete(Node, All), [Node]); + false -> + select_stream_nodes(Size, All, []) + end. + +select_stream_nodes(0, _, Selected) -> + Selected; +select_stream_nodes(Size, Rest, Selected) -> + S = lists:nth(rand:uniform(length(Rest)), Rest), + select_stream_nodes(Size - 1, lists:delete(S, Rest), [S | Selected]). + update_stream_conf(#{reference := QName} = Conf) -> case rabbit_amqqueue:lookup(QName) of {ok, Q} -> @@ -654,6 +673,13 @@ capabilities() -> queue_arguments => [<<"x-dead-letter-exchange">>, <<"x-dead-letter-routing-key">>, <<"x-max-length">>, <<"x-max-length-bytes">>, <<"x-single-active-consumer">>, <<"x-queue-type">>, - <<"x-max-age">>, <<"x-max-segment-size">>], + <<"x-max-age">>, <<"x-max-segment-size">>, + <<"x-initial-cluster-size">>], consumer_arguments => [<<"x-stream-offset">>], server_named => false}. + +get_initial_cluster_size(Arguments) -> + case rabbit_misc:table_lookup(Arguments, <<"x-initial-cluster-size">>) of + undefined -> length(rabbit_mnesia:cluster_nodes(running)); + {_Type, Val} -> Val + end. diff --git a/test/rabbit_stream_queue_SUITE.erl b/test/rabbit_stream_queue_SUITE.erl index cb38e15ea7..8b8966b049 100644 --- a/test/rabbit_stream_queue_SUITE.erl +++ b/test/rabbit_stream_queue_SUITE.erl @@ -46,7 +46,9 @@ groups() -> delete_classic_replica, delete_quorum_replica, consume_from_replica, - leader_failover]}, + leader_failover, + initial_cluster_size_one, + initial_cluster_size_two]}, {unclustered_size_3_1, [], [add_replica]}, {unclustered_size_3_2, [], [consume_without_local_replica]}, {unclustered_size_3_3, [], [grow_coordinator_cluster]}, @@ -1141,6 +1143,42 @@ leader_failover(Config) -> ?assert(NewLeader =/= Server1), ok = rabbit_ct_broker_helpers:start_node(Config, Server1). +initial_cluster_size_one(Config) -> + [Server1, Server2, Server3] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server1), + Q = ?config(queue_name, Config), + + ?assertEqual({'queue.declare_ok', Q, 0, 0}, + declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}, + {<<"x-initial-cluster-size">>, long, 1}])), + check_leader_and_replicas(Config, Q, Server1, []), + + ?assertMatch(#'queue.delete_ok'{}, + amqp_channel:call(Ch, #'queue.delete'{queue = Q})). + +initial_cluster_size_two(Config) -> + [Server1, Server2, Server3] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server1), + Q = ?config(queue_name, Config), + + ?assertEqual({'queue.declare_ok', Q, 0, 0}, + declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}, + {<<"x-initial-cluster-size">>, long, 2}])), + + [Info] = lists:filter( + fun(Props) -> + lists:member({name, rabbit_misc:r(<<"/">>, queue, Q)}, Props) + end, + rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue, + info_all, [<<"/">>, [name, leader, members]])), + ?assertEqual(Server1, proplists:get_value(leader, Info)), + ?assertEqual(1, length(proplists:get_value(members, Info))), + + ?assertMatch(#'queue.delete_ok'{}, + amqp_channel:call(Ch, #'queue.delete'{queue = Q})). + invalid_policy(Config) -> [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), |