summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
authorMichael Klishin <michael@novemberain.com>2016-06-28 16:19:29 +0300
committerGitHub <noreply@github.com>2016-06-28 16:19:29 +0300
commitb655d4cef1a835aed5f8b7be6095a6ce4bf88529 (patch)
treec8f7686184c79bfa23585d7dd24b05643166d01a
parent16851f9ea5f727e6bf219def5816e394a8b7819c (diff)
parente0051b72b1e6926ab8e023dec86d9b15833cc064 (diff)
downloadrabbitmq-server-git-b655d4cef1a835aed5f8b7be6095a6ce4bf88529.tar.gz
Merge pull request #865 from rabbitmq/rabbitmq-server-863
Stop pending slaves on restart
-rw-r--r--  src/rabbit_mirror_queue_master.erl  | 34
-rw-r--r--  src/rabbit_mirror_queue_slave.erl   | 26
-rw-r--r--  src/rabbit_upgrade_functions.erl    | 19
-rw-r--r--  test/priority_queue_SUITE.erl       | 33
4 files changed, 98 insertions, 14 deletions
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index 9674a4ef2c..b5c5ffd418 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -214,16 +214,24 @@ stop_all_slaves(Reason, #state{name = QName, gm = GM, wait_timeout = WT}) ->
%% monitor them but they would not have received the GM
%% message. So only wait for slaves which are still
%% not-partitioned.
- [receive
- {'DOWN', MRef, process, _Pid, _Info} ->
- ok
- after WT ->
- rabbit_mirror_queue_misc:log_warning(
- QName, "Missing 'DOWN' message from ~p in node ~p~n",
- [Pid, node(Pid)]),
- ok
- end
- || {Pid, MRef} <- PidsMRefs, rabbit_mnesia:on_running_node(Pid)],
+ PendingSlavePids =
+ lists:foldl(
+ fun({Pid, MRef}, Acc) ->
+ case rabbit_mnesia:on_running_node(Pid) of
+ true ->
+ receive
+ {'DOWN', MRef, process, _Pid, _Info} ->
+ Acc
+ after WT ->
+ rabbit_mirror_queue_misc:log_warning(
+ QName, "Missing 'DOWN' message from ~p in"
+ " node ~p~n", [Pid, node(Pid)]),
+ [Pid | Acc]
+ end;
+ false ->
+ Acc
+ end
+ end, [], PidsMRefs),
%% Normally when we remove a slave another slave or master will
%% notice and update Mnesia. But we just removed them all, and
%% have stopped listening ourselves. So manually clean up.
@@ -231,7 +239,11 @@ stop_all_slaves(Reason, #state{name = QName, gm = GM, wait_timeout = WT}) ->
fun () ->
[Q] = mnesia:read({rabbit_queue, QName}),
rabbit_mirror_queue_misc:store_updated_slaves(
- Q #amqqueue { gm_pids = [], slave_pids = [] })
+ Q #amqqueue { gm_pids = [], slave_pids = [],
+ %% Restarted slaves on running nodes can
+ %% ensure old incarnations are stopped using
+ %% the pending slave pids.
+ slave_pids_pending_shutdown = PendingSlavePids})
end),
ok = gm:forget_group(QName).
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index c04c82f45e..d8aa3a7aad 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -163,9 +163,11 @@ handle_go(Q = #amqqueue{name = QName}) ->
init_it(Self, GM, Node, QName) ->
case mnesia:read({rabbit_queue, QName}) of
- [Q = #amqqueue { pid = QPid, slave_pids = SPids, gm_pids = GMPids }] ->
+ [Q = #amqqueue { pid = QPid, slave_pids = SPids, gm_pids = GMPids,
+ slave_pids_pending_shutdown = PSPids}] ->
case [Pid || Pid <- [QPid | SPids], node(Pid) =:= Node] of
- [] -> add_slave(Q, Self, GM),
+ [] -> stop_pending_slaves(QName, PSPids),
+ add_slave(Q, Self, GM),
{new, QPid, GMPids};
[QPid] -> case rabbit_mnesia:is_process_alive(QPid) of
true -> duplicate_live_master;
@@ -186,6 +188,26 @@ init_it(Self, GM, Node, QName) ->
master_in_recovery
end.
+%% Pending slaves have been asked to stop by the master, but despite the node
+%% being up these did not answer on the expected timeout. Stop local slaves now.
+stop_pending_slaves(QName, Pids) ->
+ [begin
+ rabbit_mirror_queue_misc:log_warning(
+ QName, "Detected stale HA slave, stopping it: ~p~n", [Pid]),
+ case erlang:process_info(Pid, dictionary) of
+ undefined -> ok;
+ {dictionary, Dict} ->
+ case proplists:get_value('$ancestors', Dict) of
+ [Sup, rabbit_amqqueue_sup_sup | _] ->
+ exit(Sup, kill),
+ exit(Pid, kill);
+ _ ->
+ ok
+ end
+ end
+ end || Pid <- Pids, node(Pid) =:= node(),
+ true =:= erlang:is_process_alive(Pid)].
+
%% Add to the end, so they are in descending order of age, see
%% rabbit_mirror_queue_misc:promote_slave/1
add_slave(Q = #amqqueue { slave_pids = SPids, gm_pids = GMPids }, New, GM) ->
diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl
index 0f55b9e4a9..c6e739a487 100644
--- a/src/rabbit_upgrade_functions.erl
+++ b/src/rabbit_upgrade_functions.erl
@@ -53,6 +53,7 @@
-rabbit_upgrade({queue_state, mnesia, [down_slave_nodes]}).
-rabbit_upgrade({recoverable_slaves, mnesia, [queue_state]}).
-rabbit_upgrade({policy_version, mnesia, [recoverable_slaves]}).
+-rabbit_upgrade({slave_pids_pending_shutdown, mnesia, [policy_version]}).
-rabbit_upgrade({user_password_hashing, mnesia, [hash_passwords]}).
%% -------------------------------------------------------------------
@@ -466,6 +467,24 @@ policy_version(Table) ->
sync_slave_pids, recoverable_slaves, policy, gm_pids, decorators, state,
policy_version]).
+slave_pids_pending_shutdown() ->
+ ok = slave_pids_pending_shutdown(rabbit_queue),
+ ok = slave_pids_pending_shutdown(rabbit_durable_queue).
+
+slave_pids_pending_shutdown(Table) ->
+ transform(
+ Table,
+ fun ({amqqueue, Name, Durable, AutoDelete, ExclusiveOwner, Arguments,
+ Pid, SlavePids, SyncSlavePids, DSN, Policy, GmPids, Decorators,
+ State, PolicyVersion}) ->
+ {amqqueue, Name, Durable, AutoDelete, ExclusiveOwner, Arguments,
+ Pid, SlavePids, SyncSlavePids, DSN, Policy, GmPids, Decorators,
+ State, PolicyVersion, []}
+ end,
+ [name, durable, auto_delete, exclusive_owner, arguments, pid, slave_pids,
+ sync_slave_pids, recoverable_slaves, policy, gm_pids, decorators, state,
+ policy_version, slave_pids_pending_shutdown]).
+
%% Prior to 3.6.0, passwords were hashed using MD5, this populates
%% existing records with said default. Users created with 3.6.0+ will
%% have internal_user.hashing_algorithm populated by the internal
diff --git a/test/priority_queue_SUITE.erl b/test/priority_queue_SUITE.erl
index 46fafd89f7..39ed3af69a 100644
--- a/test/priority_queue_SUITE.erl
+++ b/test/priority_queue_SUITE.erl
@@ -49,7 +49,8 @@ groups() ->
{cluster_size_3, [], [
mirror_queue_auto_ack,
mirror_fast_reset_policy,
- mirror_reset_policy
+ mirror_reset_policy,
+ mirror_stop_pending_slaves
]}
].
@@ -523,6 +524,36 @@ mirror_reset_policy(Config, Wait) ->
rabbit_ct_client_helpers:close_connection(Conn),
passed.
+mirror_stop_pending_slaves(Config) ->
+ A = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
+ B = rabbit_ct_broker_helpers:get_node_config(Config, 1, nodename),
+ C = rabbit_ct_broker_helpers:get_node_config(Config, 2, nodename),
+
+ [ok = rabbit_ct_broker_helpers:rpc(
+ Config, Nodename, application, set_env, [rabbit, slave_wait_timeout, 0]) || Nodename <- [A, B, C]],
+
+ {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, A),
+ Q = <<"mirror_stop_pending_slaves-queue">>,
+ declare(Ch, Q, 5),
+ publish_many(Ch, Q, 20000),
+
+ [begin
+ rabbit_ct_broker_helpers:set_ha_policy(
+ Config, A, <<"^mirror_stop_pending_slaves-queue$">>, <<"all">>,
+ [{<<"ha-sync-mode">>, <<"automatic">>}]),
+ wait_for_sync(Config, A, rabbit_misc:r(<<"/">>, queue, Q), 2),
+ rabbit_ct_broker_helpers:clear_policy(
+ Config, A, <<"^mirror_stop_pending_slaves-queue$">>)
+ end || _ <- lists:seq(1, 15)],
+
+ delete(Ch, Q),
+
+ [ok = rabbit_ct_broker_helpers:rpc(
+ Config, Nodename, application, set_env, [rabbit, slave_wait_timeout, 15000]) || Nodename <- [A, B, C]],
+
+ rabbit_ct_client_helpers:close_connection(Conn),
+ passed.
+
%%----------------------------------------------------------------------------
declare(Ch, Q, Args) when is_list(Args) ->