summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_channel.erl4
-rw-r--r--src/rabbit_node_monitor.erl97
2 files changed, 74 insertions, 27 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 13cc925cd0..bc3ee2e0eb 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -1736,7 +1736,7 @@ send_nacks(_, State) ->
send_confirms(State = #ch{tx = none, confirmed = []}) ->
State;
send_confirms(State = #ch{tx = none, confirmed = C}) ->
- case rabbit_node_monitor:pause_minority_guard() of
+ case rabbit_node_monitor:pause_partition_guard() of
ok -> MsgSeqNos =
lists:foldl(
fun ({MsgSeqNo, XName}, MSNs) ->
@@ -1748,7 +1748,7 @@ send_confirms(State = #ch{tx = none, confirmed = C}) ->
pausing -> State
end;
send_confirms(State) ->
- case rabbit_node_monitor:pause_minority_guard() of
+ case rabbit_node_monitor:pause_partition_guard() of
ok -> maybe_complete_tx(State);
pausing -> State
end.
diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl
index 82a7a89be9..4af896f947 100644
--- a/src/rabbit_node_monitor.erl
+++ b/src/rabbit_node_monitor.erl
@@ -25,7 +25,7 @@
update_cluster_status/0, reset_cluster_status/0]).
-export([notify_node_up/0, notify_joined_cluster/0, notify_left_cluster/1]).
-export([partitions/0, partitions/1, status/1, subscribe/1]).
--export([pause_minority_guard/0]).
+-export([pause_partition_guard/0]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
@@ -64,7 +64,7 @@
-spec(partitions/1 :: ([node()]) -> [{node(), [node()]}]).
-spec(status/1 :: ([node()]) -> {[{node(), [node()]}], [node()]}).
-spec(subscribe/1 :: (pid()) -> 'ok').
--spec(pause_minority_guard/0 :: () -> 'ok' | 'pausing').
+-spec(pause_partition_guard/0 :: () -> 'ok' | 'pausing').
-spec(all_rabbit_nodes_up/0 :: () -> boolean()).
-spec(run_outside_applications/1 :: (fun (() -> any())) -> pid()).
@@ -194,46 +194,66 @@ subscribe(Pid) ->
gen_server:cast(?SERVER, {subscribe, Pid}).
%%----------------------------------------------------------------------------
-%% pause_minority safety
+%% pause_minority/keep_preferred safety
%%----------------------------------------------------------------------------
%% If we are in a minority and pause_minority mode then a) we are
%% going to shut down imminently and b) we should not confirm anything
%% until then, since anything we confirm is likely to be lost.
%%
-%% We could confirm something by having an HA queue see the minority
+%% The same principles apply to a node which isn't part of the preferred
+%% partition when we are in keep_preferred mode.
+%%
+%% We could confirm something by having an HA queue see the pausing
%% state (and fail over into it) before the node monitor stops us, or
%% by using unmirrored queues and just having them vanish (and
%% confiming messages as thrown away).
%%
%% So we have channels call in here before issuing confirms, to do a
-%% lightweight check that we have not entered a minority state.
+%% lightweight check that we have not entered a pausing state.
-pause_minority_guard() ->
- case get(pause_minority_guard) of
- not_minority_mode ->
+pause_partition_guard() ->
+ case get(pause_partition_guard) of
+ not_pause_mode ->
ok;
undefined ->
{ok, M} = application:get_env(rabbit, cluster_partition_handling),
case M of
- pause_minority -> pause_minority_guard([]);
- _ -> put(pause_minority_guard, not_minority_mode),
- ok
+ pause_minority ->
+ pause_minority_guard([]);
+ {keep_preferred, PreferredNode} when is_atom(PreferredNode) ->
+ keep_preferred_guard(PreferredNode, []);
+ _ ->
+ put(pause_partition_guard, not_pause_mode),
+ ok
end;
{minority_mode, Nodes} ->
- pause_minority_guard(Nodes)
+ pause_minority_guard(Nodes);
+ {keep_preferred_mode, PreferredNode, Nodes} ->
+ keep_preferred_guard(PreferredNode, Nodes)
end.
pause_minority_guard(LastNodes) ->
case nodes() of
LastNodes -> ok;
- _ -> put(pause_minority_guard, {minority_mode, nodes()}),
+ _ -> put(pause_partition_guard, {minority_mode, nodes()}),
case majority() of
false -> pausing;
true -> ok
end
end.
+keep_preferred_guard(PreferredNode, LastNodes) ->
+ case nodes() of
+ LastNodes -> ok;
+ _ -> put(pause_partition_guard,
+ {keep_preferred_mode, PreferredNode, nodes()}),
+ case in_preferred_partition(PreferredNode) of
+ false -> pausing;
+ true -> ok
+ end
+ end.
+
%%----------------------------------------------------------------------------
%% gen_server callbacks
%%----------------------------------------------------------------------------
@@ -289,8 +309,9 @@ handle_cast(notify_node_up, State = #state{guid = GUID}) ->
%% 'check_partial_partition' to all the nodes it still thinks are
%% alive. If any of those (intermediate) nodes still see the "down"
%% node as up, they inform it that this has happened. The original
-%% node (in 'ignore' or 'autoheal' mode) will then disconnect from the
-%% intermediate node to "upgrade" to a full partition.
+%% node (in 'ignore', 'keep_preferred' or 'autoheal' mode) will then
+%% disconnect from the intermediate node to "upgrade" to a full
+%% partition.
%%
%% In pause_minority mode it will instead immediately pause until all
%% nodes come back. This is because the contract for pause_minority is
@@ -525,10 +546,11 @@ handle_dead_node(Node, State = #state{autoheal = Autoheal}) ->
%% that we can respond in the same way to "rabbitmqctl stop_app"
%% and "rabbitmqctl stop" as much as possible.
%%
- %% However, for pause_minority mode we can't do this, since we
- %% depend on looking at whether other nodes are up to decide
- %% whether to come back up ourselves - if we decide that based on
- %% the rabbit application we would go down and never come back.
+ %% However, for pause_minority and keep_preferred modes we can't do
+ %% this, since we depend on looking at whether other nodes are up
+ %% to decide whether to come back up ourselves - if we decide that
+ %% based on the rabbit application we would go down and never come
+ %% back.
case application:get_env(rabbit, cluster_partition_handling) of
{ok, pause_minority} ->
case majority() of
@@ -536,6 +558,21 @@ handle_dead_node(Node, State = #state{autoheal = Autoheal}) ->
false -> await_cluster_recovery(fun majority/0)
end,
State;
+ {ok, {keep_preferred, PreferredNode}} when is_atom(PreferredNode) ->
+ AllNodes = rabbit_mnesia:cluster_nodes(all),
+ case lists:member(PreferredNode, AllNodes) of
+ true ->
+ case in_preferred_partition(PreferredNode) of
+ true -> ok;
+ false -> await_cluster_recovery(
+ fun in_preferred_partition/0)
+ end;
+ false ->
+ rabbit_log:warning("cluster_partition_handling: preferred "
+ "node ~s not part of the cluster, "
+ "assuming 'ignore'~n", [PreferredNode])
+ end,
+ State;
{ok, ignore} ->
State;
{ok, autoheal} ->
@@ -547,8 +584,8 @@ handle_dead_node(Node, State = #state{autoheal = Autoheal}) ->
end.
await_cluster_recovery(Condition) ->
- rabbit_log:warning("Cluster minority status detected - awaiting recovery~n",
- []),
+ rabbit_log:warning("Cluster minority/secondary status detected - "
+ "awaiting recovery~n", []),
run_outside_applications(fun () ->
rabbit:stop(),
wait_for_cluster_recovery(Condition)
@@ -681,15 +718,25 @@ disconnect(Node) ->
%% here. "rabbit" in a function's name implies we test if the rabbit
%% application is up, not just the node.
-%% As we use these functions to decide what to do in pause_minority
-%% state, they *must* be fast, even in the case where TCP connections
-%% are timing out. So that means we should be careful about whether we
-%% connect to nodes which are currently disconnected.
+%% As we use these functions to decide what to do in pause_minority or
+%% keep_preferred states, they *must* be fast, even in the case where
+%% TCP connections are timing out. So that means we should be careful
+%% about whether we connect to nodes which are currently disconnected.
majority() ->
Nodes = rabbit_mnesia:cluster_nodes(all),
length(alive_nodes(Nodes)) / length(Nodes) > 0.5.
+in_preferred_partition() ->
+ {ok, {keep_preferred, PreferredNode}} =
+ application:get_env(rabbit, cluster_partition_handling),
+ in_preferred_partition(PreferredNode).
+
+in_preferred_partition(PreferredNode) ->
+ Nodes = rabbit_mnesia:cluster_nodes(all),
+ lists:member(PreferredNode, Nodes) andalso
+ alive_nodes([PreferredNode]) =/= [].
+
all_nodes_up() ->
Nodes = rabbit_mnesia:cluster_nodes(all),
length(alive_nodes(Nodes)) =:= length(Nodes).