diff options
| author | Diana Corbacho <diana@rabbitmq.com> | 2019-07-03 13:06:27 +0100 |
|---|---|---|
| committer | Diana Corbacho <diana@rabbitmq.com> | 2019-07-03 15:00:49 +0100 |
| commit | 8eaee86d9bef26d2ae4f2566d445293748cd2421 (patch) | |
| tree | e2eaf0546529d60a3e2a9db5afb10afa4b39d27c /src | |
| parent | 580896422de202857ab039e64c0cc7fcd5916c9c (diff) | |
| download | rabbitmq-server-git-8eaee86d9bef26d2ae4f2566d445293748cd2421.tar.gz | |
Provide timeout to add_member command
[#166670827]
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_quorum_queue.erl | 15 |
1 files changed, 9 insertions, 6 deletions
diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl index 6fc5fcd83f..8c8f6dd4be 100644 --- a/src/rabbit_quorum_queue.erl +++ b/src/rabbit_quorum_queue.erl @@ -32,7 +32,7 @@ -export([rpc_delete_metrics/1]). -export([format/1]). -export([open_files/1]). --export([add_member/3]). +-export([add_member/4]). -export([delete_member/3]). -export([requeue/3]). -export([policy_changed/2]). @@ -69,6 +69,7 @@ -define(RPC_TIMEOUT, 1000). -define(TICK_TIMEOUT, 5000). %% the ra server tick time -define(DELETE_TIMEOUT, 5000). +-define(ADD_MEMBER_TIMEOUT, 5000). %%---------------------------------------------------------------------------- @@ -699,7 +700,7 @@ get_sys_status(Proc) -> end. -add_member(VHost, Name, Node) -> +add_member(VHost, Name, Node, Timeout) -> QName = #resource{virtual_host = VHost, name = Name, kind = queue}, case rabbit_amqqueue:lookup(QName) of {ok, Q} when ?amqqueue_is_classic(Q) -> @@ -715,14 +716,14 @@ add_member(VHost, Name, Node) -> %% idempotent by design ok; false -> - add_member(Q, Node) + add_member(Q, Node, Timeout) end end; {error, not_found} = E -> E end. -add_member(Q, Node) when ?amqqueue_is_quorum(Q) -> +add_member(Q, Node, Timeout) when ?amqqueue_is_quorum(Q) -> {RaName, _} = ServerRef = amqqueue:get_pid(Q), QName = amqqueue:get_name(Q), QNodes = amqqueue:get_quorum_nodes(Q), @@ -731,7 +732,7 @@ add_member(Q, Node) when ?amqqueue_is_quorum(Q) -> case ra:start_server(RaName, ServerId, ra_machine(Q), [{RaName, N} || N <- QNodes]) of ok -> - case ra:add_member(ServerRef, ServerId) of + case ra:add_member(ServerRef, ServerId, Timeout) of {ok, _, Leader} -> Fun = fun(Q1) -> Q2 = amqqueue:set_quorum_nodes( @@ -743,6 +744,8 @@ add_member(Q, Node) when ?amqqueue_is_quorum(Q) -> fun() -> rabbit_amqqueue:update(QName, Fun) end), ok; {timeout, _} -> + _ = ra:force_delete_server(ServerId), + _ = ra:remove_member(ServerRef, ServerId), {error, timeout}; E -> _ = ra:force_delete_server(ServerId), @@ -828,7 +831,7 @@ grow(Node, VhostSpec, QueueSpec, Strategy) -> QName = amqqueue:get_name(Q), rabbit_log:info("~s: adding a new member (replica) on node ~w", [rabbit_misc:rs(QName), Node]), - case add_member(Q, Node) of + case add_member(Q, Node, ?ADD_MEMBER_TIMEOUT) of ok -> {QName, {ok, Size + 1}}; {error, Err} -> |
