diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_channel.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_node_monitor.erl | 97 |
2 files changed, 74 insertions, 27 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 13cc925cd0..bc3ee2e0eb 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -1736,7 +1736,7 @@ send_nacks(_, State) -> send_confirms(State = #ch{tx = none, confirmed = []}) -> State; send_confirms(State = #ch{tx = none, confirmed = C}) -> - case rabbit_node_monitor:pause_minority_guard() of + case rabbit_node_monitor:pause_partition_guard() of ok -> MsgSeqNos = lists:foldl( fun ({MsgSeqNo, XName}, MSNs) -> @@ -1748,7 +1748,7 @@ send_confirms(State = #ch{tx = none, confirmed = C}) -> pausing -> State end; send_confirms(State) -> - case rabbit_node_monitor:pause_minority_guard() of + case rabbit_node_monitor:pause_partition_guard() of ok -> maybe_complete_tx(State); pausing -> State end. diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl index 82a7a89be9..4af896f947 100644 --- a/src/rabbit_node_monitor.erl +++ b/src/rabbit_node_monitor.erl @@ -25,7 +25,7 @@ update_cluster_status/0, reset_cluster_status/0]). -export([notify_node_up/0, notify_joined_cluster/0, notify_left_cluster/1]). -export([partitions/0, partitions/1, status/1, subscribe/1]). --export([pause_minority_guard/0]). +-export([pause_partition_guard/0]). %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, @@ -64,7 +64,7 @@ -spec(partitions/1 :: ([node()]) -> [{node(), [node()]}]). -spec(status/1 :: ([node()]) -> {[{node(), [node()]}], [node()]}). -spec(subscribe/1 :: (pid()) -> 'ok'). --spec(pause_minority_guard/0 :: () -> 'ok' | 'pausing'). +-spec(pause_partition_guard/0 :: () -> 'ok' | 'pausing'). -spec(all_rabbit_nodes_up/0 :: () -> boolean()). -spec(run_outside_applications/1 :: (fun (() -> any())) -> pid()). @@ -194,46 +194,66 @@ subscribe(Pid) -> gen_server:cast(?SERVER, {subscribe, Pid}). %%---------------------------------------------------------------------------- -%% pause_minority safety +%% pause_minority/keep_preferred safety %%---------------------------------------------------------------------------- %% If we are in a minority and pause_minority mode then a) we are %% going to shut down imminently and b) we should not confirm anything %% until then, since anything we confirm is likely to be lost. %% -%% We could confirm something by having an HA queue see the minority +%% The same principles apply to a node which isn't part of the preferred +%% partition when we are in keep_preferred mode. +%% +%% We could confirm something by having an HA queue see the pausing %% state (and fail over into it) before the node monitor stops us, or %% by using unmirrored queues and just having them vanish (and %% confiming messages as thrown away). %% %% So we have channels call in here before issuing confirms, to do a -%% lightweight check that we have not entered a minority state. +%% lightweight check that we have not entered a pausing state. -pause_minority_guard() -> - case get(pause_minority_guard) of - not_minority_mode -> +pause_partition_guard() -> + case get(pause_partition_guard) of + not_pause_mode -> ok; undefined -> {ok, M} = application:get_env(rabbit, cluster_partition_handling), case M of - pause_minority -> pause_minority_guard([]); - _ -> put(pause_minority_guard, not_minority_mode), - ok + pause_minority -> + pause_minority_guard([]); + {keep_preferred, PreferredNode} when is_atom(PreferredNode) -> + keep_preferred_guard(PreferredNode, []); + _ -> + put(pause_partition_guard, not_pause_mode), + ok end; {minority_mode, Nodes} -> - pause_minority_guard(Nodes) + pause_minority_guard(Nodes); + {keep_preferred_mode, PreferredNode, Nodes} -> + keep_preferred_guard(PreferredNode, Nodes) end. pause_minority_guard(LastNodes) -> case nodes() of LastNodes -> ok; - _ -> put(pause_minority_guard, {minority_mode, nodes()}), + _ -> put(pause_partition_guard, {minority_mode, nodes()}), case majority() of false -> pausing; true -> ok end end. +keep_preferred_guard(PreferredNode, LastNodes) -> + case nodes() of + LastNodes -> ok; + _ -> put(pause_partition_guard, + {keep_preferred_mode, PreferredNode, nodes()}), + case in_preferred_partition(PreferredNode) of + false -> pausing; + true -> ok + end + end. + %%---------------------------------------------------------------------------- %% gen_server callbacks %%---------------------------------------------------------------------------- @@ -289,8 +309,9 @@ handle_cast(notify_node_up, State = #state{guid = GUID}) -> %% 'check_partial_partition' to all the nodes it still thinks are %% alive. If any of those (intermediate) nodes still see the "down" %% node as up, they inform it that this has happened. The original -%% node (in 'ignore' or 'autoheal' mode) will then disconnect from the -%% intermediate node to "upgrade" to a full partition. +%% node (in 'ignore', 'keep_preferred' or 'autoheal' mode) will then +%% disconnect from the intermediate node to "upgrade" to a full +%% partition. %% %% In pause_minority mode it will instead immediately pause until all %% nodes come back. This is because the contract for pause_minority is @@ -525,10 +546,11 @@ handle_dead_node(Node, State = #state{autoheal = Autoheal}) -> %% that we can respond in the same way to "rabbitmqctl stop_app" %% and "rabbitmqctl stop" as much as possible. %% - %% However, for pause_minority mode we can't do this, since we - %% depend on looking at whether other nodes are up to decide - %% whether to come back up ourselves - if we decide that based on - %% the rabbit application we would go down and never come back. + %% However, for pause_minority and keep_preferred modes we can't do + %% this, since we depend on looking at whether other nodes are up + %% to decide whether to come back up ourselves - if we decide that + %% based on the rabbit application we would go down and never come + %% back. case application:get_env(rabbit, cluster_partition_handling) of {ok, pause_minority} -> case majority() of @@ -536,6 +558,21 @@ handle_dead_node(Node, State = #state{autoheal = Autoheal}) -> false -> await_cluster_recovery(fun majority/0) end, State; + {ok, {keep_preferred, PreferredNode}} when is_atom(PreferredNode) -> + AllNodes = rabbit_mnesia:cluster_nodes(all), + case lists:member(PreferredNode, AllNodes) of + true -> + case in_preferred_partition(PreferredNode) of + true -> ok; + false -> await_cluster_recovery( + fun in_preferred_partition/0) + end; + false -> + rabbit_log:warning("cluster_partition_handling: preferred " + "node ~s not part of the cluster, " + "assuming 'ignore'~n", [PreferredNode]) + end, + State; {ok, ignore} -> State; {ok, autoheal} -> @@ -547,8 +584,8 @@ handle_dead_node(Node, State = #state{autoheal = Autoheal}) -> end. await_cluster_recovery(Condition) -> - rabbit_log:warning("Cluster minority status detected - awaiting recovery~n", - []), + rabbit_log:warning("Cluster minority/secondary status detected - " + "awaiting recovery~n", []), run_outside_applications(fun () -> rabbit:stop(), wait_for_cluster_recovery(Condition) @@ -681,15 +718,25 @@ disconnect(Node) -> %% here. "rabbit" in a function's name implies we test if the rabbit %% application is up, not just the node. -%% As we use these functions to decide what to do in pause_minority -%% state, they *must* be fast, even in the case where TCP connections -%% are timing out. So that means we should be careful about whether we -%% connect to nodes which are currently disconnected. +%% As we use these functions to decide what to do in pause_minority or +%% keep_preferred states, they *must* be fast, even in the case where +%% TCP connections are timing out. So that means we should be careful +%% about whether we connect to nodes which are currently disconnected. majority() -> Nodes = rabbit_mnesia:cluster_nodes(all), length(alive_nodes(Nodes)) / length(Nodes) > 0.5. +in_preferred_partition() -> + {ok, {keep_preferred, PreferredNode}} = + application:get_env(rabbit, cluster_partition_handling), + in_preferred_partition(PreferredNode). + +in_preferred_partition(PreferredNode) -> + Nodes = rabbit_mnesia:cluster_nodes(all), + lists:member(PreferredNode, Nodes) andalso + alive_nodes([PreferredNode]) =/= []. + all_nodes_up() -> Nodes = rabbit_mnesia:cluster_nodes(all), length(alive_nodes(Nodes)) =:= length(Nodes). |
