diff options
| author | Daniil Fedotov <hairyhum@gmail.com> | 2018-04-18 16:02:26 +0100 |
|---|---|---|
| committer | Daniil Fedotov <hairyhum@gmail.com> | 2018-04-18 16:24:20 +0100 |
| commit | 5f06ff470fcd4e395f4c66da4376f1f660172451 (patch) | |
| tree | f2cb00261c77713f9a83588440cb725e048721dd /src | |
| parent | 3b4af50b8886cf11a2d39b2cd0d92c0745dc9d84 (diff) | |
| download | rabbitmq-server-git-5f06ff470fcd4e395f4c66da4376f1f660172451.tar.gz | |
Introduce a new policy: ha-promote-on-failure (always by default)
This new policy controls whether unsynchronised slaves should be promoted
after a master crash. If set to `when-synced`, unsynchronised slaves
will not be promoted, preserving the state of the queue but making it
unavailable until the master node returns.
This change is intended to make cluster shutdown safer,
because queues can fail or be killed on shutdown.
Queues without a master will still be available from the management UI
and can be deleted and redeclared, but will not automatically lose
messages.
Trying to declare or passively declare the queue will result in a
timeout error, the same way as if the master was gracefully stopped with
ha-promote-on-shutdown: when-synced.
[#156811690]
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_mirror_queue_coordinator.erl | 7 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_master.erl | 40 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_misc.erl | 126 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 12 |
4 files changed, 110 insertions, 75 deletions
diff --git a/src/rabbit_mirror_queue_coordinator.erl b/src/rabbit_mirror_queue_coordinator.erl index 562f0f0fcf..f07cf3e0fc 100644 --- a/src/rabbit_mirror_queue_coordinator.erl +++ b/src/rabbit_mirror_queue_coordinator.erl @@ -365,7 +365,12 @@ handle_cast({gm_deaths, DeadGMPids}, DeadPids), {stop, shutdown, State}; {error, not_found} -> - {stop, normal, State} + {stop, normal, State}; + {error, {not_synced, _}} -> + rabbit_log:error("Mirror queue ~p in unexpected state after gm_deaths." + " Promoted to master but already a master.", + [QueueName]), + error(unexpected_mirrored_state) end; handle_cast(request_depth, State = #state { depth_fun = DepthFun, diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index 8dd8a44c5b..a25b664bb0 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -212,45 +212,7 @@ delete_and_terminate(Reason, State = #state { backing_queue = BQ, stop_all_slaves(Reason, #state{name = QName, gm = GM, wait_timeout = WT}) -> {ok, #amqqueue{slave_pids = SPids}} = rabbit_amqqueue:lookup(QName), - PidsMRefs = [{Pid, erlang:monitor(process, Pid)} || Pid <- [GM | SPids]], - ok = gm:broadcast(GM, {delete_and_terminate, Reason}), - %% It's possible that we could be partitioned from some slaves - %% between the lookup and the broadcast, in which case we could - %% monitor them but they would not have received the GM - %% message. So only wait for slaves which are still - %% not-partitioned. - PendingSlavePids = - lists:foldl( - fun({Pid, MRef}, Acc) -> - case rabbit_mnesia:on_running_node(Pid) of - true -> - receive - {'DOWN', MRef, process, _Pid, _Info} -> - Acc - after WT -> - rabbit_mirror_queue_misc:log_warning( - QName, "Missing 'DOWN' message from ~p in" - " node ~p~n", [Pid, node(Pid)]), - [Pid | Acc] - end; - false -> - Acc - end - end, [], PidsMRefs), - %% Normally when we remove a slave another slave or master will - %% notice and update Mnesia. 
But we just removed them all, and - %% have stopped listening ourselves. So manually clean up. - rabbit_misc:execute_mnesia_transaction( - fun () -> - [Q] = mnesia:read({rabbit_queue, QName}), - rabbit_mirror_queue_misc:store_updated_slaves( - Q #amqqueue { gm_pids = [], slave_pids = [], - %% Restarted slaves on running nodes can - %% ensure old incarnations are stopped using - %% the pending slave pids. - slave_pids_pending_shutdown = PendingSlavePids}) - end), - ok = gm:forget_group(QName). + rabbit_mirror_queue_misc:stop_all_slaves(Reason, SPids, QName, GM, WT). purge(State = #state { gm = GM, backing_queue = BQ, diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index 82169af7cb..16324abba0 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -24,6 +24,7 @@ update_mirrors/2, update_mirrors/1, validate_policy/1, maybe_auto_sync/1, maybe_drop_master_after_sync/1, sync_batch_size/1, log_info/3, log_warning/3]). +-export([stop_all_slaves/5]). -export([sync_queue/1, cancel_sync_queue/1]). @@ -47,6 +48,8 @@ [policy_validator, <<"ha-sync-batch-size">>, ?MODULE]}}, {mfa, {rabbit_registry, register, [policy_validator, <<"ha-promote-on-shutdown">>, ?MODULE]}}, + {mfa, {rabbit_registry, register, + [policy_validator, <<"ha-promote-on-failure">>, ?MODULE]}}, {requires, rabbit_registry}, {enables, recovery}]}). @@ -85,6 +88,7 @@ remove_from_queue(QueueName, Self, DeadGMPids) -> [] -> {error, not_found}; [Q = #amqqueue { pid = QPid, slave_pids = SPids, + sync_slave_pids = SyncSPids, gm_pids = GMPids }] -> {DeadGM, AliveGM} = lists:partition( fun ({GM, _}) -> @@ -104,35 +108,41 @@ remove_from_queue(QueueName, Self, DeadGMPids) -> {QPid, SPids}; _ -> promote_slave(Alive) end, - Extra = - case {{QPid, SPids}, {QPid1, SPids1}} of - {Same, Same} -> - []; - _ when QPid =:= QPid1 orelse QPid1 =:= Self -> - %% Either master hasn't changed, so - %% we're ok to update mnesia; or we have - %% become the master. 
If gm altered, - %% we have no choice but to proceed. - Q1 = Q#amqqueue{pid = QPid1, - slave_pids = SPids1, - gm_pids = AliveGM}, - store_updated_slaves(Q1), - %% If we add and remove nodes at the - %% same time we might tell the old - %% master we need to sync and then - %% shut it down. So let's check if - %% the new master needs to sync. - maybe_auto_sync(Q1), - slaves_to_start_on_failure(Q1, DeadGMPids); - _ -> - %% Master has changed, and we're not it. - %% [1]. - Q1 = Q#amqqueue{slave_pids = Alive, - gm_pids = AliveGM}, - store_updated_slaves(Q1), - [] - end, - {ok, QPid1, DeadPids, Extra} + DoNotPromote = SyncSPids =:= [] andalso + rabbit_policy:get(<<"ha-promote-on-failure">>, Q) =:= <<"when-synced">>, + case {{QPid, SPids}, {QPid1, SPids1}} of + {Same, Same} -> + {ok, QPid1, DeadPids, []}; + _ when QPid1 =/= QPid andalso QPid1 =:= Self andalso DoNotPromote =:= true -> + %% We have been promoted to master + %% but there are no synchronised mirrors + %% hence this node is not synchronised either + %% Bailing out. + {error, {not_synced, SPids1}}; + _ when QPid =:= QPid1 orelse QPid1 =:= Self -> + %% Either master hasn't changed, so + %% we're ok to update mnesia; or we have + %% become the master. If gm altered, + %% we have no choice but to proceed. + Q1 = Q#amqqueue{pid = QPid1, + slave_pids = SPids1, + gm_pids = AliveGM}, + store_updated_slaves(Q1), + %% If we add and remove nodes at the + %% same time we might tell the old + %% master we need to sync and then + %% shut it down. So let's check if + %% the new master needs to sync. + maybe_auto_sync(Q1), + {ok, QPid1, DeadPids, slaves_to_start_on_failure(Q1, DeadGMPids)}; + _ -> + %% Master has changed, and we're not it. + %% [1]. + Q1 = Q#amqqueue{slave_pids = Alive, + gm_pids = AliveGM}, + store_updated_slaves(Q1), + {ok, QPid1, DeadPids, []} + end end end). 
%% [1] We still update mnesia here in case the slave that is supposed @@ -305,6 +315,44 @@ update_recoverable(SPids, RS) -> DelNodes = RunningNodes -- SNodes, %% i.e. running with no slave (RS -- DelNodes) ++ AddNodes. +stop_all_slaves(Reason, SPids, QName, GM, WaitTimeout) -> + PidsMRefs = [{Pid, erlang:monitor(process, Pid)} || Pid <- [GM | SPids]], + ok = gm:broadcast(GM, {delete_and_terminate, Reason}), + %% It's possible that we could be partitioned from some slaves + %% between the lookup and the broadcast, in which case we could + %% monitor them but they would not have received the GM + %% message. So only wait for slaves which are still + %% not-partitioned. + PendingSlavePids = lists:foldl(fun({Pid, MRef}, Acc) -> + case rabbit_mnesia:on_running_node(Pid) of + true -> + receive + {'DOWN', MRef, process, _Pid, _Info} -> + Acc + after WaitTimeout -> + rabbit_mirror_queue_misc:log_warning( + QName, "Missing 'DOWN' message from ~p in" + " node ~p~n", [Pid, node(Pid)]), + [Pid | Acc] + end; + false -> + Acc + end + end, [], PidsMRefs), + %% Normally when we remove a slave another slave or master will + %% notice and update Mnesia. But we just removed them all, and + %% have stopped listening ourselves. So manually clean up. + rabbit_misc:execute_mnesia_transaction(fun () -> + [Q] = mnesia:read({rabbit_queue, QName}), + rabbit_mirror_queue_misc:store_updated_slaves( + Q #amqqueue { gm_pids = [], slave_pids = [], + %% Restarted slaves on running nodes can + %% ensure old incarnations are stopped using + %% the pending slave pids. + slave_pids_pending_shutdown = PendingSlavePids}) + end), + ok = gm:forget_group(QName). 
+ %%---------------------------------------------------------------------------- promote_slave([SPid | SPids]) -> @@ -478,10 +526,12 @@ validate_policy(KeyList) -> <<"ha-sync-batch-size">>, KeyList, none), PromoteOnShutdown = proplists:get_value( <<"ha-promote-on-shutdown">>, KeyList, none), - case {Mode, Params, SyncMode, SyncBatchSize, PromoteOnShutdown} of - {none, none, none, none, none} -> + PromoteOnFailure = proplists:get_value( + <<"ha-promote-on-failure">>, KeyList, none), + case {Mode, Params, SyncMode, SyncBatchSize, PromoteOnShutdown, PromoteOnFailure} of + {none, none, none, none, none, none} -> ok; - {none, _, _, _, _} -> + {none, _, _, _, _, _} -> {error, "ha-mode must be specified to specify ha-params, " "ha-sync-mode or ha-promote-on-shutdown", []}; _ -> @@ -490,7 +540,8 @@ validate_policy(KeyList) -> {Params, ha_params_validator(Mode)}, {SyncMode, fun validate_sync_mode/1}, {SyncBatchSize, fun validate_sync_batch_size/1}, - {PromoteOnShutdown, fun validate_pos/1}]) + {PromoteOnShutdown, fun validate_pos/1}, + {PromoteOnFailure, fun validate_pof/1}]) end. ha_params_validator(Mode) -> @@ -532,3 +583,12 @@ validate_pos(PromoteOnShutdown) -> Mode -> {error, "ha-promote-on-shutdown must be " "\"always\" or \"when-synced\", got ~p", [Mode]} end. + +validate_pof(PromoteOnShutdown) -> + case PromoteOnShutdown of + <<"always">> -> ok; + <<"when-synced">> -> ok; + none -> ok; + Mode -> {error, "ha-promote-on-failure must be " + "\"always\" or \"when-synced\", got ~p", [Mode]} + end. 
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 65a13f03c0..ee8c7f9d26 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -223,13 +223,21 @@ handle_call(go, _From, {not_started, Q} = NotStarted) -> end; handle_call({gm_deaths, DeadGMPids}, From, - State = #state { gm = GM, q = Q = #amqqueue { - name = QName, pid = MPid }}) -> + State = #state{ gm = GM, + q = Q = #amqqueue{ name = QName, pid = MPid }, + backing_queue = BQ, + backing_queue_state = BQS}) -> Self = self(), case rabbit_mirror_queue_misc:remove_from_queue(QName, Self, DeadGMPids) of {error, not_found} -> gen_server2:reply(From, ok), {stop, normal, State}; + {error, {not_synced, SPids}} -> + WaitTimeout = rabbit_misc:get_env(rabbit, slave_wait_timeout, 15000), + rabbit_mirror_queue_misc:stop_all_slaves( + {error, not_synced}, SPids, QName, GM, WaitTimeout), + BQ:delete_and_terminate({error, not_synced}, BQS), + {stop, normal, State#state{backing_queue_state = undefined}}; {ok, Pid, DeadPids, ExtraNodes} -> rabbit_mirror_queue_misc:report_deaths(Self, false, QName, DeadPids), |
