summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authordcorbacho <dparracorbacho@piotal.io>2020-10-26 15:39:08 +0000
committerdcorbacho <dparracorbacho@piotal.io>2020-10-26 15:39:08 +0000
commit53b78579bdd346f07d8a67f626014ce963f70204 (patch)
tree411a7a3656af8501082fffa92dc98a84e110caf9
parentf0f5134f1b0c2b114683fc66a277d3f3cd3679fe (diff)
downloadrabbitmq-server-git-stream-policies.tar.gz
Add initial-cluster-size as a stream policystream-policies
It was only a queue argument
-rw-r--r--src/rabbit_policies.erl7
-rw-r--r--src/rabbit_stream_queue.erl17
-rw-r--r--test/rabbit_stream_queue_SUITE.erl21
3 files changed, 37 insertions, 8 deletions
diff --git a/src/rabbit_policies.erl b/src/rabbit_policies.erl
index f2c44e2289..54e4d2c03e 100644
--- a/src/rabbit_policies.erl
+++ b/src/rabbit_policies.erl
@@ -42,6 +42,7 @@ register() ->
{policy_validator, <<"max-age">>},
{policy_validator, <<"max-segment-size">>},
{policy_validator, <<"queue-leader-locator">>},
+ {policy_validator, <<"initial-cluster-size">>},
{operator_policy_validator, <<"expires">>},
{operator_policy_validator, <<"message-ttl">>},
{operator_policy_validator, <<"max-length">>},
@@ -157,6 +158,12 @@ validate_policy0(<<"queue-leader-locator">>, <<"least-leaders">>) ->
validate_policy0(<<"queue-leader-locator">>, Value) ->
{error, "~p is not a valid queue leader locator value", [Value]};
+validate_policy0(<<"initial-cluster-size">>, Value)
+ when is_integer(Value), Value >= 0 ->
+ ok;
+validate_policy0(<<"initial-cluster-size">>, Value) ->
+ {error, "~p is not a valid cluster size", [Value]};
+
validate_policy0(<<"max-segment-size">>, Value)
when is_integer(Value), Value >= 0 ->
ok;
diff --git a/src/rabbit_stream_queue.erl b/src/rabbit_stream_queue.erl
index 0ae58fc6b8..8f9a51a17d 100644
--- a/src/rabbit_stream_queue.erl
+++ b/src/rabbit_stream_queue.erl
@@ -506,9 +506,11 @@ make_stream_conf(Node, Q) ->
MaxSegmentSize = args_policy_lookup(<<"max-segment-size">>, fun min/2, Q),
LeaderLocator = queue_leader_locator(args_policy_lookup(<<"queue-leader-locator">>,
fun res_arg/2, Q)),
+ InitialClusterSize = initial_cluster_size(args_policy_lookup(<<"initial-cluster-size">>,
+ fun res_arg/2, Q)),
Replicas0 = rabbit_mnesia:cluster_nodes(all) -- [Node],
Arguments = amqqueue:get_arguments(Q),
- Replicas = select_stream_nodes(get_initial_cluster_size(Arguments) - 1, Replicas0),
+ Replicas = select_stream_nodes(InitialClusterSize - 1, Replicas0),
Formatter = {?MODULE, format_osiris_event, [QName]},
Retention = lists:filter(fun({_, R}) ->
R =/= undefined
@@ -576,6 +578,11 @@ max_age(Age1, Age2) ->
queue_leader_locator(undefined) -> <<"client-local">>;
queue_leader_locator(Val) -> Val.
+initial_cluster_size(undefined) ->
+ length(rabbit_mnesia:cluster_nodes(running));
+initial_cluster_size(Val) ->
+ Val.
+
res_arg(PolVal, undefined) -> PolVal;
res_arg(_, ArgVal) -> ArgVal.
@@ -688,7 +695,7 @@ msg_to_iodata(#basic_message{exchange_name = #resource{name = Exchange},
capabilities() ->
#{policies => [<<"max-length-bytes">>, <<"max-age">>, <<"max-segment-size">>,
- <<"queue-leader-locator">>],
+ <<"queue-leader-locator">>, <<"initial-cluster-size">>],
queue_arguments => [<<"x-dead-letter-exchange">>, <<"x-dead-letter-routing-key">>,
<<"x-max-length">>, <<"x-max-length-bytes">>,
<<"x-single-active-consumer">>, <<"x-queue-type">>,
@@ -696,9 +703,3 @@ capabilities() ->
<<"x-initial-cluster-size">>, <<"x-queue-leader-locator">>],
consumer_arguments => [<<"x-stream-offset">>],
server_named => false}.
-
-get_initial_cluster_size(Arguments) ->
- case rabbit_misc:table_lookup(Arguments, <<"x-initial-cluster-size">>) of
- undefined -> length(rabbit_mnesia:cluster_nodes(running));
- {_Type, Val} -> Val
- end.
diff --git a/test/rabbit_stream_queue_SUITE.erl b/test/rabbit_stream_queue_SUITE.erl
index 5ae4de82f4..06f69569e8 100644
--- a/test/rabbit_stream_queue_SUITE.erl
+++ b/test/rabbit_stream_queue_SUITE.erl
@@ -49,6 +49,7 @@ groups() ->
leader_failover,
initial_cluster_size_one,
initial_cluster_size_two,
+ initial_cluster_size_one_policy,
leader_locator_client_local,
leader_locator_random,
leader_locator_least_leaders,
@@ -1144,6 +1145,26 @@ initial_cluster_size_two(Config) ->
?assertMatch(#'queue.delete_ok'{},
amqp_channel:call(Ch, #'queue.delete'{queue = Q})).
+initial_cluster_size_one_policy(Config) ->
+ [Server1 | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ ok = rabbit_ct_broker_helpers:set_policy(
+ Config, 0, <<"cluster-size">>, <<"initial_cluster_size_one_policy">>, <<"queues">>,
+ [{<<"initial-cluster-size">>, 1}]),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server1),
+ Q = ?config(queue_name, Config),
+
+ ?assertEqual({'queue.declare_ok', Q, 0, 0},
+ declare(Ch, Q, [{<<"x-queue-type">>, longstr, <<"stream">>},
+ {<<"x-initial-cluster-size">>, long, 1}])),
+ check_leader_and_replicas(Config, Q, Server1, []),
+
+ ?assertMatch(#'queue.delete_ok'{},
+ amqp_channel:call(Ch, #'queue.delete'{queue = Q})),
+
+ ok = rabbit_ct_broker_helpers:clear_policy(Config, 0, <<"cluster-size">>).
+
leader_locator_client_local(Config) ->
[Server1, Server2, Server3] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),