diff options
| author | Diana Corbacho <diana@rabbitmq.com> | 2019-07-03 13:06:27 +0100 |
|---|---|---|
| committer | Diana Corbacho <diana@rabbitmq.com> | 2019-07-03 15:00:49 +0100 |
| commit | 8eaee86d9bef26d2ae4f2566d445293748cd2421 (patch) | |
| tree | e2eaf0546529d60a3e2a9db5afb10afa4b39d27c | |
| parent | 580896422de202857ab039e64c0cc7fcd5916c9c (diff) | |
| download | rabbitmq-server-git-8eaee86d9bef26d2ae4f2566d445293748cd2421.tar.gz | |
Provide timeout to add_member command
[#166670827]
| -rw-r--r-- | src/rabbit_quorum_queue.erl | 15 | ||||
| -rw-r--r-- | test/quorum_queue_SUITE.erl | 12 |
2 files changed, 15 insertions, 12 deletions
diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl index 6fc5fcd83f..8c8f6dd4be 100644 --- a/src/rabbit_quorum_queue.erl +++ b/src/rabbit_quorum_queue.erl @@ -32,7 +32,7 @@ -export([rpc_delete_metrics/1]). -export([format/1]). -export([open_files/1]). --export([add_member/3]). +-export([add_member/4]). -export([delete_member/3]). -export([requeue/3]). -export([policy_changed/2]). @@ -69,6 +69,7 @@ -define(RPC_TIMEOUT, 1000). -define(TICK_TIMEOUT, 5000). %% the ra server tick time -define(DELETE_TIMEOUT, 5000). +-define(ADD_MEMBER_TIMEOUT, 5000). %%---------------------------------------------------------------------------- @@ -699,7 +700,7 @@ get_sys_status(Proc) -> end. -add_member(VHost, Name, Node) -> +add_member(VHost, Name, Node, Timeout) -> QName = #resource{virtual_host = VHost, name = Name, kind = queue}, case rabbit_amqqueue:lookup(QName) of {ok, Q} when ?amqqueue_is_classic(Q) -> @@ -715,14 +716,14 @@ add_member(VHost, Name, Node) -> %% idempotent by design ok; false -> - add_member(Q, Node) + add_member(Q, Node, Timeout) end end; {error, not_found} = E -> E end. -add_member(Q, Node) when ?amqqueue_is_quorum(Q) -> +add_member(Q, Node, Timeout) when ?amqqueue_is_quorum(Q) -> {RaName, _} = ServerRef = amqqueue:get_pid(Q), QName = amqqueue:get_name(Q), QNodes = amqqueue:get_quorum_nodes(Q), @@ -731,7 +732,7 @@ add_member(Q, Node) when ?amqqueue_is_quorum(Q) -> case ra:start_server(RaName, ServerId, ra_machine(Q), [{RaName, N} || N <- QNodes]) of ok -> - case ra:add_member(ServerRef, ServerId) of + case ra:add_member(ServerRef, ServerId, Timeout) of {ok, _, Leader} -> Fun = fun(Q1) -> Q2 = amqqueue:set_quorum_nodes( @@ -743,6 +744,8 @@ add_member(Q, Node) when ?amqqueue_is_quorum(Q) -> fun() -> rabbit_amqqueue:update(QName, Fun) end), ok; {timeout, _} -> + _ = ra:force_delete_server(ServerId), + _ = ra:remove_member(ServerRef, ServerId), {error, timeout}; E -> _ = ra:force_delete_server(ServerId), @@ -828,7 +831,7 @@ grow(Node, VhostSpec, QueueSpec, Strategy) -> QName = amqqueue:get_name(Q), rabbit_log:info("~s: adding a new member (replica) on node ~w", [rabbit_misc:rs(QName), Node]), - case add_member(Q, Node) of + case add_member(Q, Node, ?ADD_MEMBER_TIMEOUT) of ok -> {QName, {ok, Size + 1}}; {error, Err} -> diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl index 1d9789fe89..cc0e7aa75a 100644 --- a/test/quorum_queue_SUITE.erl +++ b/test/quorum_queue_SUITE.erl @@ -1222,7 +1222,7 @@ add_member_not_running(Config) -> declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), ?assertEqual({error, node_not_running}, rpc:call(Server, rabbit_quorum_queue, add_member, - [<<"/">>, QQ, 'rabbit@burrow'])). + [<<"/">>, QQ, 'rabbit@burrow', 5000])). add_member_classic(Config) -> [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), @@ -1231,7 +1231,7 @@ add_member_classic(Config) -> ?assertEqual({'queue.declare_ok', CQ, 0, 0}, declare(Ch, CQ, [])), ?assertEqual({error, classic_queue_not_supported}, rpc:call(Server, rabbit_quorum_queue, add_member, - [<<"/">>, CQ, Server])). + [<<"/">>, CQ, Server, 5000])). add_member_already_a_member(Config) -> [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), @@ -1242,14 +1242,14 @@ add_member_already_a_member(Config) -> %% idempotent by design ?assertEqual(ok, rpc:call(Server, rabbit_quorum_queue, add_member, - [<<"/">>, QQ, Server])). + [<<"/">>, QQ, Server, 5000])). add_member_not_found(Config) -> [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), QQ = ?config(queue_name, Config), ?assertEqual({error, not_found}, rpc:call(Server, rabbit_quorum_queue, add_member, - [<<"/">>, QQ, Server])). + [<<"/">>, QQ, Server, 5000])). add_member(Config) -> [Server0, Server1] = Servers0 = @@ -1260,12 +1260,12 @@ add_member(Config) -> declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), ?assertEqual({error, node_not_running}, rpc:call(Server0, rabbit_quorum_queue, add_member, - [<<"/">>, QQ, Server1])), + [<<"/">>, QQ, Server1, 5000])), ok = rabbit_control_helper:command(stop_app, Server1), ok = rabbit_control_helper:command(join_cluster, Server1, [atom_to_list(Server0)], []), rabbit_control_helper:command(start_app, Server1), ?assertEqual(ok, rpc:call(Server0, rabbit_quorum_queue, add_member, - [<<"/">>, QQ, Server1])), + [<<"/">>, QQ, Server1, 5000])), Info = rpc:call(Server0, rabbit_quorum_queue, infos, [rabbit_misc:r(<<"/">>, queue, QQ)]), Servers = lists:sort(Servers0), |
