summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorkjnilsson <knilsson@pivotal.io>2019-02-19 17:09:59 +0000
committerkjnilsson <knilsson@pivotal.io>2019-02-20 12:28:48 +0000
commita9fe3b07ffd74b2b1fa66a175b9f0005ed7fbaeb (patch)
tree53772c859d2d6f1604dfb8937c13055742dd1c88
parente4000af432e401338ccc747cd1f94bc983bfc95b (diff)
downloadrabbitmq-server-git-a9fe3b07ffd74b2b1fa66a175b9f0005ed7fbaeb.tar.gz
Add rabbit_quorum_queue:grow/4 function
To allow operators to grow quorum queue clusters with some degree of selection. [#162782801]
-rw-r--r--src/rabbit_quorum_queue.erl54
1 files changed, 51 insertions, 3 deletions
diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl
index 63b182ef20..08b4eaf5b3 100644
--- a/src/rabbit_quorum_queue.erl
+++ b/src/rabbit_quorum_queue.erl
@@ -37,7 +37,8 @@
-export([requeue/3]).
-export([policy_changed/2]).
-export([cleanup_data_dir/0]).
--export([shrink_all/1]).
+-export([shrink_all/1,
+ grow/4]).
%%-include_lib("rabbit_common/include/rabbit.hrl").
-include_lib("rabbit.hrl").
@@ -664,7 +665,7 @@ add_member(Q, Node) when ?amqqueue_is_quorum(Q) ->
%% TODO parallel calls might crash this, or add a duplicate in quorum_nodes
ServerId = {RaName, Node},
case ra:start_server(RaName, ServerId, ra_machine(Q),
- [{RaName, N} || N <- QNodes]) of
+ [{RaName, N} || N <- QNodes]) of
ok ->
case ra:add_member(ServerRef, ServerId) of
{ok, _, Leader} ->
@@ -677,11 +678,15 @@ add_member(Q, Node) when ?amqqueue_is_quorum(Q) ->
rabbit_misc:execute_mnesia_transaction(
fun() -> rabbit_amqqueue:update(QName, Fun) end),
ok;
+ timeout ->
+ {error, timeout};
E ->
%% TODO should we stop the ra process here?
E
end;
- {error, _} = E ->
+ timeout ->
+ {error, timeout};
+ E ->
E
end.
@@ -734,6 +739,9 @@ delete_member(Q, Node) when ?amqqueue_is_quorum(Q) ->
end
end.
+-spec shrink_all(node()) ->
+ [{rabbit_amqqueue:name(),
+ {ok, pos_integer()} | {error, pos_integer(), term()}}].
shrink_all(Node) ->
[begin
QName = amqqueue:get_name(Q),
@@ -752,6 +760,46 @@ shrink_all(Node) ->
amqqueue:get_type(Q) == quorum,
lists:member(Node, amqqueue:get_quorum_nodes(Q))].
+-spec grow(node(), binary(), binary(),
+ all | even) ->
+ [{rabbit_amqqueue:name(),
+ {ok, pos_integer()} | {error, pos_integer(), term()}}].
+grow(Node, VhostSpec, QueueSpec, Strategy) ->
+ ConnectedNodes = [node() | nodes()],
+ [begin
+ QName = amqqueue:get_name(Q),
+ rabbit_log:info("~s: Adding member ~w",
+ [rabbit_misc:rs(QName), Node]),
+ Size = length(amqqueue:get_quorum_nodes(Q)),
+ case add_member(Q, Node) of
+ ok ->
+ {QName, {ok, Size + 1}};
+ {error, Err} ->
+ rabbit_log:warning("~s: Failed to add member ~w, Error ~w",
+ [rabbit_misc:rs(QName), Node, Err]),
+ {QName, {error, Size, Err}}
+ end
+ end
+ || Q <- rabbit_amqqueue:list(),
+ amqqueue:get_type(Q) == quorum,
+ %% don't add a member if there is already one on the node
+ not lists:member(Node, amqqueue:get_quorum_nodes(Q)),
+ %% if the node isn't connected, best not to add it
+ lists:member(Node, ConnectedNodes),
+ matches_strategy(Strategy, amqqueue:get_quorum_nodes(Q)),
+ is_match(amqqueue:get_vhost(Q), VhostSpec) andalso
+ is_match(get_resource_name(amqqueue:get_name(Q)), QueueSpec) ].
+
+get_resource_name(#resource{name = Name}) ->
+ Name.
+
+matches_strategy(all, _) -> true;
+matches_strategy(even, Members) ->
+ length(Members) rem 2 == 0.
+
+is_match(Subj, E) ->
+ nomatch /= re:run(Subj, E).
+
%%----------------------------------------------------------------------------
dlx_mfa(Q) ->