diff options
| author | Michael Klishin <michael@novemberain.com> | 2018-04-25 13:37:59 +0200 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2018-04-25 13:37:59 +0200 |
| commit | 8aea4b8484c7e787cd6b8cc8841609e573cfccd7 (patch) | |
| tree | 610fd291398f5c0cc956aa9f6fa1d3bf7a0b3065 | |
| parent | 6429f808ec2cce74c2ad1ee1094807cd9b983e54 (diff) | |
| parent | c3af3ff1453206a5979fcca8a4cca9336f6bbbae (diff) | |
| download | rabbitmq-server-git-8aea4b8484c7e787cd6b8cc8841609e573cfccd7.tar.gz | |
Merge pull request #1578 from rabbitmq/queue-not-promote-on-crash
Policy key to not promote unsynchronised queues.
| -rw-r--r-- | src/rabbit_mirror_queue_coordinator.erl | 7 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_master.erl | 40 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_misc.erl | 126 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 12 | ||||
| -rw-r--r-- | test/dynamic_ha_SUITE.erl | 51 |
5 files changed, 160 insertions, 76 deletions
diff --git a/src/rabbit_mirror_queue_coordinator.erl b/src/rabbit_mirror_queue_coordinator.erl index 562f0f0fcf..c6e23bf21f 100644 --- a/src/rabbit_mirror_queue_coordinator.erl +++ b/src/rabbit_mirror_queue_coordinator.erl @@ -365,7 +365,12 @@ handle_cast({gm_deaths, DeadGMPids}, DeadPids), {stop, shutdown, State}; {error, not_found} -> - {stop, normal, State} + {stop, normal, State}; + {error, {not_synced, _}} -> + rabbit_log:error("Mirror queue ~p in unexpected state." + " Promoted to master but already a master.", + [QueueName]), + error(unexpected_mirrored_state) end; handle_cast(request_depth, State = #state { depth_fun = DepthFun, diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index 8dd8a44c5b..a25b664bb0 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -212,45 +212,7 @@ delete_and_terminate(Reason, State = #state { backing_queue = BQ, stop_all_slaves(Reason, #state{name = QName, gm = GM, wait_timeout = WT}) -> {ok, #amqqueue{slave_pids = SPids}} = rabbit_amqqueue:lookup(QName), - PidsMRefs = [{Pid, erlang:monitor(process, Pid)} || Pid <- [GM | SPids]], - ok = gm:broadcast(GM, {delete_and_terminate, Reason}), - %% It's possible that we could be partitioned from some slaves - %% between the lookup and the broadcast, in which case we could - %% monitor them but they would not have received the GM - %% message. So only wait for slaves which are still - %% not-partitioned. - PendingSlavePids = - lists:foldl( - fun({Pid, MRef}, Acc) -> - case rabbit_mnesia:on_running_node(Pid) of - true -> - receive - {'DOWN', MRef, process, _Pid, _Info} -> - Acc - after WT -> - rabbit_mirror_queue_misc:log_warning( - QName, "Missing 'DOWN' message from ~p in" - " node ~p~n", [Pid, node(Pid)]), - [Pid | Acc] - end; - false -> - Acc - end - end, [], PidsMRefs), - %% Normally when we remove a slave another slave or master will - %% notice and update Mnesia. But we just removed them all, and - %% have stopped listening ourselves. So manually clean up. - rabbit_misc:execute_mnesia_transaction( - fun () -> - [Q] = mnesia:read({rabbit_queue, QName}), - rabbit_mirror_queue_misc:store_updated_slaves( - Q #amqqueue { gm_pids = [], slave_pids = [], - %% Restarted slaves on running nodes can - %% ensure old incarnations are stopped using - %% the pending slave pids. - slave_pids_pending_shutdown = PendingSlavePids}) - end), - ok = gm:forget_group(QName). + rabbit_mirror_queue_misc:stop_all_slaves(Reason, SPids, QName, GM, WT). purge(State = #state { gm = GM, backing_queue = BQ, diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index 82169af7cb..16324abba0 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -24,6 +24,7 @@ update_mirrors/2, update_mirrors/1, validate_policy/1, maybe_auto_sync/1, maybe_drop_master_after_sync/1, sync_batch_size/1, log_info/3, log_warning/3]). +-export([stop_all_slaves/5]). -export([sync_queue/1, cancel_sync_queue/1]). @@ -47,6 +48,8 @@ [policy_validator, <<"ha-sync-batch-size">>, ?MODULE]}}, {mfa, {rabbit_registry, register, [policy_validator, <<"ha-promote-on-shutdown">>, ?MODULE]}}, + {mfa, {rabbit_registry, register, + [policy_validator, <<"ha-promote-on-failure">>, ?MODULE]}}, {requires, rabbit_registry}, {enables, recovery}]}). @@ -85,6 +88,7 @@ remove_from_queue(QueueName, Self, DeadGMPids) -> [] -> {error, not_found}; [Q = #amqqueue { pid = QPid, slave_pids = SPids, + sync_slave_pids = SyncSPids, gm_pids = GMPids }] -> {DeadGM, AliveGM} = lists:partition( fun ({GM, _}) -> @@ -104,35 +108,41 @@ remove_from_queue(QueueName, Self, DeadGMPids) -> {QPid, SPids}; _ -> promote_slave(Alive) end, - Extra = - case {{QPid, SPids}, {QPid1, SPids1}} of - {Same, Same} -> - []; - _ when QPid =:= QPid1 orelse QPid1 =:= Self -> - %% Either master hasn't changed, so - %% we're ok to update mnesia; or we have - %% become the master. If gm altered, - %% we have no choice but to proceed. - Q1 = Q#amqqueue{pid = QPid1, - slave_pids = SPids1, - gm_pids = AliveGM}, - store_updated_slaves(Q1), - %% If we add and remove nodes at the - %% same time we might tell the old - %% master we need to sync and then - %% shut it down. So let's check if - %% the new master needs to sync. - maybe_auto_sync(Q1), - slaves_to_start_on_failure(Q1, DeadGMPids); - _ -> - %% Master has changed, and we're not it. - %% [1]. - Q1 = Q#amqqueue{slave_pids = Alive, - gm_pids = AliveGM}, - store_updated_slaves(Q1), - [] - end, - {ok, QPid1, DeadPids, Extra} + DoNotPromote = SyncSPids =:= [] andalso + rabbit_policy:get(<<"ha-promote-on-failure">>, Q) =:= <<"when-synced">>, + case {{QPid, SPids}, {QPid1, SPids1}} of + {Same, Same} -> + {ok, QPid1, DeadPids, []}; + _ when QPid1 =/= QPid andalso QPid1 =:= Self andalso DoNotPromote =:= true -> + %% We have been promoted to master + %% but there are no synchronised mirrors + %% hence this node is not synchronised either + %% Bailing out. + {error, {not_synced, SPids1}}; + _ when QPid =:= QPid1 orelse QPid1 =:= Self -> + %% Either master hasn't changed, so + %% we're ok to update mnesia; or we have + %% become the master. If gm altered, + %% we have no choice but to proceed. + Q1 = Q#amqqueue{pid = QPid1, + slave_pids = SPids1, + gm_pids = AliveGM}, + store_updated_slaves(Q1), + %% If we add and remove nodes at the + %% same time we might tell the old + %% master we need to sync and then + %% shut it down. So let's check if + %% the new master needs to sync. + maybe_auto_sync(Q1), + {ok, QPid1, DeadPids, slaves_to_start_on_failure(Q1, DeadGMPids)}; + _ -> + %% Master has changed, and we're not it. + %% [1]. + Q1 = Q#amqqueue{slave_pids = Alive, + gm_pids = AliveGM}, + store_updated_slaves(Q1), + {ok, QPid1, DeadPids, []} + end end end). %% [1] We still update mnesia here in case the slave that is supposed @@ -305,6 +315,44 @@ update_recoverable(SPids, RS) -> DelNodes = RunningNodes -- SNodes, %% i.e. running with no slave (RS -- DelNodes) ++ AddNodes. +stop_all_slaves(Reason, SPids, QName, GM, WaitTimeout) -> + PidsMRefs = [{Pid, erlang:monitor(process, Pid)} || Pid <- [GM | SPids]], + ok = gm:broadcast(GM, {delete_and_terminate, Reason}), + %% It's possible that we could be partitioned from some slaves + %% between the lookup and the broadcast, in which case we could + %% monitor them but they would not have received the GM + %% message. So only wait for slaves which are still + %% not-partitioned. + PendingSlavePids = lists:foldl(fun({Pid, MRef}, Acc) -> + case rabbit_mnesia:on_running_node(Pid) of + true -> + receive + {'DOWN', MRef, process, _Pid, _Info} -> + Acc + after WaitTimeout -> + rabbit_mirror_queue_misc:log_warning( + QName, "Missing 'DOWN' message from ~p in" + " node ~p~n", [Pid, node(Pid)]), + [Pid | Acc] + end; + false -> + Acc + end + end, [], PidsMRefs), + %% Normally when we remove a slave another slave or master will + %% notice and update Mnesia. But we just removed them all, and + %% have stopped listening ourselves. So manually clean up. + rabbit_misc:execute_mnesia_transaction(fun () -> + [Q] = mnesia:read({rabbit_queue, QName}), + rabbit_mirror_queue_misc:store_updated_slaves( + Q #amqqueue { gm_pids = [], slave_pids = [], + %% Restarted slaves on running nodes can + %% ensure old incarnations are stopped using + %% the pending slave pids. + slave_pids_pending_shutdown = PendingSlavePids}) + end), + ok = gm:forget_group(QName). + %%---------------------------------------------------------------------------- promote_slave([SPid | SPids]) -> @@ -478,10 +526,12 @@ validate_policy(KeyList) -> <<"ha-sync-batch-size">>, KeyList, none), PromoteOnShutdown = proplists:get_value( <<"ha-promote-on-shutdown">>, KeyList, none), - case {Mode, Params, SyncMode, SyncBatchSize, PromoteOnShutdown} of - {none, none, none, none, none} -> + PromoteOnFailure = proplists:get_value( + <<"ha-promote-on-failure">>, KeyList, none), + case {Mode, Params, SyncMode, SyncBatchSize, PromoteOnShutdown, PromoteOnFailure} of + {none, none, none, none, none, none} -> ok; - {none, _, _, _, _} -> + {none, _, _, _, _, _} -> {error, "ha-mode must be specified to specify ha-params, " "ha-sync-mode or ha-promote-on-shutdown", []}; _ -> @@ -490,7 +540,8 @@ validate_policy(KeyList) -> {Params, ha_params_validator(Mode)}, {SyncMode, fun validate_sync_mode/1}, {SyncBatchSize, fun validate_sync_batch_size/1}, - {PromoteOnShutdown, fun validate_pos/1}]) + {PromoteOnShutdown, fun validate_pos/1}, + {PromoteOnFailure, fun validate_pof/1}]) end. ha_params_validator(Mode) -> @@ -532,3 +583,12 @@ validate_pos(PromoteOnShutdown) -> Mode -> {error, "ha-promote-on-shutdown must be " "\"always\" or \"when-synced\", got ~p", [Mode]} end. + +validate_pof(PromoteOnShutdown) -> + case PromoteOnShutdown of + <<"always">> -> ok; + <<"when-synced">> -> ok; + none -> ok; + Mode -> {error, "ha-promote-on-failure must be " + "\"always\" or \"when-synced\", got ~p", [Mode]} + end. diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 65a13f03c0..ee8c7f9d26 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -223,13 +223,21 @@ handle_call(go, _From, {not_started, Q} = NotStarted) -> end; handle_call({gm_deaths, DeadGMPids}, From, - State = #state { gm = GM, q = Q = #amqqueue { - name = QName, pid = MPid }}) -> + State = #state{ gm = GM, + q = Q = #amqqueue{ name = QName, pid = MPid }, + backing_queue = BQ, + backing_queue_state = BQS}) -> Self = self(), case rabbit_mirror_queue_misc:remove_from_queue(QName, Self, DeadGMPids) of {error, not_found} -> gen_server2:reply(From, ok), {stop, normal, State}; + {error, {not_synced, SPids}} -> + WaitTimeout = rabbit_misc:get_env(rabbit, slave_wait_timeout, 15000), + rabbit_mirror_queue_misc:stop_all_slaves( + {error, not_synced}, SPids, QName, GM, WaitTimeout), + BQ:delete_and_terminate({error, not_synced}, BQS), + {stop, normal, State#state{backing_queue_state = undefined}}; {ok, Pid, DeadPids, ExtraNodes} -> rabbit_mirror_queue_misc:report_deaths(Self, false, QName, DeadPids), diff --git a/test/dynamic_ha_SUITE.erl b/test/dynamic_ha_SUITE.erl index 7629d2264a..1d455bb290 100644 --- a/test/dynamic_ha_SUITE.erl +++ b/test/dynamic_ha_SUITE.erl @@ -59,6 +59,7 @@ groups() -> vhost_deletion, force_delete_if_no_master, promote_on_shutdown, + promote_on_failure, slave_recovers_after_vhost_failure, slave_recovers_after_vhost_down_an_up, master_migrates_on_vhost_down, @@ -287,22 +288,61 @@ force_delete_if_no_master(Config) -> amqp_channel:call(BCh3, #'queue.delete'{queue = <<"ha.nopromote.test2">>}), ok. +promote_on_failure(Config) -> + [A, B] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + rabbit_ct_broker_helpers:set_ha_policy(Config, A, <<"^ha.promote">>, + <<"all">>, [{<<"ha-promote-on-failure">>, <<"always">>}]), + rabbit_ct_broker_helpers:set_ha_policy(Config, A, <<"^ha.nopromote">>, + <<"all">>, [{<<"ha-promote-on-failure">>, <<"when-synced">>}]), + + ACh = rabbit_ct_client_helpers:open_channel(Config, A), + [begin + amqp_channel:call(ACh, #'queue.declare'{queue = Q, + durable = true}), + rabbit_ct_client_helpers:publish(ACh, Q, 10) + end || Q <- [<<"ha.promote.test">>, <<"ha.nopromote.test">>]], + ok = rabbit_ct_broker_helpers:restart_node(Config, B), + ok = rabbit_ct_broker_helpers:kill_node(Config, A), + BCh = rabbit_ct_client_helpers:open_channel(Config, B), + #'queue.declare_ok'{message_count = 0} = + amqp_channel:call( + BCh, #'queue.declare'{queue = <<"ha.promote.test">>, + durable = true}), + ?assertExit( + {{shutdown, {server_initiated_close, 404, _}}, _}, + amqp_channel:call( + BCh, #'queue.declare'{queue = <<"ha.nopromote.test">>, + durable = true})), + ok = rabbit_ct_broker_helpers:start_node(Config, A), + ACh2 = rabbit_ct_client_helpers:open_channel(Config, A), + #'queue.declare_ok'{message_count = 10} = + amqp_channel:call( + ACh2, #'queue.declare'{queue = <<"ha.nopromote.test">>, + durable = true}), + ok. + promote_on_shutdown(Config) -> [A, B] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), rabbit_ct_broker_helpers:set_ha_policy(Config, A, <<"^ha.promote">>, <<"all">>, [{<<"ha-promote-on-shutdown">>, <<"always">>}]), rabbit_ct_broker_helpers:set_ha_policy(Config, A, <<"^ha.nopromote">>, <<"all">>), + rabbit_ct_broker_helpers:set_ha_policy(Config, A, <<"^ha.nopromoteonfailure">>, + <<"all">>, [{<<"ha-promote-on-failure">>, <<"when-synced">>}, + {<<"ha-promote-on-shutdown">>, <<"always">>}]), ACh = rabbit_ct_client_helpers:open_channel(Config, A), [begin amqp_channel:call(ACh, #'queue.declare'{queue = Q, durable = true}), rabbit_ct_client_helpers:publish(ACh, Q, 10) - end || Q <- [<<"ha.promote.test">>, <<"ha.nopromote.test">>]], + end || Q <- [<<"ha.promote.test">>, + <<"ha.nopromote.test">>, + <<"ha.nopromoteonfailure.test">>]], ok = rabbit_ct_broker_helpers:restart_node(Config, B), ok = rabbit_ct_broker_helpers:stop_node(Config, A), BCh = rabbit_ct_client_helpers:open_channel(Config, B), + BCh1 = rabbit_ct_client_helpers:open_channel(Config, B), #'queue.declare_ok'{message_count = 0} = amqp_channel:call( BCh, #'queue.declare'{queue = <<"ha.promote.test">>, @@ -312,12 +352,21 @@ promote_on_shutdown(Config) -> amqp_channel:call( BCh, #'queue.declare'{queue = <<"ha.nopromote.test">>, durable = true})), + ?assertExit( + {{shutdown, {server_initiated_close, 404, _}}, _}, + amqp_channel:call( + BCh1, #'queue.declare'{queue = <<"ha.nopromoteonfailure.test">>, + durable = true})), ok = rabbit_ct_broker_helpers:start_node(Config, A), ACh2 = rabbit_ct_client_helpers:open_channel(Config, A), #'queue.declare_ok'{message_count = 10} = amqp_channel:call( ACh2, #'queue.declare'{queue = <<"ha.nopromote.test">>, durable = true}), + #'queue.declare_ok'{message_count = 10} = + amqp_channel:call( + ACh2, #'queue.declare'{queue = <<"ha.nopromoteonfailure.test">>, + durable = true}), ok. nodes_policy_should_pick_master_from_its_params(Config) -> |
