summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDiana Corbacho <diana.corbacho@erlang-solutions.com>2016-06-22 15:02:31 +0100
committerDiana Corbacho <diana.corbacho@erlang-solutions.com>2016-06-28 11:47:21 +0100
commit6fed347626931ef12a04aba0ac0e5d47d59bc24f (patch)
tree7c36a34120eb8e71b8c2e74cc6f819730ab8b076 /src
parent16851f9ea5f727e6bf219def5816e394a8b7819c (diff)
downloadrabbitmq-server-git-6fed347626931ef12a04aba0ac0e5d47d59bc24f.tar.gz
Ensure old incarnations of slaves are stopped before new ones start
* Solves a race condition between the master asking the slaves to stop and the restart of the queues.
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_mirror_queue_master.erl34
-rw-r--r--src/rabbit_mirror_queue_slave.erl26
-rw-r--r--src/rabbit_upgrade_functions.erl19
3 files changed, 66 insertions, 13 deletions
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index 9674a4ef2c..b5c5ffd418 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -214,16 +214,24 @@ stop_all_slaves(Reason, #state{name = QName, gm = GM, wait_timeout = WT}) ->
%% monitor them but they would not have received the GM
%% message. So only wait for slaves which are still
%% not-partitioned.
- [receive
- {'DOWN', MRef, process, _Pid, _Info} ->
- ok
- after WT ->
- rabbit_mirror_queue_misc:log_warning(
- QName, "Missing 'DOWN' message from ~p in node ~p~n",
- [Pid, node(Pid)]),
- ok
- end
- || {Pid, MRef} <- PidsMRefs, rabbit_mnesia:on_running_node(Pid)],
+ PendingSlavePids =
+ lists:foldl(
+ fun({Pid, MRef}, Acc) ->
+ case rabbit_mnesia:on_running_node(Pid) of
+ true ->
+ receive
+ {'DOWN', MRef, process, _Pid, _Info} ->
+ Acc
+ after WT ->
+ rabbit_mirror_queue_misc:log_warning(
+ QName, "Missing 'DOWN' message from ~p in"
+ " node ~p~n", [Pid, node(Pid)]),
+ [Pid | Acc]
+ end;
+ false ->
+ Acc
+ end
+ end, [], PidsMRefs),
%% Normally when we remove a slave another slave or master will
%% notice and update Mnesia. But we just removed them all, and
%% have stopped listening ourselves. So manually clean up.
@@ -231,7 +239,11 @@ stop_all_slaves(Reason, #state{name = QName, gm = GM, wait_timeout = WT}) ->
fun () ->
[Q] = mnesia:read({rabbit_queue, QName}),
rabbit_mirror_queue_misc:store_updated_slaves(
- Q #amqqueue { gm_pids = [], slave_pids = [] })
+ Q #amqqueue { gm_pids = [], slave_pids = [],
+ %% Restarted slaves on running nodes can
+ %% ensure old incarnations are stopped using
+ %% the pending slave pids.
+ slave_pids_pending_shutdown = PendingSlavePids})
end),
ok = gm:forget_group(QName).
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index c04c82f45e..d8aa3a7aad 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -163,9 +163,11 @@ handle_go(Q = #amqqueue{name = QName}) ->
init_it(Self, GM, Node, QName) ->
case mnesia:read({rabbit_queue, QName}) of
- [Q = #amqqueue { pid = QPid, slave_pids = SPids, gm_pids = GMPids }] ->
+ [Q = #amqqueue { pid = QPid, slave_pids = SPids, gm_pids = GMPids,
+ slave_pids_pending_shutdown = PSPids}] ->
case [Pid || Pid <- [QPid | SPids], node(Pid) =:= Node] of
- [] -> add_slave(Q, Self, GM),
+ [] -> stop_pending_slaves(QName, PSPids),
+ add_slave(Q, Self, GM),
{new, QPid, GMPids};
[QPid] -> case rabbit_mnesia:is_process_alive(QPid) of
true -> duplicate_live_master;
@@ -186,6 +188,26 @@ init_it(Self, GM, Node, QName) ->
master_in_recovery
end.
+%% Pending slaves have been asked to stop by the master, but despite the node
+%% being up these did not answer on the expected timeout. Stop local slaves now.
+stop_pending_slaves(QName, Pids) ->
+ [begin
+ rabbit_mirror_queue_misc:log_warning(
+ QName, "Detected stale HA slave, stopping it: ~p~n", [Pid]),
+ case erlang:process_info(Pid, dictionary) of
+ undefined -> ok;
+ {dictionary, Dict} ->
+ case proplists:get_value('$ancestors', Dict) of
+ [Sup, rabbit_amqqueue_sup_sup | _] ->
+ exit(Sup, kill),
+ exit(Pid, kill);
+ _ ->
+ ok
+ end
+ end
+ end || Pid <- Pids, node(Pid) =:= node(),
+ true =:= erlang:is_process_alive(Pid)].
+
%% Add to the end, so they are in descending order of age, see
%% rabbit_mirror_queue_misc:promote_slave/1
add_slave(Q = #amqqueue { slave_pids = SPids, gm_pids = GMPids }, New, GM) ->
diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl
index 0f55b9e4a9..c6e739a487 100644
--- a/src/rabbit_upgrade_functions.erl
+++ b/src/rabbit_upgrade_functions.erl
@@ -53,6 +53,7 @@
-rabbit_upgrade({queue_state, mnesia, [down_slave_nodes]}).
-rabbit_upgrade({recoverable_slaves, mnesia, [queue_state]}).
-rabbit_upgrade({policy_version, mnesia, [recoverable_slaves]}).
+-rabbit_upgrade({slave_pids_pending_shutdown, mnesia, [policy_version]}).
-rabbit_upgrade({user_password_hashing, mnesia, [hash_passwords]}).
%% -------------------------------------------------------------------
@@ -466,6 +467,24 @@ policy_version(Table) ->
sync_slave_pids, recoverable_slaves, policy, gm_pids, decorators, state,
policy_version]).
+slave_pids_pending_shutdown() ->
+ ok = slave_pids_pending_shutdown(rabbit_queue),
+ ok = slave_pids_pending_shutdown(rabbit_durable_queue).
+
+slave_pids_pending_shutdown(Table) ->
+ transform(
+ Table,
+ fun ({amqqueue, Name, Durable, AutoDelete, ExclusiveOwner, Arguments,
+ Pid, SlavePids, SyncSlavePids, DSN, Policy, GmPids, Decorators,
+ State, PolicyVersion}) ->
+ {amqqueue, Name, Durable, AutoDelete, ExclusiveOwner, Arguments,
+ Pid, SlavePids, SyncSlavePids, DSN, Policy, GmPids, Decorators,
+ State, PolicyVersion, []}
+ end,
+ [name, durable, auto_delete, exclusive_owner, arguments, pid, slave_pids,
+ sync_slave_pids, recoverable_slaves, policy, gm_pids, decorators, state,
+ policy_version, slave_pids_pending_shutdown]).
+
%% Prior to 3.6.0, passwords were hashed using MD5, this populates
%% existing records with said default. Users created with 3.6.0+ will
%% have internal_user.hashing_algorithm populated by the internal