diff options
| author | Michael Klishin <michael@novemberain.com> | 2019-02-21 01:13:38 +0300 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2019-02-21 01:13:38 +0300 |
| commit | 9e4095fd906da893ca08b02836ce0716bbfac39f (patch) | |
| tree | c98002416ba4d63a08a8957b946c8547fadd602b /src | |
| parent | d32660c5fc2abc6250ff9df2393eef2d407e7351 (diff) | |
| parent | aee6ef638e9e25d5e687aa937fe8c93a2410ed3e (diff) | |
| download | rabbitmq-server-git-9e4095fd906da893ca08b02836ce0716bbfac39f.tar.gz | |
Merge pull request #1894 from rabbitmq/quorum-queue-grow-cmd
Quorum queue grow and shrink commands
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_quorum_queue.erl | 105 |
1 files changed, 91 insertions, 14 deletions
diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl index 083acbb2d2..0ef6d88086 100644 --- a/src/rabbit_quorum_queue.erl +++ b/src/rabbit_quorum_queue.erl @@ -37,6 +37,8 @@ -export([requeue/3]). -export([policy_changed/2]). -export([cleanup_data_dir/0]). +-export([shrink_all/1, + grow/4]). %%-include_lib("rabbit_common/include/rabbit.hrl"). -include_lib("rabbit.hrl"). @@ -663,7 +665,7 @@ add_member(Q, Node) when ?amqqueue_is_quorum(Q) -> %% TODO parallel calls might crash this, or add a duplicate in quorum_nodes ServerId = {RaName, Node}, case ra:start_server(RaName, ServerId, ra_machine(Q), - [{RaName, N} || N <- QNodes]) of + [{RaName, N} || N <- QNodes]) of ok -> case ra:add_member(ServerRef, ServerId) of {ok, _, Leader} -> @@ -676,11 +678,15 @@ add_member(Q, Node) when ?amqqueue_is_quorum(Q) -> rabbit_misc:execute_mnesia_transaction( fun() -> rabbit_amqqueue:update(QName, Fun) end), ok; + timeout -> + {error, timeout}; E -> %% TODO should we stop the ra process here? E end; - {error, _} = E -> + timeout -> + {error, timeout}; + E -> E end. @@ -710,20 +716,91 @@ delete_member(Q, Node) when ?amqqueue_is_quorum(Q) -> QName = amqqueue:get_name(Q), {RaName, _} = amqqueue:get_pid(Q), ServerId = {RaName, Node}, - case ra:leave_and_delete_server(ServerId) of - ok -> - Fun = fun(Q1) -> - amqqueue:set_quorum_nodes( - Q1, - lists:delete(Node, amqqueue:get_quorum_nodes(Q1))) - end, - rabbit_misc:execute_mnesia_transaction( - fun() -> rabbit_amqqueue:update(QName, Fun) end), - ok; - E -> - E + case amqqueue:get_quorum_nodes(Q) of + [Node] -> + %% deleting the last member is not allowed + {error, last_node}; + _ -> + case ra:leave_and_delete_server(ServerId) of + ok -> + Fun = fun(Q1) -> + amqqueue:set_quorum_nodes( + Q1, + lists:delete(Node, + amqqueue:get_quorum_nodes(Q1))) + end, + rabbit_misc:execute_mnesia_transaction( + fun() -> rabbit_amqqueue:update(QName, Fun) end), + ok; + timeout -> + {error, timeout}; + E -> + E + end end. +-spec shrink_all(node()) -> + [{rabbit_amqqueue:name(), + {ok, pos_integer()} | {error, pos_integer(), term()}}]. +shrink_all(Node) -> + [begin + QName = amqqueue:get_name(Q), + rabbit_log:info("~s: removing member (replica) on node ~w", + [rabbit_misc:rs(QName), Node]), + Size = length(amqqueue:get_quorum_nodes(Q)), + case delete_member(Q, Node) of + ok -> + {QName, {ok, Size-1}}; + {error, Err} -> + rabbit_log:warning("~s: failed to remove member (replica) on node ~w, error: ~w", + [rabbit_misc:rs(QName), Node, Err]), + {QName, {error, Size, Err}} + end + end || Q <- rabbit_amqqueue:list(), + amqqueue:get_type(Q) == quorum, + lists:member(Node, amqqueue:get_quorum_nodes(Q))]. + +-spec grow(node(), binary(), binary(), all | even) -> + [{rabbit_amqqueue:name(), + {ok, pos_integer()} | {error, pos_integer(), term()}}]. +grow(Node, VhostSpec, QueueSpec, Strategy) -> + Running = rabbit_mnesia:cluster_nodes(running), + [begin + Size = length(amqqueue:get_quorum_nodes(Q)), + QName = amqqueue:get_name(Q), + rabbit_log:info("~s: adding a new member (replica) on node ~w", + [rabbit_misc:rs(QName), Node]), + case add_member(Q, Node) of + ok -> + {QName, {ok, Size + 1}}; + {error, Err} -> + rabbit_log:warning( + "~s: failed to add member (replica) on node ~w, error: ~w", + [rabbit_misc:rs(QName), Node, Err]), + {QName, {error, Size, Err}} + end + end + || Q <- rabbit_amqqueue:list(), + amqqueue:get_type(Q) == quorum, + %% don't add a member if there is already one on the node + not lists:member(Node, amqqueue:get_quorum_nodes(Q)), + %% node needs to be running + lists:member(Node, Running), + matches_strategy(Strategy, amqqueue:get_quorum_nodes(Q)), + is_match(amqqueue:get_vhost(Q), VhostSpec) andalso + is_match(get_resource_name(amqqueue:get_name(Q)), QueueSpec) ]. + +get_resource_name(#resource{name = Name}) -> + Name. + +matches_strategy(all, _) -> true; +matches_strategy(even, Members) -> + length(Members) rem 2 == 0. + +is_match(Subj, E) -> + nomatch /= re:run(Subj, E). + + %%---------------------------------------------------------------------------- dlx_mfa(Q) -> DLX = init_dlx(args_policy_lookup(<<"dead-letter-exchange">>, |
