summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDaniil Fedotov <hairyhum@gmail.com>2018-04-18 16:02:26 +0100
committerDaniil Fedotov <hairyhum@gmail.com>2018-04-18 16:24:20 +0100
commit5f06ff470fcd4e395f4c66da4376f1f660172451 (patch)
treef2cb00261c77713f9a83588440cb725e048721dd /src
parent3b4af50b8886cf11a2d39b2cd0d92c0745dc9d84 (diff)
downloadrabbitmq-server-git-5f06ff470fcd4e395f4c66da4376f1f660172451.tar.gz
Introduce a new policy: ha-promote-on-failure (always by default)
This new policy controls if unsynchronised slaves should be promoted after master crash. If set to `when-synced`, unsynchronised slaves will not be promoted, keeping the state of the queue, but making it unavailable until master node returns. This change is supposed to make the cluster shutdown safier, because queues can fail or be killed on shutdown. The queues without master will be available from the management UI and can be deleted and redeclared, but will not automatically loose messages. Trying to declare or passively declare the queue will result in a timeout error. Same way as if the master was gracefully stopped with ha-promote-on-shutdown: when-synced [#156811690]
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_mirror_queue_coordinator.erl7
-rw-r--r--src/rabbit_mirror_queue_master.erl40
-rw-r--r--src/rabbit_mirror_queue_misc.erl126
-rw-r--r--src/rabbit_mirror_queue_slave.erl12
4 files changed, 110 insertions, 75 deletions
diff --git a/src/rabbit_mirror_queue_coordinator.erl b/src/rabbit_mirror_queue_coordinator.erl
index 562f0f0fcf..f07cf3e0fc 100644
--- a/src/rabbit_mirror_queue_coordinator.erl
+++ b/src/rabbit_mirror_queue_coordinator.erl
@@ -365,7 +365,12 @@ handle_cast({gm_deaths, DeadGMPids},
DeadPids),
{stop, shutdown, State};
{error, not_found} ->
- {stop, normal, State}
+ {stop, normal, State};
+ {error, {not_synced, _}} ->
+ rabbit_log:error("Mirror queue ~p in unexpected state after gm_deaths."
+ " Promoted to master but already a master.",
+ [QueueName]),
+ error(unexpected_mirrored_state)
end;
handle_cast(request_depth, State = #state { depth_fun = DepthFun,
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index 8dd8a44c5b..a25b664bb0 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -212,45 +212,7 @@ delete_and_terminate(Reason, State = #state { backing_queue = BQ,
stop_all_slaves(Reason, #state{name = QName, gm = GM, wait_timeout = WT}) ->
{ok, #amqqueue{slave_pids = SPids}} = rabbit_amqqueue:lookup(QName),
- PidsMRefs = [{Pid, erlang:monitor(process, Pid)} || Pid <- [GM | SPids]],
- ok = gm:broadcast(GM, {delete_and_terminate, Reason}),
- %% It's possible that we could be partitioned from some slaves
- %% between the lookup and the broadcast, in which case we could
- %% monitor them but they would not have received the GM
- %% message. So only wait for slaves which are still
- %% not-partitioned.
- PendingSlavePids =
- lists:foldl(
- fun({Pid, MRef}, Acc) ->
- case rabbit_mnesia:on_running_node(Pid) of
- true ->
- receive
- {'DOWN', MRef, process, _Pid, _Info} ->
- Acc
- after WT ->
- rabbit_mirror_queue_misc:log_warning(
- QName, "Missing 'DOWN' message from ~p in"
- " node ~p~n", [Pid, node(Pid)]),
- [Pid | Acc]
- end;
- false ->
- Acc
- end
- end, [], PidsMRefs),
- %% Normally when we remove a slave another slave or master will
- %% notice and update Mnesia. But we just removed them all, and
- %% have stopped listening ourselves. So manually clean up.
- rabbit_misc:execute_mnesia_transaction(
- fun () ->
- [Q] = mnesia:read({rabbit_queue, QName}),
- rabbit_mirror_queue_misc:store_updated_slaves(
- Q #amqqueue { gm_pids = [], slave_pids = [],
- %% Restarted slaves on running nodes can
- %% ensure old incarnations are stopped using
- %% the pending slave pids.
- slave_pids_pending_shutdown = PendingSlavePids})
- end),
- ok = gm:forget_group(QName).
+ rabbit_mirror_queue_misc:stop_all_slaves(Reason, SPids, QName, GM, WT).
purge(State = #state { gm = GM,
backing_queue = BQ,
diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl
index 82169af7cb..16324abba0 100644
--- a/src/rabbit_mirror_queue_misc.erl
+++ b/src/rabbit_mirror_queue_misc.erl
@@ -24,6 +24,7 @@
update_mirrors/2, update_mirrors/1, validate_policy/1,
maybe_auto_sync/1, maybe_drop_master_after_sync/1,
sync_batch_size/1, log_info/3, log_warning/3]).
+-export([stop_all_slaves/5]).
-export([sync_queue/1, cancel_sync_queue/1]).
@@ -47,6 +48,8 @@
[policy_validator, <<"ha-sync-batch-size">>, ?MODULE]}},
{mfa, {rabbit_registry, register,
[policy_validator, <<"ha-promote-on-shutdown">>, ?MODULE]}},
+ {mfa, {rabbit_registry, register,
+ [policy_validator, <<"ha-promote-on-failure">>, ?MODULE]}},
{requires, rabbit_registry},
{enables, recovery}]}).
@@ -85,6 +88,7 @@ remove_from_queue(QueueName, Self, DeadGMPids) ->
[] -> {error, not_found};
[Q = #amqqueue { pid = QPid,
slave_pids = SPids,
+ sync_slave_pids = SyncSPids,
gm_pids = GMPids }] ->
{DeadGM, AliveGM} = lists:partition(
fun ({GM, _}) ->
@@ -104,35 +108,41 @@ remove_from_queue(QueueName, Self, DeadGMPids) ->
{QPid, SPids};
_ -> promote_slave(Alive)
end,
- Extra =
- case {{QPid, SPids}, {QPid1, SPids1}} of
- {Same, Same} ->
- [];
- _ when QPid =:= QPid1 orelse QPid1 =:= Self ->
- %% Either master hasn't changed, so
- %% we're ok to update mnesia; or we have
- %% become the master. If gm altered,
- %% we have no choice but to proceed.
- Q1 = Q#amqqueue{pid = QPid1,
- slave_pids = SPids1,
- gm_pids = AliveGM},
- store_updated_slaves(Q1),
- %% If we add and remove nodes at the
- %% same time we might tell the old
- %% master we need to sync and then
- %% shut it down. So let's check if
- %% the new master needs to sync.
- maybe_auto_sync(Q1),
- slaves_to_start_on_failure(Q1, DeadGMPids);
- _ ->
- %% Master has changed, and we're not it.
- %% [1].
- Q1 = Q#amqqueue{slave_pids = Alive,
- gm_pids = AliveGM},
- store_updated_slaves(Q1),
- []
- end,
- {ok, QPid1, DeadPids, Extra}
+ DoNotPromote = SyncSPids =:= [] andalso
+ rabbit_policy:get(<<"ha-promote-on-failure">>, Q) =:= <<"when-synced">>,
+ case {{QPid, SPids}, {QPid1, SPids1}} of
+ {Same, Same} ->
+ {ok, QPid1, DeadPids, []};
+ _ when QPid1 =/= QPid andalso QPid1 =:= Self andalso DoNotPromote =:= true ->
+ %% We have been promoted to master
+ %% but there are no synchronised mirrors
+ %% hence this node is not synchronised either
+ %% Bailing out.
+ {error, {not_synced, SPids1}};
+ _ when QPid =:= QPid1 orelse QPid1 =:= Self ->
+ %% Either master hasn't changed, so
+ %% we're ok to update mnesia; or we have
+ %% become the master. If gm altered,
+ %% we have no choice but to proceed.
+ Q1 = Q#amqqueue{pid = QPid1,
+ slave_pids = SPids1,
+ gm_pids = AliveGM},
+ store_updated_slaves(Q1),
+ %% If we add and remove nodes at the
+ %% same time we might tell the old
+ %% master we need to sync and then
+ %% shut it down. So let's check if
+ %% the new master needs to sync.
+ maybe_auto_sync(Q1),
+ {ok, QPid1, DeadPids, slaves_to_start_on_failure(Q1, DeadGMPids)};
+ _ ->
+ %% Master has changed, and we're not it.
+ %% [1].
+ Q1 = Q#amqqueue{slave_pids = Alive,
+ gm_pids = AliveGM},
+ store_updated_slaves(Q1),
+ {ok, QPid1, DeadPids, []}
+ end
end
end).
%% [1] We still update mnesia here in case the slave that is supposed
@@ -305,6 +315,44 @@ update_recoverable(SPids, RS) ->
DelNodes = RunningNodes -- SNodes, %% i.e. running with no slave
(RS -- DelNodes) ++ AddNodes.
+stop_all_slaves(Reason, SPids, QName, GM, WaitTimeout) ->
+ PidsMRefs = [{Pid, erlang:monitor(process, Pid)} || Pid <- [GM | SPids]],
+ ok = gm:broadcast(GM, {delete_and_terminate, Reason}),
+ %% It's possible that we could be partitioned from some slaves
+ %% between the lookup and the broadcast, in which case we could
+ %% monitor them but they would not have received the GM
+ %% message. So only wait for slaves which are still
+ %% not-partitioned.
+ PendingSlavePids = lists:foldl(fun({Pid, MRef}, Acc) ->
+ case rabbit_mnesia:on_running_node(Pid) of
+ true ->
+ receive
+ {'DOWN', MRef, process, _Pid, _Info} ->
+ Acc
+ after WaitTimeout ->
+ rabbit_mirror_queue_misc:log_warning(
+ QName, "Missing 'DOWN' message from ~p in"
+ " node ~p~n", [Pid, node(Pid)]),
+ [Pid | Acc]
+ end;
+ false ->
+ Acc
+ end
+ end, [], PidsMRefs),
+ %% Normally when we remove a slave another slave or master will
+ %% notice and update Mnesia. But we just removed them all, and
+ %% have stopped listening ourselves. So manually clean up.
+ rabbit_misc:execute_mnesia_transaction(fun () ->
+ [Q] = mnesia:read({rabbit_queue, QName}),
+ rabbit_mirror_queue_misc:store_updated_slaves(
+ Q #amqqueue { gm_pids = [], slave_pids = [],
+ %% Restarted slaves on running nodes can
+ %% ensure old incarnations are stopped using
+ %% the pending slave pids.
+ slave_pids_pending_shutdown = PendingSlavePids})
+ end),
+ ok = gm:forget_group(QName).
+
%%----------------------------------------------------------------------------
promote_slave([SPid | SPids]) ->
@@ -478,10 +526,12 @@ validate_policy(KeyList) ->
<<"ha-sync-batch-size">>, KeyList, none),
PromoteOnShutdown = proplists:get_value(
<<"ha-promote-on-shutdown">>, KeyList, none),
- case {Mode, Params, SyncMode, SyncBatchSize, PromoteOnShutdown} of
- {none, none, none, none, none} ->
+ PromoteOnFailure = proplists:get_value(
+ <<"ha-promote-on-failure">>, KeyList, none),
+ case {Mode, Params, SyncMode, SyncBatchSize, PromoteOnShutdown, PromoteOnFailure} of
+ {none, none, none, none, none, none} ->
ok;
- {none, _, _, _, _} ->
+ {none, _, _, _, _, _} ->
{error, "ha-mode must be specified to specify ha-params, "
"ha-sync-mode or ha-promote-on-shutdown", []};
_ ->
@@ -490,7 +540,8 @@ validate_policy(KeyList) ->
{Params, ha_params_validator(Mode)},
{SyncMode, fun validate_sync_mode/1},
{SyncBatchSize, fun validate_sync_batch_size/1},
- {PromoteOnShutdown, fun validate_pos/1}])
+ {PromoteOnShutdown, fun validate_pos/1},
+ {PromoteOnFailure, fun validate_pof/1}])
end.
ha_params_validator(Mode) ->
@@ -532,3 +583,12 @@ validate_pos(PromoteOnShutdown) ->
Mode -> {error, "ha-promote-on-shutdown must be "
"\"always\" or \"when-synced\", got ~p", [Mode]}
end.
+
+validate_pof(PromoteOnShutdown) ->
+ case PromoteOnShutdown of
+ <<"always">> -> ok;
+ <<"when-synced">> -> ok;
+ none -> ok;
+ Mode -> {error, "ha-promote-on-failure must be "
+ "\"always\" or \"when-synced\", got ~p", [Mode]}
+ end.
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 65a13f03c0..ee8c7f9d26 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -223,13 +223,21 @@ handle_call(go, _From, {not_started, Q} = NotStarted) ->
end;
handle_call({gm_deaths, DeadGMPids}, From,
- State = #state { gm = GM, q = Q = #amqqueue {
- name = QName, pid = MPid }}) ->
+ State = #state{ gm = GM,
+ q = Q = #amqqueue{ name = QName, pid = MPid },
+ backing_queue = BQ,
+ backing_queue_state = BQS}) ->
Self = self(),
case rabbit_mirror_queue_misc:remove_from_queue(QName, Self, DeadGMPids) of
{error, not_found} ->
gen_server2:reply(From, ok),
{stop, normal, State};
+ {error, {not_synced, SPids}} ->
+ WaitTimeout = rabbit_misc:get_env(rabbit, slave_wait_timeout, 15000),
+ rabbit_mirror_queue_misc:stop_all_slaves(
+ {error, not_synced}, SPids, QName, GM, WaitTimeout),
+ BQ:delete_and_terminate({error, not_synced}, BQS),
+ {stop, normal, State#state{backing_queue_state = undefined}};
{ok, Pid, DeadPids, ExtraNodes} ->
rabbit_mirror_queue_misc:report_deaths(Self, false, QName,
DeadPids),