summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMichael Klishin <mklishin@pivotal.io>2019-07-04 02:14:44 +0200
committerGitHub <noreply@github.com>2019-07-04 02:14:44 +0200
commit56aafe2fc4c17592cc7bf946e79302a87dca47e1 (patch)
tree1a6b346ea0bc0aad30a80ab052c8b5f1b5bd433b /src
parent43037ce03d38a32d144ac3abe3d114ab938e5a0f (diff)
parent8eaee86d9bef26d2ae4f2566d445293748cd2421 (diff)
downloadrabbitmq-server-git-56aafe2fc4c17592cc7bf946e79302a87dca47e1.tar.gz
Merge pull request #2053 from rabbitmq/qq_add_member_timeout_enhancements
Provide timeout to add_member command
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_quorum_queue.erl15
1 files changed, 9 insertions, 6 deletions
diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl
index 6fc5fcd83f..8c8f6dd4be 100644
--- a/src/rabbit_quorum_queue.erl
+++ b/src/rabbit_quorum_queue.erl
@@ -32,7 +32,7 @@
-export([rpc_delete_metrics/1]).
-export([format/1]).
-export([open_files/1]).
--export([add_member/3]).
+-export([add_member/4]).
-export([delete_member/3]).
-export([requeue/3]).
-export([policy_changed/2]).
@@ -69,6 +69,7 @@
-define(RPC_TIMEOUT, 1000).
-define(TICK_TIMEOUT, 5000). %% the ra server tick time
-define(DELETE_TIMEOUT, 5000).
+-define(ADD_MEMBER_TIMEOUT, 5000).
%%----------------------------------------------------------------------------
@@ -699,7 +700,7 @@ get_sys_status(Proc) ->
end.
-add_member(VHost, Name, Node) ->
+add_member(VHost, Name, Node, Timeout) ->
QName = #resource{virtual_host = VHost, name = Name, kind = queue},
case rabbit_amqqueue:lookup(QName) of
{ok, Q} when ?amqqueue_is_classic(Q) ->
@@ -715,14 +716,14 @@ add_member(VHost, Name, Node) ->
%% idempotent by design
ok;
false ->
- add_member(Q, Node)
+ add_member(Q, Node, Timeout)
end
end;
{error, not_found} = E ->
E
end.
-add_member(Q, Node) when ?amqqueue_is_quorum(Q) ->
+add_member(Q, Node, Timeout) when ?amqqueue_is_quorum(Q) ->
{RaName, _} = ServerRef = amqqueue:get_pid(Q),
QName = amqqueue:get_name(Q),
QNodes = amqqueue:get_quorum_nodes(Q),
@@ -731,7 +732,7 @@ add_member(Q, Node) when ?amqqueue_is_quorum(Q) ->
case ra:start_server(RaName, ServerId, ra_machine(Q),
[{RaName, N} || N <- QNodes]) of
ok ->
- case ra:add_member(ServerRef, ServerId) of
+ case ra:add_member(ServerRef, ServerId, Timeout) of
{ok, _, Leader} ->
Fun = fun(Q1) ->
Q2 = amqqueue:set_quorum_nodes(
@@ -743,6 +744,8 @@ add_member(Q, Node) when ?amqqueue_is_quorum(Q) ->
fun() -> rabbit_amqqueue:update(QName, Fun) end),
ok;
{timeout, _} ->
+ _ = ra:force_delete_server(ServerId),
+ _ = ra:remove_member(ServerRef, ServerId),
{error, timeout};
E ->
_ = ra:force_delete_server(ServerId),
@@ -828,7 +831,7 @@ grow(Node, VhostSpec, QueueSpec, Strategy) ->
QName = amqqueue:get_name(Q),
rabbit_log:info("~s: adding a new member (replica) on node ~w",
[rabbit_misc:rs(QName), Node]),
- case add_member(Q, Node) of
+ case add_member(Q, Node, ?ADD_MEMBER_TIMEOUT) of
ok ->
{QName, {ok, Size + 1}};
{error, Err} ->