summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDiana Corbacho <diana@rabbitmq.com>2019-07-03 13:06:27 +0100
committerDiana Corbacho <diana@rabbitmq.com>2019-07-03 15:00:49 +0100
commit8eaee86d9bef26d2ae4f2566d445293748cd2421 (patch)
treee2eaf0546529d60a3e2a9db5afb10afa4b39d27c
parent580896422de202857ab039e64c0cc7fcd5916c9c (diff)
downloadrabbitmq-server-git-8eaee86d9bef26d2ae4f2566d445293748cd2421.tar.gz
Provide timeout to add_member command
[#166670827]
-rw-r--r--src/rabbit_quorum_queue.erl15
-rw-r--r--test/quorum_queue_SUITE.erl12
2 files changed, 15 insertions, 12 deletions
diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl
index 6fc5fcd83f..8c8f6dd4be 100644
--- a/src/rabbit_quorum_queue.erl
+++ b/src/rabbit_quorum_queue.erl
@@ -32,7 +32,7 @@
-export([rpc_delete_metrics/1]).
-export([format/1]).
-export([open_files/1]).
--export([add_member/3]).
+-export([add_member/4]).
-export([delete_member/3]).
-export([requeue/3]).
-export([policy_changed/2]).
@@ -69,6 +69,7 @@
-define(RPC_TIMEOUT, 1000).
-define(TICK_TIMEOUT, 5000). %% the ra server tick time
-define(DELETE_TIMEOUT, 5000).
+-define(ADD_MEMBER_TIMEOUT, 5000).
%%----------------------------------------------------------------------------
@@ -699,7 +700,7 @@ get_sys_status(Proc) ->
end.
-add_member(VHost, Name, Node) ->
+add_member(VHost, Name, Node, Timeout) ->
QName = #resource{virtual_host = VHost, name = Name, kind = queue},
case rabbit_amqqueue:lookup(QName) of
{ok, Q} when ?amqqueue_is_classic(Q) ->
@@ -715,14 +716,14 @@ add_member(VHost, Name, Node) ->
%% idempotent by design
ok;
false ->
- add_member(Q, Node)
+ add_member(Q, Node, Timeout)
end
end;
{error, not_found} = E ->
E
end.
-add_member(Q, Node) when ?amqqueue_is_quorum(Q) ->
+add_member(Q, Node, Timeout) when ?amqqueue_is_quorum(Q) ->
{RaName, _} = ServerRef = amqqueue:get_pid(Q),
QName = amqqueue:get_name(Q),
QNodes = amqqueue:get_quorum_nodes(Q),
@@ -731,7 +732,7 @@ add_member(Q, Node) when ?amqqueue_is_quorum(Q) ->
case ra:start_server(RaName, ServerId, ra_machine(Q),
[{RaName, N} || N <- QNodes]) of
ok ->
- case ra:add_member(ServerRef, ServerId) of
+ case ra:add_member(ServerRef, ServerId, Timeout) of
{ok, _, Leader} ->
Fun = fun(Q1) ->
Q2 = amqqueue:set_quorum_nodes(
@@ -743,6 +744,8 @@ add_member(Q, Node) when ?amqqueue_is_quorum(Q) ->
fun() -> rabbit_amqqueue:update(QName, Fun) end),
ok;
{timeout, _} ->
+ _ = ra:force_delete_server(ServerId),
+ _ = ra:remove_member(ServerRef, ServerId),
{error, timeout};
E ->
_ = ra:force_delete_server(ServerId),
@@ -828,7 +831,7 @@ grow(Node, VhostSpec, QueueSpec, Strategy) ->
QName = amqqueue:get_name(Q),
rabbit_log:info("~s: adding a new member (replica) on node ~w",
[rabbit_misc:rs(QName), Node]),
- case add_member(Q, Node) of
+ case add_member(Q, Node, ?ADD_MEMBER_TIMEOUT) of
ok ->
{QName, {ok, Size + 1}};
{error, Err} ->
diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl
index 1d9789fe89..cc0e7aa75a 100644
--- a/test/quorum_queue_SUITE.erl
+++ b/test/quorum_queue_SUITE.erl
@@ -1222,7 +1222,7 @@ add_member_not_running(Config) ->
declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
?assertEqual({error, node_not_running},
rpc:call(Server, rabbit_quorum_queue, add_member,
- [<<"/">>, QQ, 'rabbit@burrow'])).
+ [<<"/">>, QQ, 'rabbit@burrow', 5000])).
add_member_classic(Config) ->
[Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
@@ -1231,7 +1231,7 @@ add_member_classic(Config) ->
?assertEqual({'queue.declare_ok', CQ, 0, 0}, declare(Ch, CQ, [])),
?assertEqual({error, classic_queue_not_supported},
rpc:call(Server, rabbit_quorum_queue, add_member,
- [<<"/">>, CQ, Server])).
+ [<<"/">>, CQ, Server, 5000])).
add_member_already_a_member(Config) ->
[Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
@@ -1242,14 +1242,14 @@ add_member_already_a_member(Config) ->
%% idempotent by design
?assertEqual(ok,
rpc:call(Server, rabbit_quorum_queue, add_member,
- [<<"/">>, QQ, Server])).
+ [<<"/">>, QQ, Server, 5000])).
add_member_not_found(Config) ->
[Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
QQ = ?config(queue_name, Config),
?assertEqual({error, not_found},
rpc:call(Server, rabbit_quorum_queue, add_member,
- [<<"/">>, QQ, Server])).
+ [<<"/">>, QQ, Server, 5000])).
add_member(Config) ->
[Server0, Server1] = Servers0 =
@@ -1260,12 +1260,12 @@ add_member(Config) ->
declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
?assertEqual({error, node_not_running},
rpc:call(Server0, rabbit_quorum_queue, add_member,
- [<<"/">>, QQ, Server1])),
+ [<<"/">>, QQ, Server1, 5000])),
ok = rabbit_control_helper:command(stop_app, Server1),
ok = rabbit_control_helper:command(join_cluster, Server1, [atom_to_list(Server0)], []),
rabbit_control_helper:command(start_app, Server1),
?assertEqual(ok, rpc:call(Server0, rabbit_quorum_queue, add_member,
- [<<"/">>, QQ, Server1])),
+ [<<"/">>, QQ, Server1, 5000])),
Info = rpc:call(Server0, rabbit_quorum_queue, infos,
[rabbit_misc:r(<<"/">>, queue, QQ)]),
Servers = lists:sort(Servers0),