diff options
| author | Michael Klishin <mklishin@pivotal.io> | 2019-07-04 02:14:44 +0200 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2019-07-04 02:14:44 +0200 |
| commit | 56aafe2fc4c17592cc7bf946e79302a87dca47e1 (patch) | |
| tree | 1a6b346ea0bc0aad30a80ab052c8b5f1b5bd433b /src | |
| parent | 43037ce03d38a32d144ac3abe3d114ab938e5a0f (diff) | |
| parent | 8eaee86d9bef26d2ae4f2566d445293748cd2421 (diff) | |
| download | rabbitmq-server-git-56aafe2fc4c17592cc7bf946e79302a87dca47e1.tar.gz | |
Merge pull request #2053 from rabbitmq/qq_add_member_timeout_enhancements
Provide timeout to add_member command
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_quorum_queue.erl | 15 |
1 files changed, 9 insertions, 6 deletions
diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl index 6fc5fcd83f..8c8f6dd4be 100644 --- a/src/rabbit_quorum_queue.erl +++ b/src/rabbit_quorum_queue.erl @@ -32,7 +32,7 @@ -export([rpc_delete_metrics/1]). -export([format/1]). -export([open_files/1]). --export([add_member/3]). +-export([add_member/4]). -export([delete_member/3]). -export([requeue/3]). -export([policy_changed/2]). @@ -69,6 +69,7 @@ -define(RPC_TIMEOUT, 1000). -define(TICK_TIMEOUT, 5000). %% the ra server tick time -define(DELETE_TIMEOUT, 5000). +-define(ADD_MEMBER_TIMEOUT, 5000). %%---------------------------------------------------------------------------- @@ -699,7 +700,7 @@ get_sys_status(Proc) -> end. -add_member(VHost, Name, Node) -> +add_member(VHost, Name, Node, Timeout) -> QName = #resource{virtual_host = VHost, name = Name, kind = queue}, case rabbit_amqqueue:lookup(QName) of {ok, Q} when ?amqqueue_is_classic(Q) -> @@ -715,14 +716,14 @@ add_member(VHost, Name, Node) -> %% idempotent by design ok; false -> - add_member(Q, Node) + add_member(Q, Node, Timeout) end end; {error, not_found} = E -> E end. -add_member(Q, Node) when ?amqqueue_is_quorum(Q) -> +add_member(Q, Node, Timeout) when ?amqqueue_is_quorum(Q) -> {RaName, _} = ServerRef = amqqueue:get_pid(Q), QName = amqqueue:get_name(Q), QNodes = amqqueue:get_quorum_nodes(Q), @@ -731,7 +732,7 @@ add_member(Q, Node) when ?amqqueue_is_quorum(Q) -> case ra:start_server(RaName, ServerId, ra_machine(Q), [{RaName, N} || N <- QNodes]) of ok -> - case ra:add_member(ServerRef, ServerId) of + case ra:add_member(ServerRef, ServerId, Timeout) of {ok, _, Leader} -> Fun = fun(Q1) -> Q2 = amqqueue:set_quorum_nodes( @@ -743,6 +744,8 @@ add_member(Q, Node) when ?amqqueue_is_quorum(Q) -> fun() -> rabbit_amqqueue:update(QName, Fun) end), ok; {timeout, _} -> + _ = ra:force_delete_server(ServerId), + _ = ra:remove_member(ServerRef, ServerId), {error, timeout}; E -> _ = ra:force_delete_server(ServerId), @@ -828,7 +831,7 @@ grow(Node, VhostSpec, QueueSpec, Strategy) -> QName = amqqueue:get_name(Q), rabbit_log:info("~s: adding a new member (replica) on node ~w", [rabbit_misc:rs(QName), Node]), - case add_member(Q, Node) of + case add_member(Q, Node, ?ADD_MEMBER_TIMEOUT) of ok -> {QName, {ok, Size + 1}}; {error, Err} -> |
