diff options
author | dcorbacho <dparracorbacho@piotal.io> | 2020-10-12 14:43:37 +0100 |
---|---|---|
committer | dcorbacho <dparracorbacho@piotal.io> | 2020-10-12 14:43:37 +0100 |
commit | f3ab0a736685f3bac767bb40001bfb85cce81a9f (patch) | |
tree | a91687556f34ae16f360c1c20c06e1b2f5aaed75 /src | |
parent | 6ebdbf4191e43a96bf5e3a6382985918ac211744 (diff) | |
download | rabbitmq-server-git-stream-initial-cluster-size.tar.gz |
Select initial cluster size on stream queue declarationstream-initial-cluster-size
Diffstat (limited to 'src')
-rw-r--r-- | src/rabbit_amqqueue.erl | 7 | ||||
-rw-r--r-- | src/rabbit_stream_queue.erl | 30 |
2 files changed, 32 insertions, 5 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index fd7682fdeb..82b261045d 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -780,9 +780,10 @@ declare_args() -> {<<"x-queue-mode">>, fun check_queue_mode/2}, {<<"x-single-active-consumer">>, fun check_single_active_consumer_arg/2}, {<<"x-queue-type">>, fun check_queue_type/2}, - {<<"x-quorum-initial-group-size">>, fun check_default_quorum_initial_group_size_arg/2}, + {<<"x-quorum-initial-group-size">>, fun check_initial_cluster_size_arg/2}, {<<"x-max-age">>, fun check_max_age_arg/2}, - {<<"x-max-segment-size">>, fun check_non_neg_int_arg/2}]. + {<<"x-max-segment-size">>, fun check_non_neg_int_arg/2}, + {<<"x-initial-cluster-size">>, fun check_initial_cluster_size_arg/2}]. consume_args() -> [{<<"x-priority">>, fun check_int_arg/2}, {<<"x-cancel-on-ha-failover">>, fun check_bool_arg/2}]. @@ -829,7 +830,7 @@ check_single_active_consumer_arg({Type, Val}, Args) -> Error -> Error end. -check_default_quorum_initial_group_size_arg({Type, Val}, Args) -> +check_initial_cluster_size_arg({Type, Val}, Args) -> case check_non_neg_int_arg({Type, Val}, Args) of ok when Val == 0 -> {error, {value_zero, Val}}; ok -> ok; diff --git a/src/rabbit_stream_queue.erl b/src/rabbit_stream_queue.erl index ed44f1258a..7dd9ac27f6 100644 --- a/src/rabbit_stream_queue.erl +++ b/src/rabbit_stream_queue.erl @@ -495,7 +495,9 @@ make_stream_conf(Node, Q) -> MaxBytes = args_policy_lookup(<<"max-length-bytes">>, fun min/2, Q), MaxAge = max_age(args_policy_lookup(<<"max-age">>, fun max_age/2, Q)), MaxSegmentSize = args_policy_lookup(<<"max-segment-size">>, fun min/2, Q), - Replicas = rabbit_mnesia:cluster_nodes(all) -- [Node], + Replicas0 = rabbit_mnesia:cluster_nodes(all) -- [Node], + Arguments = amqqueue:get_arguments(Q), + Replicas = select_stream_nodes(get_initial_cluster_size(Arguments) - 1, Replicas0), Formatter = {?MODULE, format_osiris_event, [QName]}, Retention = lists:filter(fun({_, R}) -> R =/= undefined @@ -509,6 +511,23 @@ make_stream_conf(Node, Q) -> event_formatter => Formatter, epoch => 1}). +select_stream_nodes(Size, All) when length(All) =< Size -> + All; +select_stream_nodes(Size, All) -> + Node = node(), + case lists:member(Node, All) of + true -> + select_stream_nodes(Size - 1, lists:delete(Node, All), [Node]); + false -> + select_stream_nodes(Size, All, []) + end. + +select_stream_nodes(0, _, Selected) -> + Selected; +select_stream_nodes(Size, Rest, Selected) -> + S = lists:nth(rand:uniform(length(Rest)), Rest), + select_stream_nodes(Size - 1, lists:delete(S, Rest), [S | Selected]). + update_stream_conf(#{reference := QName} = Conf) -> case rabbit_amqqueue:lookup(QName) of {ok, Q} -> @@ -654,6 +673,13 @@ capabilities() -> queue_arguments => [<<"x-dead-letter-exchange">>, <<"x-dead-letter-routing-key">>, <<"x-max-length">>, <<"x-max-length-bytes">>, <<"x-single-active-consumer">>, <<"x-queue-type">>, - <<"x-max-age">>, <<"x-max-segment-size">>], + <<"x-max-age">>, <<"x-max-segment-size">>, + <<"x-initial-cluster-size">>], consumer_arguments => [<<"x-stream-offset">>], server_named => false}. + +get_initial_cluster_size(Arguments) -> + case rabbit_misc:table_lookup(Arguments, <<"x-initial-cluster-size">>) of + undefined -> length(rabbit_mnesia:cluster_nodes(running)); + {_Type, Val} -> Val + end. |