summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r-- src/rabbit_mirror_queue_coordinator.erl 7
-rw-r--r-- src/rabbit_mirror_queue_master.erl 40
-rw-r--r-- src/rabbit_mirror_queue_misc.erl 126
-rw-r--r-- src/rabbit_mirror_queue_slave.erl 12
-rw-r--r-- test/dynamic_ha_SUITE.erl 51
5 files changed, 160 insertions, 76 deletions
diff --git a/src/rabbit_mirror_queue_coordinator.erl b/src/rabbit_mirror_queue_coordinator.erl
index 562f0f0fcf..c6e23bf21f 100644
--- a/src/rabbit_mirror_queue_coordinator.erl
+++ b/src/rabbit_mirror_queue_coordinator.erl
@@ -365,7 +365,12 @@ handle_cast({gm_deaths, DeadGMPids},
DeadPids),
{stop, shutdown, State};
{error, not_found} ->
- {stop, normal, State}
+ {stop, normal, State};
+ {error, {not_synced, _}} ->
+ rabbit_log:error("Mirror queue ~p in unexpected state."
+ " Promoted to master but already a master.",
+ [QueueName]),
+ error(unexpected_mirrored_state)
end;
handle_cast(request_depth, State = #state { depth_fun = DepthFun,
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index 8dd8a44c5b..a25b664bb0 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -212,45 +212,7 @@ delete_and_terminate(Reason, State = #state { backing_queue = BQ,
stop_all_slaves(Reason, #state{name = QName, gm = GM, wait_timeout = WT}) ->
{ok, #amqqueue{slave_pids = SPids}} = rabbit_amqqueue:lookup(QName),
- PidsMRefs = [{Pid, erlang:monitor(process, Pid)} || Pid <- [GM | SPids]],
- ok = gm:broadcast(GM, {delete_and_terminate, Reason}),
- %% It's possible that we could be partitioned from some slaves
- %% between the lookup and the broadcast, in which case we could
- %% monitor them but they would not have received the GM
- %% message. So only wait for slaves which are still
- %% not-partitioned.
- PendingSlavePids =
- lists:foldl(
- fun({Pid, MRef}, Acc) ->
- case rabbit_mnesia:on_running_node(Pid) of
- true ->
- receive
- {'DOWN', MRef, process, _Pid, _Info} ->
- Acc
- after WT ->
- rabbit_mirror_queue_misc:log_warning(
- QName, "Missing 'DOWN' message from ~p in"
- " node ~p~n", [Pid, node(Pid)]),
- [Pid | Acc]
- end;
- false ->
- Acc
- end
- end, [], PidsMRefs),
- %% Normally when we remove a slave another slave or master will
- %% notice and update Mnesia. But we just removed them all, and
- %% have stopped listening ourselves. So manually clean up.
- rabbit_misc:execute_mnesia_transaction(
- fun () ->
- [Q] = mnesia:read({rabbit_queue, QName}),
- rabbit_mirror_queue_misc:store_updated_slaves(
- Q #amqqueue { gm_pids = [], slave_pids = [],
- %% Restarted slaves on running nodes can
- %% ensure old incarnations are stopped using
- %% the pending slave pids.
- slave_pids_pending_shutdown = PendingSlavePids})
- end),
- ok = gm:forget_group(QName).
+ rabbit_mirror_queue_misc:stop_all_slaves(Reason, SPids, QName, GM, WT).
purge(State = #state { gm = GM,
backing_queue = BQ,
diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl
index 82169af7cb..16324abba0 100644
--- a/src/rabbit_mirror_queue_misc.erl
+++ b/src/rabbit_mirror_queue_misc.erl
@@ -24,6 +24,7 @@
update_mirrors/2, update_mirrors/1, validate_policy/1,
maybe_auto_sync/1, maybe_drop_master_after_sync/1,
sync_batch_size/1, log_info/3, log_warning/3]).
+-export([stop_all_slaves/5]).
-export([sync_queue/1, cancel_sync_queue/1]).
@@ -47,6 +48,8 @@
[policy_validator, <<"ha-sync-batch-size">>, ?MODULE]}},
{mfa, {rabbit_registry, register,
[policy_validator, <<"ha-promote-on-shutdown">>, ?MODULE]}},
+ {mfa, {rabbit_registry, register,
+ [policy_validator, <<"ha-promote-on-failure">>, ?MODULE]}},
{requires, rabbit_registry},
{enables, recovery}]}).
@@ -85,6 +88,7 @@ remove_from_queue(QueueName, Self, DeadGMPids) ->
[] -> {error, not_found};
[Q = #amqqueue { pid = QPid,
slave_pids = SPids,
+ sync_slave_pids = SyncSPids,
gm_pids = GMPids }] ->
{DeadGM, AliveGM} = lists:partition(
fun ({GM, _}) ->
@@ -104,35 +108,41 @@ remove_from_queue(QueueName, Self, DeadGMPids) ->
{QPid, SPids};
_ -> promote_slave(Alive)
end,
- Extra =
- case {{QPid, SPids}, {QPid1, SPids1}} of
- {Same, Same} ->
- [];
- _ when QPid =:= QPid1 orelse QPid1 =:= Self ->
- %% Either master hasn't changed, so
- %% we're ok to update mnesia; or we have
- %% become the master. If gm altered,
- %% we have no choice but to proceed.
- Q1 = Q#amqqueue{pid = QPid1,
- slave_pids = SPids1,
- gm_pids = AliveGM},
- store_updated_slaves(Q1),
- %% If we add and remove nodes at the
- %% same time we might tell the old
- %% master we need to sync and then
- %% shut it down. So let's check if
- %% the new master needs to sync.
- maybe_auto_sync(Q1),
- slaves_to_start_on_failure(Q1, DeadGMPids);
- _ ->
- %% Master has changed, and we're not it.
- %% [1].
- Q1 = Q#amqqueue{slave_pids = Alive,
- gm_pids = AliveGM},
- store_updated_slaves(Q1),
- []
- end,
- {ok, QPid1, DeadPids, Extra}
+ DoNotPromote = SyncSPids =:= [] andalso
+ rabbit_policy:get(<<"ha-promote-on-failure">>, Q) =:= <<"when-synced">>,
+ case {{QPid, SPids}, {QPid1, SPids1}} of
+ {Same, Same} ->
+ {ok, QPid1, DeadPids, []};
+ _ when QPid1 =/= QPid andalso QPid1 =:= Self andalso DoNotPromote =:= true ->
+ %% We have been promoted to master
+ %% but there are no synchronised mirrors
+ %% hence this node is not synchronised either
+ %% Bailing out.
+ {error, {not_synced, SPids1}};
+ _ when QPid =:= QPid1 orelse QPid1 =:= Self ->
+ %% Either master hasn't changed, so
+ %% we're ok to update mnesia; or we have
+ %% become the master. If gm altered,
+ %% we have no choice but to proceed.
+ Q1 = Q#amqqueue{pid = QPid1,
+ slave_pids = SPids1,
+ gm_pids = AliveGM},
+ store_updated_slaves(Q1),
+ %% If we add and remove nodes at the
+ %% same time we might tell the old
+ %% master we need to sync and then
+ %% shut it down. So let's check if
+ %% the new master needs to sync.
+ maybe_auto_sync(Q1),
+ {ok, QPid1, DeadPids, slaves_to_start_on_failure(Q1, DeadGMPids)};
+ _ ->
+ %% Master has changed, and we're not it.
+ %% [1].
+ Q1 = Q#amqqueue{slave_pids = Alive,
+ gm_pids = AliveGM},
+ store_updated_slaves(Q1),
+ {ok, QPid1, DeadPids, []}
+ end
end
end).
%% [1] We still update mnesia here in case the slave that is supposed
@@ -305,6 +315,44 @@ update_recoverable(SPids, RS) ->
DelNodes = RunningNodes -- SNodes, %% i.e. running with no slave
(RS -- DelNodes) ++ AddNodes.
+%% Broadcasts {delete_and_terminate, Reason} to the queue's GM group, waits
+%% for the monitored processes to go down, then clears the mirror
+%% bookkeeping of the queue record in Mnesia and forgets the GM group.
+%%
+%% Reason      - forwarded unchanged inside the broadcast message.
+%% SPids       - pids to monitor and wait for, in addition to GM.
+%% QName       - key of the queue record in the rabbit_queue table; also
+%%               passed to gm:forget_group/1.
+%% GM          - process used for the broadcast.
+%% WaitTimeout - `after' timeout of the receive, applied to each monitored
+%%               pid in turn (the waits are sequential, not concurrent).
+%%
+%% Returns ok. Any result other than ok from gm:broadcast/2 or
+%% gm:forget_group/1 fails the match and crashes the caller.
+stop_all_slaves(Reason, SPids, QName, GM, WaitTimeout) ->
+    PidsMRefs = [{Pid, erlang:monitor(process, Pid)} || Pid <- [GM | SPids]],
+    ok = gm:broadcast(GM, {delete_and_terminate, Reason}),
+    %% It's possible that we could be partitioned from some slaves
+    %% between the lookup and the broadcast, in which case we could
+    %% monitor them but they would not have received the GM
+    %% message. So only wait for slaves which are still
+    %% not-partitioned.
+    %%
+    %% NOTE(review): the monitors set up above are never removed here, and
+    %% the fold also covers GM, so GM's pid can end up in the pending list.
+    PendingSlavePids =
+        lists:foldl(
+          fun({Pid, MRef}, Acc) ->
+                  case rabbit_mnesia:on_running_node(Pid) of
+                      true ->
+                          receive
+                              {'DOWN', MRef, process, _Pid, _Info} ->
+                                  Acc
+                          after WaitTimeout ->
+                                  log_warning(
+                                    QName, "Missing 'DOWN' message from ~p in"
+                                    " node ~p~n", [Pid, node(Pid)]),
+                                  [Pid | Acc]
+                          end;
+                      false ->
+                          Acc
+                  end
+          end, [], PidsMRefs),
+    %% Normally when we remove a slave another slave or master will
+    %% notice and update Mnesia. But we just removed them all, and
+    %% have stopped listening ourselves. So manually clean up.
+    rabbit_misc:execute_mnesia_transaction(
+      fun () ->
+              [Q] = mnesia:read({rabbit_queue, QName}),
+              store_updated_slaves(
+                Q #amqqueue { gm_pids = [], slave_pids = [],
+                              %% Restarted slaves on running nodes can
+                              %% ensure old incarnations are stopped using
+                              %% the pending slave pids.
+                              slave_pids_pending_shutdown = PendingSlavePids})
+      end),
+    ok = gm:forget_group(QName).
+
%%----------------------------------------------------------------------------
promote_slave([SPid | SPids]) ->
@@ -478,10 +526,12 @@ validate_policy(KeyList) ->
<<"ha-sync-batch-size">>, KeyList, none),
PromoteOnShutdown = proplists:get_value(
<<"ha-promote-on-shutdown">>, KeyList, none),
- case {Mode, Params, SyncMode, SyncBatchSize, PromoteOnShutdown} of
- {none, none, none, none, none} ->
+ PromoteOnFailure = proplists:get_value(
+ <<"ha-promote-on-failure">>, KeyList, none),
+ case {Mode, Params, SyncMode, SyncBatchSize, PromoteOnShutdown, PromoteOnFailure} of
+ {none, none, none, none, none, none} ->
ok;
- {none, _, _, _, _} ->
+ {none, _, _, _, _, _} ->
{error, "ha-mode must be specified to specify ha-params, "
"ha-sync-mode or ha-promote-on-shutdown", []};
_ ->
@@ -490,7 +540,8 @@ validate_policy(KeyList) ->
{Params, ha_params_validator(Mode)},
{SyncMode, fun validate_sync_mode/1},
{SyncBatchSize, fun validate_sync_batch_size/1},
- {PromoteOnShutdown, fun validate_pos/1}])
+ {PromoteOnShutdown, fun validate_pos/1},
+ {PromoteOnFailure, fun validate_pof/1}])
end.
ha_params_validator(Mode) ->
@@ -532,3 +583,12 @@ validate_pos(PromoteOnShutdown) ->
Mode -> {error, "ha-promote-on-shutdown must be "
"\"always\" or \"when-synced\", got ~p", [Mode]}
end.
+
+%% Validates the value given for the "ha-promote-on-failure" policy key.
+%% Returns ok for <<"always">>, <<"when-synced">> and the atom none;
+%% any other term yields {error, Format, Args} with the rejected value
+%% as the only format argument.
+validate_pof(<<"always">>)      -> ok;
+validate_pof(<<"when-synced">>) -> ok;
+validate_pof(none)              -> ok;
+validate_pof(Mode) ->
+    {error, "ha-promote-on-failure must be "
+     "\"always\" or \"when-synced\", got ~p", [Mode]}.
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 65a13f03c0..ee8c7f9d26 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -223,13 +223,21 @@ handle_call(go, _From, {not_started, Q} = NotStarted) ->
end;
handle_call({gm_deaths, DeadGMPids}, From,
- State = #state { gm = GM, q = Q = #amqqueue {
- name = QName, pid = MPid }}) ->
+ State = #state{ gm = GM,
+ q = Q = #amqqueue{ name = QName, pid = MPid },
+ backing_queue = BQ,
+ backing_queue_state = BQS}) ->
Self = self(),
case rabbit_mirror_queue_misc:remove_from_queue(QName, Self, DeadGMPids) of
{error, not_found} ->
gen_server2:reply(From, ok),
{stop, normal, State};
+ {error, {not_synced, SPids}} ->
+ WaitTimeout = rabbit_misc:get_env(rabbit, slave_wait_timeout, 15000),
+ rabbit_mirror_queue_misc:stop_all_slaves(
+ {error, not_synced}, SPids, QName, GM, WaitTimeout),
+ BQ:delete_and_terminate({error, not_synced}, BQS),
+ {stop, normal, State#state{backing_queue_state = undefined}};
{ok, Pid, DeadPids, ExtraNodes} ->
rabbit_mirror_queue_misc:report_deaths(Self, false, QName,
DeadPids),
diff --git a/test/dynamic_ha_SUITE.erl b/test/dynamic_ha_SUITE.erl
index 7629d2264a..1d455bb290 100644
--- a/test/dynamic_ha_SUITE.erl
+++ b/test/dynamic_ha_SUITE.erl
@@ -59,6 +59,7 @@ groups() ->
vhost_deletion,
force_delete_if_no_master,
promote_on_shutdown,
+ promote_on_failure,
slave_recovers_after_vhost_failure,
slave_recovers_after_vhost_down_an_up,
master_migrates_on_vhost_down,
@@ -287,22 +288,61 @@ force_delete_if_no_master(Config) ->
amqp_channel:call(BCh3, #'queue.delete'{queue = <<"ha.nopromote.test2">>}),
ok.
+%% Exercises the "ha-promote-on-failure" policy key on a two-node cluster.
+%% Queues matching ^ha.promote get <<"always">>, queues matching
+%% ^ha.nopromote get <<"when-synced">>; one queue of each kind is declared
+%% on node A and filled with 10 messages before A is killed.
+promote_on_failure(Config) ->
+    [A, B] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+    PromoteQ = <<"ha.promote.test">>,
+    NoPromoteQ = <<"ha.nopromote.test">>,
+    %% Builds the durable declare used for every queue in this case.
+    DeclareOf = fun(QName) ->
+                        #'queue.declare'{queue = QName, durable = true}
+                end,
+    rabbit_ct_broker_helpers:set_ha_policy(
+      Config, A, <<"^ha.promote">>, <<"all">>,
+      [{<<"ha-promote-on-failure">>, <<"always">>}]),
+    rabbit_ct_broker_helpers:set_ha_policy(
+      Config, A, <<"^ha.nopromote">>, <<"all">>,
+      [{<<"ha-promote-on-failure">>, <<"when-synced">>}]),
+
+    ChA = rabbit_ct_client_helpers:open_channel(Config, A),
+    lists:foreach(
+      fun(Q) ->
+              amqp_channel:call(ChA, DeclareOf(Q)),
+              rabbit_ct_client_helpers:publish(ChA, Q, 10)
+      end, [PromoteQ, NoPromoteQ]),
+    %% Restart B, then kill A (presumably so that B only holds
+    %% unsynchronised mirrors when A fails; confirm against the helpers).
+    ok = rabbit_ct_broker_helpers:restart_node(Config, B),
+    ok = rabbit_ct_broker_helpers:kill_node(Config, A),
+    ChB = rabbit_ct_client_helpers:open_channel(Config, B),
+    %% "always": the queue can be declared through B and reports 0 messages.
+    #'queue.declare_ok'{message_count = 0} =
+        amqp_channel:call(ChB, DeclareOf(PromoteQ)),
+    %% "when-synced": declaring through B must fail with a 404 close.
+    ?assertExit(
+       {{shutdown, {server_initiated_close, 404, _}}, _},
+       amqp_channel:call(ChB, DeclareOf(NoPromoteQ))),
+    %% With A started again the queue reports its 10 messages.
+    ok = rabbit_ct_broker_helpers:start_node(Config, A),
+    ChA2 = rabbit_ct_client_helpers:open_channel(Config, A),
+    #'queue.declare_ok'{message_count = 10} =
+        amqp_channel:call(ChA2, DeclareOf(NoPromoteQ)),
+    ok.
+
promote_on_shutdown(Config) ->
[A, B] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
rabbit_ct_broker_helpers:set_ha_policy(Config, A, <<"^ha.promote">>,
<<"all">>, [{<<"ha-promote-on-shutdown">>, <<"always">>}]),
rabbit_ct_broker_helpers:set_ha_policy(Config, A, <<"^ha.nopromote">>,
<<"all">>),
+ rabbit_ct_broker_helpers:set_ha_policy(Config, A, <<"^ha.nopromoteonfailure">>,
+ <<"all">>, [{<<"ha-promote-on-failure">>, <<"when-synced">>},
+ {<<"ha-promote-on-shutdown">>, <<"always">>}]),
ACh = rabbit_ct_client_helpers:open_channel(Config, A),
[begin
amqp_channel:call(ACh, #'queue.declare'{queue = Q,
durable = true}),
rabbit_ct_client_helpers:publish(ACh, Q, 10)
- end || Q <- [<<"ha.promote.test">>, <<"ha.nopromote.test">>]],
+ end || Q <- [<<"ha.promote.test">>,
+ <<"ha.nopromote.test">>,
+ <<"ha.nopromoteonfailure.test">>]],
ok = rabbit_ct_broker_helpers:restart_node(Config, B),
ok = rabbit_ct_broker_helpers:stop_node(Config, A),
BCh = rabbit_ct_client_helpers:open_channel(Config, B),
+ BCh1 = rabbit_ct_client_helpers:open_channel(Config, B),
#'queue.declare_ok'{message_count = 0} =
amqp_channel:call(
BCh, #'queue.declare'{queue = <<"ha.promote.test">>,
@@ -312,12 +352,21 @@ promote_on_shutdown(Config) ->
amqp_channel:call(
BCh, #'queue.declare'{queue = <<"ha.nopromote.test">>,
durable = true})),
+ ?assertExit(
+ {{shutdown, {server_initiated_close, 404, _}}, _},
+ amqp_channel:call(
+ BCh1, #'queue.declare'{queue = <<"ha.nopromoteonfailure.test">>,
+ durable = true})),
ok = rabbit_ct_broker_helpers:start_node(Config, A),
ACh2 = rabbit_ct_client_helpers:open_channel(Config, A),
#'queue.declare_ok'{message_count = 10} =
amqp_channel:call(
ACh2, #'queue.declare'{queue = <<"ha.nopromote.test">>,
durable = true}),
+ #'queue.declare_ok'{message_count = 10} =
+ amqp_channel:call(
+ ACh2, #'queue.declare'{queue = <<"ha.nopromoteonfailure.test">>,
+ durable = true}),
ok.
nodes_policy_should_pick_master_from_its_params(Config) ->