summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_mirror_queue_coordinator.erl7
-rw-r--r--src/rabbit_mirror_queue_master.erl40
-rw-r--r--src/rabbit_mirror_queue_misc.erl126
-rw-r--r--src/rabbit_mirror_queue_slave.erl12
4 files changed, 110 insertions, 75 deletions
diff --git a/src/rabbit_mirror_queue_coordinator.erl b/src/rabbit_mirror_queue_coordinator.erl
index 562f0f0fcf..f07cf3e0fc 100644
--- a/src/rabbit_mirror_queue_coordinator.erl
+++ b/src/rabbit_mirror_queue_coordinator.erl
@@ -365,7 +365,12 @@ handle_cast({gm_deaths, DeadGMPids},
DeadPids),
{stop, shutdown, State};
{error, not_found} ->
- {stop, normal, State}
+ {stop, normal, State};
+ {error, {not_synced, _}} ->
+ rabbit_log:error("Mirror queue ~p in unexpected state after gm_deaths."
+ " Promoted to master but already a master.",
+ [QueueName]),
+ error(unexpected_mirrored_state)
end;
handle_cast(request_depth, State = #state { depth_fun = DepthFun,
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index 8dd8a44c5b..a25b664bb0 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -212,45 +212,7 @@ delete_and_terminate(Reason, State = #state { backing_queue = BQ,
stop_all_slaves(Reason, #state{name = QName, gm = GM, wait_timeout = WT}) ->
{ok, #amqqueue{slave_pids = SPids}} = rabbit_amqqueue:lookup(QName),
- PidsMRefs = [{Pid, erlang:monitor(process, Pid)} || Pid <- [GM | SPids]],
- ok = gm:broadcast(GM, {delete_and_terminate, Reason}),
- %% It's possible that we could be partitioned from some slaves
- %% between the lookup and the broadcast, in which case we could
- %% monitor them but they would not have received the GM
- %% message. So only wait for slaves which are still
- %% not-partitioned.
- PendingSlavePids =
- lists:foldl(
- fun({Pid, MRef}, Acc) ->
- case rabbit_mnesia:on_running_node(Pid) of
- true ->
- receive
- {'DOWN', MRef, process, _Pid, _Info} ->
- Acc
- after WT ->
- rabbit_mirror_queue_misc:log_warning(
- QName, "Missing 'DOWN' message from ~p in"
- " node ~p~n", [Pid, node(Pid)]),
- [Pid | Acc]
- end;
- false ->
- Acc
- end
- end, [], PidsMRefs),
- %% Normally when we remove a slave another slave or master will
- %% notice and update Mnesia. But we just removed them all, and
- %% have stopped listening ourselves. So manually clean up.
- rabbit_misc:execute_mnesia_transaction(
- fun () ->
- [Q] = mnesia:read({rabbit_queue, QName}),
- rabbit_mirror_queue_misc:store_updated_slaves(
- Q #amqqueue { gm_pids = [], slave_pids = [],
- %% Restarted slaves on running nodes can
- %% ensure old incarnations are stopped using
- %% the pending slave pids.
- slave_pids_pending_shutdown = PendingSlavePids})
- end),
- ok = gm:forget_group(QName).
+ rabbit_mirror_queue_misc:stop_all_slaves(Reason, SPids, QName, GM, WT).
purge(State = #state { gm = GM,
backing_queue = BQ,
diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl
index 82169af7cb..16324abba0 100644
--- a/src/rabbit_mirror_queue_misc.erl
+++ b/src/rabbit_mirror_queue_misc.erl
@@ -24,6 +24,7 @@
update_mirrors/2, update_mirrors/1, validate_policy/1,
maybe_auto_sync/1, maybe_drop_master_after_sync/1,
sync_batch_size/1, log_info/3, log_warning/3]).
+-export([stop_all_slaves/5]).
-export([sync_queue/1, cancel_sync_queue/1]).
@@ -47,6 +48,8 @@
[policy_validator, <<"ha-sync-batch-size">>, ?MODULE]}},
{mfa, {rabbit_registry, register,
[policy_validator, <<"ha-promote-on-shutdown">>, ?MODULE]}},
+ {mfa, {rabbit_registry, register,
+ [policy_validator, <<"ha-promote-on-failure">>, ?MODULE]}},
{requires, rabbit_registry},
{enables, recovery}]}).
@@ -85,6 +88,7 @@ remove_from_queue(QueueName, Self, DeadGMPids) ->
[] -> {error, not_found};
[Q = #amqqueue { pid = QPid,
slave_pids = SPids,
+ sync_slave_pids = SyncSPids,
gm_pids = GMPids }] ->
{DeadGM, AliveGM} = lists:partition(
fun ({GM, _}) ->
@@ -104,35 +108,41 @@ remove_from_queue(QueueName, Self, DeadGMPids) ->
{QPid, SPids};
_ -> promote_slave(Alive)
end,
- Extra =
- case {{QPid, SPids}, {QPid1, SPids1}} of
- {Same, Same} ->
- [];
- _ when QPid =:= QPid1 orelse QPid1 =:= Self ->
- %% Either master hasn't changed, so
- %% we're ok to update mnesia; or we have
- %% become the master. If gm altered,
- %% we have no choice but to proceed.
- Q1 = Q#amqqueue{pid = QPid1,
- slave_pids = SPids1,
- gm_pids = AliveGM},
- store_updated_slaves(Q1),
- %% If we add and remove nodes at the
- %% same time we might tell the old
- %% master we need to sync and then
- %% shut it down. So let's check if
- %% the new master needs to sync.
- maybe_auto_sync(Q1),
- slaves_to_start_on_failure(Q1, DeadGMPids);
- _ ->
- %% Master has changed, and we're not it.
- %% [1].
- Q1 = Q#amqqueue{slave_pids = Alive,
- gm_pids = AliveGM},
- store_updated_slaves(Q1),
- []
- end,
- {ok, QPid1, DeadPids, Extra}
+ DoNotPromote = SyncSPids =:= [] andalso
+ rabbit_policy:get(<<"ha-promote-on-failure">>, Q) =:= <<"when-synced">>,
+ case {{QPid, SPids}, {QPid1, SPids1}} of
+ {Same, Same} ->
+ {ok, QPid1, DeadPids, []};
+ _ when QPid1 =/= QPid andalso QPid1 =:= Self andalso DoNotPromote =:= true ->
+ %% We have been promoted to master
+ %% but there are no synchronised mirrors
+ %% hence this node is not synchronised either
+ %% Bailing out.
+ {error, {not_synced, SPids1}};
+ _ when QPid =:= QPid1 orelse QPid1 =:= Self ->
+ %% Either master hasn't changed, so
+ %% we're ok to update mnesia; or we have
+ %% become the master. If gm altered,
+ %% we have no choice but to proceed.
+ Q1 = Q#amqqueue{pid = QPid1,
+ slave_pids = SPids1,
+ gm_pids = AliveGM},
+ store_updated_slaves(Q1),
+ %% If we add and remove nodes at the
+ %% same time we might tell the old
+ %% master we need to sync and then
+ %% shut it down. So let's check if
+ %% the new master needs to sync.
+ maybe_auto_sync(Q1),
+ {ok, QPid1, DeadPids, slaves_to_start_on_failure(Q1, DeadGMPids)};
+ _ ->
+ %% Master has changed, and we're not it.
+ %% [1].
+ Q1 = Q#amqqueue{slave_pids = Alive,
+ gm_pids = AliveGM},
+ store_updated_slaves(Q1),
+ {ok, QPid1, DeadPids, []}
+ end
end
end).
%% [1] We still update mnesia here in case the slave that is supposed
@@ -305,6 +315,44 @@ update_recoverable(SPids, RS) ->
DelNodes = RunningNodes -- SNodes, %% i.e. running with no slave
(RS -- DelNodes) ++ AddNodes.
+stop_all_slaves(Reason, SPids, QName, GM, WaitTimeout) ->
+ PidsMRefs = [{Pid, erlang:monitor(process, Pid)} || Pid <- [GM | SPids]],
+ ok = gm:broadcast(GM, {delete_and_terminate, Reason}),
+ %% It's possible that we could be partitioned from some slaves
+ %% between the lookup and the broadcast, in which case we could
+ %% monitor them but they would not have received the GM
+ %% message. So only wait for slaves which are still
+ %% not-partitioned.
+ PendingSlavePids = lists:foldl(fun({Pid, MRef}, Acc) ->
+ case rabbit_mnesia:on_running_node(Pid) of
+ true ->
+ receive
+ {'DOWN', MRef, process, _Pid, _Info} ->
+ Acc
+ after WaitTimeout ->
+ rabbit_mirror_queue_misc:log_warning(
+ QName, "Missing 'DOWN' message from ~p in"
+ " node ~p~n", [Pid, node(Pid)]),
+ [Pid | Acc]
+ end;
+ false ->
+ Acc
+ end
+ end, [], PidsMRefs),
+ %% Normally when we remove a slave another slave or master will
+ %% notice and update Mnesia. But we just removed them all, and
+ %% have stopped listening ourselves. So manually clean up.
+ rabbit_misc:execute_mnesia_transaction(fun () ->
+ [Q] = mnesia:read({rabbit_queue, QName}),
+ rabbit_mirror_queue_misc:store_updated_slaves(
+ Q #amqqueue { gm_pids = [], slave_pids = [],
+ %% Restarted slaves on running nodes can
+ %% ensure old incarnations are stopped using
+ %% the pending slave pids.
+ slave_pids_pending_shutdown = PendingSlavePids})
+ end),
+ ok = gm:forget_group(QName).
+
%%----------------------------------------------------------------------------
promote_slave([SPid | SPids]) ->
@@ -478,10 +526,12 @@ validate_policy(KeyList) ->
<<"ha-sync-batch-size">>, KeyList, none),
PromoteOnShutdown = proplists:get_value(
<<"ha-promote-on-shutdown">>, KeyList, none),
- case {Mode, Params, SyncMode, SyncBatchSize, PromoteOnShutdown} of
- {none, none, none, none, none} ->
+ PromoteOnFailure = proplists:get_value(
+ <<"ha-promote-on-failure">>, KeyList, none),
+ case {Mode, Params, SyncMode, SyncBatchSize, PromoteOnShutdown, PromoteOnFailure} of
+ {none, none, none, none, none, none} ->
ok;
- {none, _, _, _, _} ->
+ {none, _, _, _, _, _} ->
{error, "ha-mode must be specified to specify ha-params, "
"ha-sync-mode or ha-promote-on-shutdown", []};
_ ->
@@ -490,7 +540,8 @@ validate_policy(KeyList) ->
{Params, ha_params_validator(Mode)},
{SyncMode, fun validate_sync_mode/1},
{SyncBatchSize, fun validate_sync_batch_size/1},
- {PromoteOnShutdown, fun validate_pos/1}])
+ {PromoteOnShutdown, fun validate_pos/1},
+ {PromoteOnFailure, fun validate_pof/1}])
end.
ha_params_validator(Mode) ->
@@ -532,3 +583,12 @@ validate_pos(PromoteOnShutdown) ->
Mode -> {error, "ha-promote-on-shutdown must be "
"\"always\" or \"when-synced\", got ~p", [Mode]}
end.
+
+validate_pof(PromoteOnShutdown) ->
+ case PromoteOnShutdown of
+ <<"always">> -> ok;
+ <<"when-synced">> -> ok;
+ none -> ok;
+ Mode -> {error, "ha-promote-on-failure must be "
+ "\"always\" or \"when-synced\", got ~p", [Mode]}
+ end.
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 65a13f03c0..ee8c7f9d26 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -223,13 +223,21 @@ handle_call(go, _From, {not_started, Q} = NotStarted) ->
end;
handle_call({gm_deaths, DeadGMPids}, From,
- State = #state { gm = GM, q = Q = #amqqueue {
- name = QName, pid = MPid }}) ->
+ State = #state{ gm = GM,
+ q = Q = #amqqueue{ name = QName, pid = MPid },
+ backing_queue = BQ,
+ backing_queue_state = BQS}) ->
Self = self(),
case rabbit_mirror_queue_misc:remove_from_queue(QName, Self, DeadGMPids) of
{error, not_found} ->
gen_server2:reply(From, ok),
{stop, normal, State};
+ {error, {not_synced, SPids}} ->
+ WaitTimeout = rabbit_misc:get_env(rabbit, slave_wait_timeout, 15000),
+ rabbit_mirror_queue_misc:stop_all_slaves(
+ {error, not_synced}, SPids, QName, GM, WaitTimeout),
+ BQ:delete_and_terminate({error, not_synced}, BQS),
+ {stop, normal, State#state{backing_queue_state = undefined}};
{ok, Pid, DeadPids, ExtraNodes} ->
rabbit_mirror_queue_misc:report_deaths(Self, false, QName,
DeadPids),