diff options
| author | Michael Klishin <michael@novemberain.com> | 2016-06-28 16:19:29 +0300 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2016-06-28 16:19:29 +0300 |
| commit | b655d4cef1a835aed5f8b7be6095a6ce4bf88529 (patch) | |
| tree | c8f7686184c79bfa23585d7dd24b05643166d01a | |
| parent | 16851f9ea5f727e6bf219def5816e394a8b7819c (diff) | |
| parent | e0051b72b1e6926ab8e023dec86d9b15833cc064 (diff) | |
| download | rabbitmq-server-git-b655d4cef1a835aed5f8b7be6095a6ce4bf88529.tar.gz | |
Merge pull request #865 from rabbitmq/rabbitmq-server-863
Stop pending slaves on restart
| -rw-r--r-- | src/rabbit_mirror_queue_master.erl | 34 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 26 | ||||
| -rw-r--r-- | src/rabbit_upgrade_functions.erl | 19 | ||||
| -rw-r--r-- | test/priority_queue_SUITE.erl | 33 |
4 files changed, 98 insertions, 14 deletions
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index 9674a4ef2c..b5c5ffd418 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -214,16 +214,24 @@ stop_all_slaves(Reason, #state{name = QName, gm = GM, wait_timeout = WT}) ->
     %% monitor them but they would not have received the GM
     %% message. So only wait for slaves which are still
     %% not-partitioned.
-    [receive
-         {'DOWN', MRef, process, _Pid, _Info} ->
-             ok
-     after WT ->
-             rabbit_mirror_queue_misc:log_warning(
-               QName, "Missing 'DOWN' message from ~p in node ~p~n",
-               [Pid, node(Pid)]),
-             ok
-     end
-     || {Pid, MRef} <- PidsMRefs, rabbit_mnesia:on_running_node(Pid)],
+    PendingSlavePids =
+        lists:foldl(
+          fun({Pid, MRef}, Acc) ->
+                  case rabbit_mnesia:on_running_node(Pid) of
+                      true ->
+                          receive
+                              {'DOWN', MRef, process, _Pid, _Info} ->
+                                  Acc
+                          after WT ->
+                                  rabbit_mirror_queue_misc:log_warning(
+                                    QName, "Missing 'DOWN' message from ~p in"
+                                    " node ~p~n", [Pid, node(Pid)]),
+                                  [Pid | Acc]
+                          end;
+                      false ->
+                          Acc
+                  end
+          end, [], PidsMRefs),
     %% Normally when we remove a slave another slave or master will
     %% notice and update Mnesia. But we just removed them all, and
     %% have stopped listening ourselves. So manually clean up.
@@ -231,7 +239,11 @@ stop_all_slaves(Reason, #state{name = QName, gm = GM, wait_timeout = WT}) ->
       fun () ->
              [Q] = mnesia:read({rabbit_queue, QName}),
              rabbit_mirror_queue_misc:store_updated_slaves(
-               Q #amqqueue { gm_pids = [], slave_pids = [] })
+               Q #amqqueue { gm_pids = [], slave_pids = [],
+                             %% Restarted slaves on running nodes can
+                             %% ensure old incarnations are stopped using
+                             %% the pending slave pids.
+                             slave_pids_pending_shutdown = PendingSlavePids})
      end),
    ok = gm:forget_group(QName).
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index c04c82f45e..d8aa3a7aad 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -163,9 +163,11 @@ handle_go(Q = #amqqueue{name = QName}) ->
 init_it(Self, GM, Node, QName) ->
     case mnesia:read({rabbit_queue, QName}) of
-        [Q = #amqqueue { pid = QPid, slave_pids = SPids, gm_pids = GMPids }] ->
+        [Q = #amqqueue { pid = QPid, slave_pids = SPids, gm_pids = GMPids,
+                         slave_pids_pending_shutdown = PSPids}] ->
             case [Pid || Pid <- [QPid | SPids], node(Pid) =:= Node] of
-                []     -> add_slave(Q, Self, GM),
+                []     -> stop_pending_slaves(QName, PSPids),
+                          add_slave(Q, Self, GM),
                           {new, QPid, GMPids};
                 [QPid] -> case rabbit_mnesia:is_process_alive(QPid) of
                               true  -> duplicate_live_master;
@@ -186,6 +188,26 @@ init_it(Self, GM, Node, QName) ->
             master_in_recovery
     end.
 
+%% Pending slaves have been asked to stop by the master, but despite the node
+%% being up these did not answer on the expected timeout. Stop local slaves now.
+stop_pending_slaves(QName, Pids) ->
+    [begin
+         rabbit_mirror_queue_misc:log_warning(
+           QName, "Detected stale HA slave, stopping it: ~p~n", [Pid]),
+         case erlang:process_info(Pid, dictionary) of
+             undefined -> ok;
+             {dictionary, Dict} ->
+                 case proplists:get_value('$ancestors', Dict) of
+                     [Sup, rabbit_amqqueue_sup_sup | _] ->
+                         exit(Sup, kill),
+                         exit(Pid, kill);
+                     _ ->
+                         ok
+                 end
+         end
+     end || Pid <- Pids, node(Pid) =:= node(),
+            true =:= erlang:is_process_alive(Pid)].
+
 %% Add to the end, so they are in descending order of age, see
 %% rabbit_mirror_queue_misc:promote_slave/1
 add_slave(Q = #amqqueue { slave_pids = SPids, gm_pids = GMPids }, New, GM) ->
diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl
index 0f55b9e4a9..c6e739a487 100644
--- a/src/rabbit_upgrade_functions.erl
+++ b/src/rabbit_upgrade_functions.erl
@@ -53,6 +53,7 @@
 -rabbit_upgrade({queue_state, mnesia, [down_slave_nodes]}).
 -rabbit_upgrade({recoverable_slaves, mnesia, [queue_state]}).
 -rabbit_upgrade({policy_version, mnesia, [recoverable_slaves]}).
+-rabbit_upgrade({slave_pids_pending_shutdown, mnesia, [policy_version]}).
 -rabbit_upgrade({user_password_hashing, mnesia, [hash_passwords]}).
 
 %% -------------------------------------------------------------------
@@ -466,6 +467,24 @@ policy_version(Table) ->
      sync_slave_pids, recoverable_slaves, policy, gm_pids, decorators, state,
      policy_version]).
 
+slave_pids_pending_shutdown() ->
+    ok = slave_pids_pending_shutdown(rabbit_queue),
+    ok = slave_pids_pending_shutdown(rabbit_durable_queue).
+
+slave_pids_pending_shutdown(Table) ->
+    transform(
+      Table,
+      fun ({amqqueue, Name, Durable, AutoDelete, ExclusiveOwner, Arguments,
+            Pid, SlavePids, SyncSlavePids, DSN, Policy, GmPids, Decorators,
+            State, PolicyVersion}) ->
+              {amqqueue, Name, Durable, AutoDelete, ExclusiveOwner, Arguments,
+               Pid, SlavePids, SyncSlavePids, DSN, Policy, GmPids, Decorators,
+               State, PolicyVersion, []}
+      end,
+      [name, durable, auto_delete, exclusive_owner, arguments, pid, slave_pids,
+       sync_slave_pids, recoverable_slaves, policy, gm_pids, decorators, state,
+       policy_version, slave_pids_pending_shutdown]).
+
 %% Prior to 3.6.0, passwords were hashed using MD5, this populates
 %% existing records with said default. Users created with 3.6.0+ will
 %% have internal_user.hashing_algorithm populated by the internal
diff --git a/test/priority_queue_SUITE.erl b/test/priority_queue_SUITE.erl
index 46fafd89f7..39ed3af69a 100644
--- a/test/priority_queue_SUITE.erl
+++ b/test/priority_queue_SUITE.erl
@@ -49,7 +49,8 @@ groups() ->
       {cluster_size_3, [], [
                             mirror_queue_auto_ack,
                             mirror_fast_reset_policy,
-                            mirror_reset_policy
+                            mirror_reset_policy,
+                            mirror_stop_pending_slaves
                            ]}
     ].
 
@@ -523,6 +524,36 @@ mirror_reset_policy(Config, Wait) ->
     rabbit_ct_client_helpers:close_connection(Conn),
     passed.
+mirror_stop_pending_slaves(Config) ->
+    A = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
+    B = rabbit_ct_broker_helpers:get_node_config(Config, 1, nodename),
+    C = rabbit_ct_broker_helpers:get_node_config(Config, 2, nodename),
+
+    [ok = rabbit_ct_broker_helpers:rpc(
+            Config, Nodename, application, set_env, [rabbit, slave_wait_timeout, 0]) || Nodename <- [A, B, C]],
+
+    {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, A),
+    Q = <<"mirror_stop_pending_slaves-queue">>,
+    declare(Ch, Q, 5),
+    publish_many(Ch, Q, 20000),
+
+    [begin
+         rabbit_ct_broker_helpers:set_ha_policy(
+           Config, A, <<"^mirror_stop_pending_slaves-queue$">>, <<"all">>,
+           [{<<"ha-sync-mode">>, <<"automatic">>}]),
+         wait_for_sync(Config, A, rabbit_misc:r(<<"/">>, queue, Q), 2),
+         rabbit_ct_broker_helpers:clear_policy(
+           Config, A, <<"^mirror_stop_pending_slaves-queue$">>)
+     end || _ <- lists:seq(1, 15)],
+
+    delete(Ch, Q),
+
+    [ok = rabbit_ct_broker_helpers:rpc(
+            Config, Nodename, application, set_env, [rabbit, slave_wait_timeout, 15000]) || Nodename <- [A, B, C]],
+
+    rabbit_ct_client_helpers:close_connection(Conn),
+    passed.
+
 %%----------------------------------------------------------------------------
 
 declare(Ch, Q, Args) when is_list(Args) ->
