diff options
-rw-r--r-- | src/rabbit_policies.erl | 7 | ||||
-rw-r--r-- | src/rabbit_stream_queue.erl | 17 | ||||
-rw-r--r-- | test/rabbit_stream_queue_SUITE.erl | 21 |
3 files changed, 37 insertions, 8 deletions
diff --git a/src/rabbit_policies.erl b/src/rabbit_policies.erl index f2c44e2289..54e4d2c03e 100644 --- a/src/rabbit_policies.erl +++ b/src/rabbit_policies.erl @@ -42,6 +42,7 @@ register() -> {policy_validator, <<"max-age">>}, {policy_validator, <<"max-segment-size">>}, {policy_validator, <<"queue-leader-locator">>}, + {policy_validator, <<"initial-cluster-size">>}, {operator_policy_validator, <<"expires">>}, {operator_policy_validator, <<"message-ttl">>}, {operator_policy_validator, <<"max-length">>}, @@ -157,6 +158,12 @@ validate_policy0(<<"queue-leader-locator">>, <<"least-leaders">>) -> validate_policy0(<<"queue-leader-locator">>, Value) -> {error, "~p is not a valid queue leader locator value", [Value]}; +validate_policy0(<<"initial-cluster-size">>, Value) + when is_integer(Value), Value >= 0 -> + ok; +validate_policy0(<<"initial-cluster-size">>, Value) -> + {error, "~p is not a valid cluster size", [Value]}; + validate_policy0(<<"max-segment-size">>, Value) when is_integer(Value), Value >= 0 -> ok; diff --git a/src/rabbit_stream_queue.erl b/src/rabbit_stream_queue.erl index 0ae58fc6b8..8f9a51a17d 100644 --- a/src/rabbit_stream_queue.erl +++ b/src/rabbit_stream_queue.erl @@ -506,9 +506,11 @@ make_stream_conf(Node, Q) -> MaxSegmentSize = args_policy_lookup(<<"max-segment-size">>, fun min/2, Q), LeaderLocator = queue_leader_locator(args_policy_lookup(<<"queue-leader-locator">>, fun res_arg/2, Q)), + InitialClusterSize = initial_cluster_size(args_policy_lookup(<<"initial-cluster-size">>, + fun res_arg/2, Q)), Replicas0 = rabbit_mnesia:cluster_nodes(all) -- [Node], Arguments = amqqueue:get_arguments(Q), - Replicas = select_stream_nodes(get_initial_cluster_size(Arguments) - 1, Replicas0), + Replicas = select_stream_nodes(InitialClusterSize - 1, Replicas0), Formatter = {?MODULE, format_osiris_event, [QName]}, Retention = lists:filter(fun({_, R}) -> R =/= undefined @@ -576,6 +578,11 @@ max_age(Age1, Age2) -> queue_leader_locator(undefined) -> <<"client-local">>; queue_leader_locator(Val) -> Val. +initial_cluster_size(undefined) -> + length(rabbit_mnesia:cluster_nodes(running)); +initial_cluster_size(Val) -> + Val. + res_arg(PolVal, undefined) -> PolVal; res_arg(_, ArgVal) -> ArgVal. @@ -688,7 +695,7 @@ msg_to_iodata(#basic_message{exchange_name = #resource{name = Exchange}, capabilities() -> #{policies => [<<"max-length-bytes">>, <<"max-age">>, <<"max-segment-size">>, - <<"queue-leader-locator">>], + <<"queue-leader-locator">>, <<"initial-cluster-size">>], queue_arguments => [<<"x-dead-letter-exchange">>, <<"x-dead-letter-routing-key">>, <<"x-max-length">>, <<"x-max-length-bytes">>, <<"x-single-active-consumer">>, <<"x-queue-type">>, @@ -696,9 +703,3 @@ capabilities() -> <<"x-initial-cluster-size">>, <<"x-queue-leader-locator">>], consumer_arguments => [<<"x-stream-offset">>], server_named => false}. - -get_initial_cluster_size(Arguments) -> - case rabbit_misc:table_lookup(Arguments, <<"x-initial-cluster-size">>) of - undefined -> length(rabbit_mnesia:cluster_nodes(running)); - {_Type, Val} -> Val - end. diff --git a/test/rabbit_stream_queue_SUITE.erl b/test/rabbit_stream_queue_SUITE.erl index 5ae4de82f4..06f69569e8 100644 --- a/test/rabbit_stream_queue_SUITE.erl +++ b/test/rabbit_stream_queue_SUITE.erl @@ -49,6 +49,7 @@ groups() -> leader_failover, initial_cluster_size_one, initial_cluster_size_two, + initial_cluster_size_one_policy, leader_locator_client_local, leader_locator_random, leader_locator_least_leaders, @@ -1144,6 +1145,26 @@ initial_cluster_size_two(Config) -> ?assertMatch(#'queue.delete_ok'{}, amqp_channel:call(Ch, #'queue.delete'{queue = Q})). +initial_cluster_size_one_policy(Config) -> + [Server1 | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + ok = rabbit_ct_broker_helpers:set_policy( + Config, 0, <<"cluster-size">>, <<"initial_cluster_size_one_policy">>, <<"queues">>, + [{<<"initial-cluster-size">>, 1}]), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server1), + Q = ?config(queue_name, Config), + + ?assertEqual({'queue.declare_ok', Q, 0, 0}, + declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}, + {<<"x-initial-cluster-size">>, long, 1}])), + check_leader_and_replicas(Config, Q, Server1, []), + + ?assertMatch(#'queue.delete_ok'{}, + amqp_channel:call(Ch, #'queue.delete'{queue = Q})), + + ok = rabbit_ct_broker_helpers:clear_policy(Config, 0, <<"cluster-size">>). + leader_locator_client_local(Config) -> [Server1, Server2, Server3] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), |