diff options
| author | kjnilsson <knilsson@pivotal.io> | 2019-02-19 17:09:59 +0000 |
|---|---|---|
| committer | kjnilsson <knilsson@pivotal.io> | 2019-02-20 12:28:48 +0000 |
| commit | a9fe3b07ffd74b2b1fa66a175b9f0005ed7fbaeb (patch) | |
| tree | 53772c859d2d6f1604dfb8937c13055742dd1c88 | |
| parent | e4000af432e401338ccc747cd1f94bc983bfc95b (diff) | |
| download | rabbitmq-server-git-a9fe3b07ffd74b2b1fa66a175b9f0005ed7fbaeb.tar.gz | |
Add rabbit_quorum_queue:grow/4 function
To allow operators to grow quorum queue clusters with some degree of
selection.
[#162782801]
| -rw-r--r-- | src/rabbit_quorum_queue.erl | 54 |
1 files changed, 51 insertions, 3 deletions
diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl index 63b182ef20..08b4eaf5b3 100644 --- a/src/rabbit_quorum_queue.erl +++ b/src/rabbit_quorum_queue.erl @@ -37,7 +37,8 @@ -export([requeue/3]). -export([policy_changed/2]). -export([cleanup_data_dir/0]). --export([shrink_all/1]). +-export([shrink_all/1, + grow/4]). %%-include_lib("rabbit_common/include/rabbit.hrl"). -include_lib("rabbit.hrl"). @@ -664,7 +665,7 @@ add_member(Q, Node) when ?amqqueue_is_quorum(Q) -> %% TODO parallel calls might crash this, or add a duplicate in quorum_nodes ServerId = {RaName, Node}, case ra:start_server(RaName, ServerId, ra_machine(Q), - [{RaName, N} || N <- QNodes]) of + [{RaName, N} || N <- QNodes]) of ok -> case ra:add_member(ServerRef, ServerId) of {ok, _, Leader} -> @@ -677,11 +678,15 @@ add_member(Q, Node) when ?amqqueue_is_quorum(Q) -> rabbit_misc:execute_mnesia_transaction( fun() -> rabbit_amqqueue:update(QName, Fun) end), ok; + timeout -> + {error, timeout}; E -> %% TODO should we stop the ra process here? E end; - {error, _} = E -> + timeout -> + {error, timeout}; + E -> E end. @@ -734,6 +739,9 @@ delete_member(Q, Node) when ?amqqueue_is_quorum(Q) -> end end. +-spec shrink_all(node()) -> + [{rabbit_amqqueue:name(), + {ok, pos_integer()} | {error, pos_integer(), term()}}]. shrink_all(Node) -> [begin QName = amqqueue:get_name(Q), @@ -752,6 +760,46 @@ shrink_all(Node) -> amqqueue:get_type(Q) == quorum, lists:member(Node, amqqueue:get_quorum_nodes(Q))]. +-spec grow(node(), binary(), binary(), + all | even) -> + [{rabbit_amqqueue:name(), + {ok, pos_integer()} | {error, pos_integer(), term()}}]. +grow(Node, VhostSpec, QueueSpec, Strategy) -> + ConnectedNodes = [node() | nodes()], + [begin + QName = amqqueue:get_name(Q), + rabbit_log:info("~s: Adding member ~w", + [rabbit_misc:rs(QName), Node]), + Size = length(amqqueue:get_quorum_nodes(Q)), + case add_member(Q, Node) of + ok -> + {QName, {ok, Size + 1}}; + {error, Err} -> + rabbit_log:warning("~s: Failed to add member ~w, Error ~w", + [rabbit_misc:rs(QName), Node, Err]), + {QName, {error, Size, Err}} + end + end + || Q <- rabbit_amqqueue:list(), + amqqueue:get_type(Q) == quorum, + %% don't add a member if there is already one on the node + not lists:member(Node, amqqueue:get_quorum_nodes(Q)), + %% if the node isn't connected, best not to add it + lists:member(Node, ConnectedNodes), + matches_strategy(Strategy, amqqueue:get_quorum_nodes(Q)), + is_match(amqqueue:get_vhost(Q), VhostSpec) andalso + is_match(get_resource_name(amqqueue:get_name(Q)), QueueSpec) ]. + +get_resource_name(#resource{name = Name}) -> + Name. + +matches_strategy(all, _) -> true; +matches_strategy(even, Members) -> + length(Members) rem 2 == 0. + +is_match(Subj, E) -> + nomatch /= re:run(Subj, E). + %%---------------------------------------------------------------------------- dlx_mfa(Q) -> |
