diff options
| author | Diana Corbacho <diana.corbacho@erlang-solutions.com> | 2016-06-22 15:02:31 +0100 |
|---|---|---|
| committer | Diana Corbacho <diana.corbacho@erlang-solutions.com> | 2016-06-28 11:47:21 +0100 |
| commit | 6fed347626931ef12a04aba0ac0e5d47d59bc24f (patch) | |
| tree | 7c36a34120eb8e71b8c2e74cc6f819730ab8b076 /src | |
| parent | 16851f9ea5f727e6bf219def5816e394a8b7819c (diff) | |
| download | rabbitmq-server-git-6fed347626931ef12a04aba0ac0e5d47d59bc24f.tar.gz | |
Ensure old incarnations of slaves are stopped before new ones start
* Solves a race condition between the master asking the slaves to stop
and the restart of the queues.
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_mirror_queue_master.erl | 34 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 26 | ||||
| -rw-r--r-- | src/rabbit_upgrade_functions.erl | 19 |
3 files changed, 66 insertions, 13 deletions
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index 9674a4ef2c..b5c5ffd418 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -214,16 +214,24 @@ stop_all_slaves(Reason, #state{name = QName, gm = GM, wait_timeout = WT}) -> %% monitor them but they would not have received the GM %% message. So only wait for slaves which are still %% not-partitioned. - [receive - {'DOWN', MRef, process, _Pid, _Info} -> - ok - after WT -> - rabbit_mirror_queue_misc:log_warning( - QName, "Missing 'DOWN' message from ~p in node ~p~n", - [Pid, node(Pid)]), - ok - end - || {Pid, MRef} <- PidsMRefs, rabbit_mnesia:on_running_node(Pid)], + PendingSlavePids = + lists:foldl( + fun({Pid, MRef}, Acc) -> + case rabbit_mnesia:on_running_node(Pid) of + true -> + receive + {'DOWN', MRef, process, _Pid, _Info} -> + Acc + after WT -> + rabbit_mirror_queue_misc:log_warning( + QName, "Missing 'DOWN' message from ~p in" + " node ~p~n", [Pid, node(Pid)]), + [Pid | Acc] + end; + false -> + Acc + end + end, [], PidsMRefs), %% Normally when we remove a slave another slave or master will %% notice and update Mnesia. But we just removed them all, and %% have stopped listening ourselves. So manually clean up. @@ -231,7 +239,11 @@ stop_all_slaves(Reason, #state{name = QName, gm = GM, wait_timeout = WT}) -> fun () -> [Q] = mnesia:read({rabbit_queue, QName}), rabbit_mirror_queue_misc:store_updated_slaves( - Q #amqqueue { gm_pids = [], slave_pids = [] }) + Q #amqqueue { gm_pids = [], slave_pids = [], + %% Restarted slaves on running nodes can + %% ensure old incarnations are stopped using + %% the pending slave pids. + slave_pids_pending_shutdown = PendingSlavePids}) end), ok = gm:forget_group(QName). 
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index c04c82f45e..d8aa3a7aad 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -163,9 +163,11 @@ handle_go(Q = #amqqueue{name = QName}) -> init_it(Self, GM, Node, QName) -> case mnesia:read({rabbit_queue, QName}) of - [Q = #amqqueue { pid = QPid, slave_pids = SPids, gm_pids = GMPids }] -> + [Q = #amqqueue { pid = QPid, slave_pids = SPids, gm_pids = GMPids, + slave_pids_pending_shutdown = PSPids}] -> case [Pid || Pid <- [QPid | SPids], node(Pid) =:= Node] of - [] -> add_slave(Q, Self, GM), + [] -> stop_pending_slaves(QName, PSPids), + add_slave(Q, Self, GM), {new, QPid, GMPids}; [QPid] -> case rabbit_mnesia:is_process_alive(QPid) of true -> duplicate_live_master; @@ -186,6 +188,26 @@ init_it(Self, GM, Node, QName) -> master_in_recovery end. +%% Pending slaves have been asked to stop by the master, but despite the node +%% being up these did not answer on the expected timeout. Stop local slaves now. +stop_pending_slaves(QName, Pids) -> + [begin + rabbit_mirror_queue_misc:log_warning( + QName, "Detected stale HA slave, stopping it: ~p~n", [Pid]), + case erlang:process_info(Pid, dictionary) of + undefined -> ok; + {dictionary, Dict} -> + case proplists:get_value('$ancestors', Dict) of + [Sup, rabbit_amqqueue_sup_sup | _] -> + exit(Sup, kill), + exit(Pid, kill); + _ -> + ok + end + end + end || Pid <- Pids, node(Pid) =:= node(), + true =:= erlang:is_process_alive(Pid)]. + %% Add to the end, so they are in descending order of age, see %% rabbit_mirror_queue_misc:promote_slave/1 add_slave(Q = #amqqueue { slave_pids = SPids, gm_pids = GMPids }, New, GM) -> diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl index 0f55b9e4a9..c6e739a487 100644 --- a/src/rabbit_upgrade_functions.erl +++ b/src/rabbit_upgrade_functions.erl @@ -53,6 +53,7 @@ -rabbit_upgrade({queue_state, mnesia, [down_slave_nodes]}). 
-rabbit_upgrade({recoverable_slaves, mnesia, [queue_state]}). -rabbit_upgrade({policy_version, mnesia, [recoverable_slaves]}). +-rabbit_upgrade({slave_pids_pending_shutdown, mnesia, [policy_version]}). -rabbit_upgrade({user_password_hashing, mnesia, [hash_passwords]}). %% ------------------------------------------------------------------- @@ -466,6 +467,24 @@ policy_version(Table) -> sync_slave_pids, recoverable_slaves, policy, gm_pids, decorators, state, policy_version]). +slave_pids_pending_shutdown() -> + ok = slave_pids_pending_shutdown(rabbit_queue), + ok = slave_pids_pending_shutdown(rabbit_durable_queue). + +slave_pids_pending_shutdown(Table) -> + transform( + Table, + fun ({amqqueue, Name, Durable, AutoDelete, ExclusiveOwner, Arguments, + Pid, SlavePids, SyncSlavePids, DSN, Policy, GmPids, Decorators, + State, PolicyVersion}) -> + {amqqueue, Name, Durable, AutoDelete, ExclusiveOwner, Arguments, + Pid, SlavePids, SyncSlavePids, DSN, Policy, GmPids, Decorators, + State, PolicyVersion, []} + end, + [name, durable, auto_delete, exclusive_owner, arguments, pid, slave_pids, + sync_slave_pids, recoverable_slaves, policy, gm_pids, decorators, state, + policy_version, slave_pids_pending_shutdown]). + %% Prior to 3.6.0, passwords were hashed using MD5, this populates %% existing records with said default. Users created with 3.6.0+ will %% have internal_user.hashing_algorithm populated by the internal |
