summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--docs/rabbitmq.config.example2
-rwxr-xr-xscripts/rabbitmq-server-ha.ocf169
-rw-r--r--src/mochinum.erl358
-rw-r--r--src/rabbit_autoheal.erl113
-rw-r--r--src/rabbit_node_monitor.erl53
-rw-r--r--test/partitions_SUITE.erl23
6 files changed, 285 insertions, 433 deletions
diff --git a/docs/rabbitmq.config.example b/docs/rabbitmq.config.example
index f7477abe61..4d376d953a 100644
--- a/docs/rabbitmq.config.example
+++ b/docs/rabbitmq.config.example
@@ -161,7 +161,7 @@
%% Set the default AMQP heartbeat delay (in seconds).
%%
- %% {heartbeat, 600},
+ %% {heartbeat, 60},
%% Set the max permissible size of an AMQP frame (in bytes).
%%
diff --git a/scripts/rabbitmq-server-ha.ocf b/scripts/rabbitmq-server-ha.ocf
index 0dd27c72c4..c74525c936 100755
--- a/scripts/rabbitmq-server-ha.ocf
+++ b/scripts/rabbitmq-server-ha.ocf
@@ -47,6 +47,8 @@ OCF_RESKEY_use_fqdn_default=false
OCF_RESKEY_fqdn_prefix_default=""
OCF_RESKEY_max_rabbitmqctl_timeouts_default=3
OCF_RESKEY_policy_file_default="/usr/local/sbin/set_rabbitmq_policy"
+OCF_RESKEY_rmq_feature_health_check_default=true
+OCF_RESKEY_rmq_feature_local_list_queues_default=true
: ${HA_LOGTAG="lrmd"}
: ${HA_LOGFACILITY="daemon"}
@@ -68,6 +70,8 @@ OCF_RESKEY_policy_file_default="/usr/local/sbin/set_rabbitmq_policy"
: ${OCF_RESKEY_fqdn_prefix=${OCF_RESKEY_fqdn_prefix_default}}
: ${OCF_RESKEY_max_rabbitmqctl_timeouts=${OCF_RESKEY_max_rabbitmqctl_timeouts_default}}
: ${OCF_RESKEY_policy_file=${OCF_RESKEY_policy_file_default}}
+: ${OCF_RESKEY_rmq_feature_health_check=${OCF_RESKEY_rmq_feature_health_check_default}}
+: ${OCF_RESKEY_rmq_feature_local_list_queues=${OCF_RESKEY_rmq_feature_local_list_queues_default}}
#######################################################################
@@ -298,6 +302,26 @@ A path to the shell script to setup RabbitMQ policies
<content type="string" default="${OCF_RESKEY_policy_file_default}" />
</parameter>
+<parameter name="rmq_feature_health_check" unique="0" required="0">
+<longdesc lang="en">
+Since rabbit 3.6.4 list_queues/list_channels-based monitoring should
+be replaced with "node_health_check" command, as it creates no network
+load at all.
+</longdesc>
+<shortdesc lang="en">Use node_health_check for monitoring</shortdesc>
+<content type="boolean" default="${OCF_RESKEY_rmq_feature_health_check_default}" />
+</parameter>
+
+<parameter name="rmq_feature_local_list_queues" unique="0" required="0">
+<longdesc lang="en">
+For rabbit version that implements --local flag for list_queues, this
+can greatly reduce network overhead in cases when node is
+stopped/demoted.
+</longdesc>
+<shortdesc lang="en">Use --local option for list_queues</shortdesc>
+<content type="boolean" default="${OCF_RESKEY_rmq_feature_local_list_queues_default}" />
+</parameter>
+
$EXTENDED_OCF_PARAMS
</parameters>
@@ -788,10 +812,45 @@ get_master_name_but()
done
}
+erl_eval() {
+ local fmt="${1:?}"
+ shift
+
+ ${OCF_RESKEY_ctl} eval "$(printf "$fmt" "$@")"
+}
+
# Returns 0 if we are clustered with provideded node
is_clustered_with()
{
- get_running_nodes | grep -q $(rabbit_node_name $1);
+ local LH="${LH}: is_clustered_with: "
+ local node_name
+ local rc
+ node_name=$(rabbit_node_name $1)
+
+ local seen_as_running
+ seen_as_running=$(erl_eval "lists:member('%s', rabbit_mnesia:cluster_nodes(running))." "$node_name")
+ rc=$?
+ if [ "$rc" -ne 0 ]; then
+ ocf_log err "${LH} Failed to check whether '$node_name' is considered running by us"
+ # XXX Or should we give remote node benefit of a doubt?
+ return 1
+ elif [ "$seen_as_running" != true ]; then
+ ocf_log info "${LH} Node $node_name is not running, considering it not clustered with us"
+ return 1
+ fi
+
+ local seen_as_partitioned
+ seen_as_partitioned=$(erl_eval "lists:member('%s', rabbit_node_monitor:partitions())." "$node_name")
+ rc=$?
+ if [ "$rc" -ne 0 ]; then
+ ocf_log err "${LH} Failed to check whether '$node_name' is partitioned with us"
+ # XXX Or should we give remote node benefit of a doubt?
+ return 1
+ elif [ "$seen_as_partitioned" != false ]; then
+ ocf_log info "${LH} Node $node_name is partitioned from us"
+ return 1
+ fi
+
return $?
}
@@ -1377,7 +1436,9 @@ check_timeouts() {
local timeouts_attr_name=$2
local op_name=$3
- if [ $op_rc -ne 124 -a $op_rc -ne 137 ]; then
+ # 75 is EX_TEMPFAIL from sysexits, and is used by rabbitmqctl to signal about
+ # timeout.
+ if [ $op_rc -ne 124 -a $op_rc -ne 137 -a $op_rc -ne 75 ]; then
ocf_update_private_attr $timeouts_attr_name 0
return 0
fi
@@ -1401,12 +1462,20 @@ check_timeouts() {
}
wait_sync() {
- wait_time=$1
+ local wait_time=$1
+ local queues
+ local opt_arg=""
- queues="${COMMAND_TIMEOUT} ${OCF_RESKEY_ctl} list_queues name state"
- su_rabbit_cmd -t "${wait_time}" "sh -c \"while ${queues} | grep -q 'syncing,'; \
- do sleep 2; done\""
- return $?
+ if [ "$OCF_RESKEY_rmq_feature_local_list_queues" = "true" ]; then
+ opt_arg="--local"
+ fi
+
+ queues="${COMMAND_TIMEOUT} ${OCF_RESKEY_ctl} list_queues $opt_arg name state"
+
+ su_rabbit_cmd -t "${wait_time}" "sh -c \"while ${queues} | grep -q 'syncing,'; \
+ do sleep 2; done\""
+
+ return $?
}
get_monitor() {
@@ -1516,7 +1585,75 @@ get_monitor() {
return $rc
fi
- # Check if the rabbitmqctl control plane is alive.
+ # rc can be SUCCESS or RUNNING_MASTER, don't touch it unless there
+ # is some error uncovered by node_health_check
+ if ! node_health_check; then
+ rc=$OCF_ERR_GENERIC
+ fi
+
+ # If we are the master and healthy, check that we see other cluster members
+ # Order a member to restart if we don't see it
+ if [ $rc -eq $OCF_RUNNING_MASTER ] ; then
+ for node in $(get_all_pacemaker_nodes); do
+ if ! is_clustered_with $node; then
+ nowtime=$(now)
+
+ ocf_log warn "${LH} node $node is not connected with us, ordering it to restart."
+ ocf_update_private_attr 'rabbit-ordered-to-restart' "$nowtime" "$node"
+ fi
+ done
+ fi
+
+ ocf_log info "${LH} get_monitor function ready to return ${rc}"
+ return $rc
+}
+
+# Check if the rabbitmqctl control plane is alive.
+node_health_check() {
+ local rc
+ if [ "$OCF_RESKEY_rmq_feature_health_check" = true ]; then
+ node_health_check_local
+ rc=$?
+ else
+ node_health_check_legacy
+ rc=$?
+ fi
+ return $rc
+}
+
+node_health_check_local() {
+ local LH="${LH} node_health_check_local():"
+ local rc
+ local rc_timeouts
+
+ # Give node_health_check some time to handle timeout by itself.
+ # By using internal rabbitmqctl timeouts, we allow it to print
+ # more useful diagnostics
+ local timeout=$((TIMEOUT_ARG - 2))
+ su_rabbit_cmd "${OCF_RESKEY_ctl} node_health_check -t $timeout"
+ rc=$?
+
+ check_timeouts $rc "rabbit_node_health_check_timeouts" "node_health_check"
+ rc_timeouts=$?
+
+ if [ "$rc_timeouts" -eq 2 ]; then
+ master_score 0
+ ocf_log info "${LH} node_health_check timed out, retry limit reached"
+ return $OCF_ERR_GENERIC
+ elif [ "$rc_timeouts" -eq 1 ]; then
+ ocf_log info "${LH} node_health_check timed out, going to retry"
+ return $OCF_SUCCESS
+ fi
+
+ if [ "$rc" -ne 0 ]; then
+ ocf_log err "${LH} rabbitmqctl node_health_check exited with errors."
+ return $OCF_ERR_GENERIC
+ else
+ return $OCF_SUCCESS
+ fi
+}
+
+node_health_check_legacy() {
local rc_alive
local timeout_alive
su_rabbit_cmd "${OCF_RESKEY_ctl} list_channels 2>&1 > /dev/null"
@@ -1609,20 +1746,6 @@ get_monitor() {
fi
fi
- # If we are the master and healthy, check that we see other cluster members
- # Order a member to restart if we don't see it
- if [ $rc -eq $OCF_RUNNING_MASTER ] ; then
- for node in $(get_all_pacemaker_nodes); do
- if ! is_clustered_with $node; then
- nowtime=$(now)
-
- ocf_log warn "${LH} node $node is not connected with us, ordering it to restart."
- ocf_update_private_attr 'rabbit-ordered-to-restart' "$nowtime" "$node"
- fi
- done
- fi
-
- ocf_log info "${LH} get_monitor function ready to return ${rc}"
return $rc
}
@@ -1711,7 +1834,7 @@ action_start() {
return $OCF_SUCCESS
fi
- local attrs_to_zero="rabbit_list_channels_timeouts rabbit_get_alarms_timeouts rabbit_list_queues_timeouts rabbit_cluster_status_timeouts"
+ local attrs_to_zero="rabbit_list_channels_timeouts rabbit_get_alarms_timeouts rabbit_list_queues_timeouts rabbit_cluster_status_timeouts rabbit_node_health_check_timeouts"
local attr_name_to_reset
for attr_name_to_reset in $attrs_to_zero; do
ocf_update_private_attr $attr_name_to_reset 0
diff --git a/src/mochinum.erl b/src/mochinum.erl
deleted file mode 100644
index 4ea7a22acf..0000000000
--- a/src/mochinum.erl
+++ /dev/null
@@ -1,358 +0,0 @@
-%% This file is a copy of `mochijson2.erl' from mochiweb, revision
-%% d541e9a0f36c00dcadc2e589f20e47fbf46fc76f. For the license, see
-%% `LICENSE-MIT-Mochi'.
-
-%% @copyright 2007 Mochi Media, Inc.
-%% @author Bob Ippolito <bob@mochimedia.com>
-
-%% @doc Useful numeric algorithms for floats that cover some deficiencies
-%% in the math module. More interesting is digits/1, which implements
-%% the algorithm from:
-%% http://www.cs.indiana.edu/~burger/fp/index.html
-%% See also "Printing Floating-Point Numbers Quickly and Accurately"
-%% in Proceedings of the SIGPLAN '96 Conference on Programming Language
-%% Design and Implementation.
-
--module(mochinum).
--author("Bob Ippolito <bob@mochimedia.com>").
--export([digits/1, frexp/1, int_pow/2, int_ceil/1]).
-
-%% IEEE 754 Float exponent bias
--define(FLOAT_BIAS, 1022).
--define(MIN_EXP, -1074).
--define(BIG_POW, 4503599627370496).
-
-%% External API
-
-%% @spec digits(number()) -> string()
-%% @doc Returns a string that accurately represents the given integer or float
-%% using a conservative amount of digits. Great for generating
-%% human-readable output, or compact ASCII serializations for floats.
-digits(N) when is_integer(N) ->
- integer_to_list(N);
-digits(0.0) ->
- "0.0";
-digits(Float) ->
- {Frac1, Exp1} = frexp_int(Float),
- [Place0 | Digits0] = digits1(Float, Exp1, Frac1),
- {Place, Digits} = transform_digits(Place0, Digits0),
- R = insert_decimal(Place, Digits),
- case Float < 0 of
- true ->
- [$- | R];
- _ ->
- R
- end.
-
-%% @spec frexp(F::float()) -> {Frac::float(), Exp::float()}
-%% @doc Return the fractional and exponent part of an IEEE 754 double,
-%% equivalent to the libc function of the same name.
-%% F = Frac * pow(2, Exp).
-frexp(F) ->
- frexp1(unpack(F)).
-
-%% @spec int_pow(X::integer(), N::integer()) -> Y::integer()
-%% @doc Moderately efficient way to exponentiate integers.
-%% int_pow(10, 2) = 100.
-int_pow(_X, 0) ->
- 1;
-int_pow(X, N) when N > 0 ->
- int_pow(X, N, 1).
-
-%% @spec int_ceil(F::float()) -> integer()
-%% @doc Return the ceiling of F as an integer. The ceiling is defined as
-%% F when F == trunc(F);
-%% trunc(F) when F &lt; 0;
-%% trunc(F) + 1 when F &gt; 0.
-int_ceil(X) ->
- T = trunc(X),
- case (X - T) of
- Pos when Pos > 0 -> T + 1;
- _ -> T
- end.
-
-
-%% Internal API
-
-int_pow(X, N, R) when N < 2 ->
- R * X;
-int_pow(X, N, R) ->
- int_pow(X * X, N bsr 1, case N band 1 of 1 -> R * X; 0 -> R end).
-
-insert_decimal(0, S) ->
- "0." ++ S;
-insert_decimal(Place, S) when Place > 0 ->
- L = length(S),
- case Place - L of
- 0 ->
- S ++ ".0";
- N when N < 0 ->
- {S0, S1} = lists:split(L + N, S),
- S0 ++ "." ++ S1;
- N when N < 6 ->
- %% More places than digits
- S ++ lists:duplicate(N, $0) ++ ".0";
- _ ->
- insert_decimal_exp(Place, S)
- end;
-insert_decimal(Place, S) when Place > -6 ->
- "0." ++ lists:duplicate(abs(Place), $0) ++ S;
-insert_decimal(Place, S) ->
- insert_decimal_exp(Place, S).
-
-insert_decimal_exp(Place, S) ->
- [C | S0] = S,
- S1 = case S0 of
- [] ->
- "0";
- _ ->
- S0
- end,
- Exp = case Place < 0 of
- true ->
- "e-";
- false ->
- "e+"
- end,
- [C] ++ "." ++ S1 ++ Exp ++ integer_to_list(abs(Place - 1)).
-
-
-digits1(Float, Exp, Frac) ->
- Round = ((Frac band 1) =:= 0),
- case Exp >= 0 of
- true ->
- BExp = 1 bsl Exp,
- case (Frac =/= ?BIG_POW) of
- true ->
- scale((Frac * BExp * 2), 2, BExp, BExp,
- Round, Round, Float);
- false ->
- scale((Frac * BExp * 4), 4, (BExp * 2), BExp,
- Round, Round, Float)
- end;
- false ->
- case (Exp =:= ?MIN_EXP) orelse (Frac =/= ?BIG_POW) of
- true ->
- scale((Frac * 2), 1 bsl (1 - Exp), 1, 1,
- Round, Round, Float);
- false ->
- scale((Frac * 4), 1 bsl (2 - Exp), 2, 1,
- Round, Round, Float)
- end
- end.
-
-scale(R, S, MPlus, MMinus, LowOk, HighOk, Float) ->
- Est = int_ceil(math:log10(abs(Float)) - 1.0e-10),
- %% Note that the scheme implementation uses a 326 element look-up table
- %% for int_pow(10, N) where we do not.
- case Est >= 0 of
- true ->
- fixup(R, S * int_pow(10, Est), MPlus, MMinus, Est,
- LowOk, HighOk);
- false ->
- Scale = int_pow(10, -Est),
- fixup(R * Scale, S, MPlus * Scale, MMinus * Scale, Est,
- LowOk, HighOk)
- end.
-
-fixup(R, S, MPlus, MMinus, K, LowOk, HighOk) ->
- TooLow = case HighOk of
- true ->
- (R + MPlus) >= S;
- false ->
- (R + MPlus) > S
- end,
- case TooLow of
- true ->
- [(K + 1) | generate(R, S, MPlus, MMinus, LowOk, HighOk)];
- false ->
- [K | generate(R * 10, S, MPlus * 10, MMinus * 10, LowOk, HighOk)]
- end.
-
-generate(R0, S, MPlus, MMinus, LowOk, HighOk) ->
- D = R0 div S,
- R = R0 rem S,
- TC1 = case LowOk of
- true ->
- R =< MMinus;
- false ->
- R < MMinus
- end,
- TC2 = case HighOk of
- true ->
- (R + MPlus) >= S;
- false ->
- (R + MPlus) > S
- end,
- case TC1 of
- false ->
- case TC2 of
- false ->
- [D | generate(R * 10, S, MPlus * 10, MMinus * 10,
- LowOk, HighOk)];
- true ->
- [D + 1]
- end;
- true ->
- case TC2 of
- false ->
- [D];
- true ->
- case R * 2 < S of
- true ->
- [D];
- false ->
- [D + 1]
- end
- end
- end.
-
-unpack(Float) ->
- <<Sign:1, Exp:11, Frac:52>> = <<Float:64/float>>,
- {Sign, Exp, Frac}.
-
-frexp1({_Sign, 0, 0}) ->
- {0.0, 0};
-frexp1({Sign, 0, Frac}) ->
- Exp = log2floor(Frac),
- <<Frac1:64/float>> = <<Sign:1, ?FLOAT_BIAS:11, (Frac-1):52>>,
- {Frac1, -(?FLOAT_BIAS) - 52 + Exp};
-frexp1({Sign, Exp, Frac}) ->
- <<Frac1:64/float>> = <<Sign:1, ?FLOAT_BIAS:11, Frac:52>>,
- {Frac1, Exp - ?FLOAT_BIAS}.
-
-log2floor(Int) ->
- log2floor(Int, 0).
-
-log2floor(0, N) ->
- N;
-log2floor(Int, N) ->
- log2floor(Int bsr 1, 1 + N).
-
-
-transform_digits(Place, [0 | Rest]) ->
- transform_digits(Place, Rest);
-transform_digits(Place, Digits) ->
- {Place, [$0 + D || D <- Digits]}.
-
-
-frexp_int(F) ->
- case unpack(F) of
- {_Sign, 0, Frac} ->
- {Frac, ?MIN_EXP};
- {_Sign, Exp, Frac} ->
- {Frac + (1 bsl 52), Exp - 53 - ?FLOAT_BIAS}
- end.
-
-%%
-%% Tests
-%%
--ifdef(TEST).
--include_lib("eunit/include/eunit.hrl").
-
-int_ceil_test() ->
- ?assertEqual(1, int_ceil(0.0001)),
- ?assertEqual(0, int_ceil(0.0)),
- ?assertEqual(1, int_ceil(0.99)),
- ?assertEqual(1, int_ceil(1.0)),
- ?assertEqual(-1, int_ceil(-1.5)),
- ?assertEqual(-2, int_ceil(-2.0)),
- ok.
-
-int_pow_test() ->
- ?assertEqual(1, int_pow(1, 1)),
- ?assertEqual(1, int_pow(1, 0)),
- ?assertEqual(1, int_pow(10, 0)),
- ?assertEqual(10, int_pow(10, 1)),
- ?assertEqual(100, int_pow(10, 2)),
- ?assertEqual(1000, int_pow(10, 3)),
- ok.
-
-digits_test() ->
- ?assertEqual("0",
- digits(0)),
- ?assertEqual("0.0",
- digits(0.0)),
- ?assertEqual("1.0",
- digits(1.0)),
- ?assertEqual("-1.0",
- digits(-1.0)),
- ?assertEqual("0.1",
- digits(0.1)),
- ?assertEqual("0.01",
- digits(0.01)),
- ?assertEqual("0.001",
- digits(0.001)),
- ?assertEqual("1.0e+6",
- digits(1000000.0)),
- ?assertEqual("0.5",
- digits(0.5)),
- ?assertEqual("4503599627370496.0",
- digits(4503599627370496.0)),
- %% small denormalized number
- %% 4.94065645841246544177e-324 =:= 5.0e-324
- <<SmallDenorm/float>> = <<0,0,0,0,0,0,0,1>>,
- ?assertEqual("5.0e-324",
- digits(SmallDenorm)),
- ?assertEqual(SmallDenorm,
- list_to_float(digits(SmallDenorm))),
- %% large denormalized number
- %% 2.22507385850720088902e-308
- <<BigDenorm/float>> = <<0,15,255,255,255,255,255,255>>,
- ?assertEqual("2.225073858507201e-308",
- digits(BigDenorm)),
- ?assertEqual(BigDenorm,
- list_to_float(digits(BigDenorm))),
- %% small normalized number
- %% 2.22507385850720138309e-308
- <<SmallNorm/float>> = <<0,16,0,0,0,0,0,0>>,
- ?assertEqual("2.2250738585072014e-308",
- digits(SmallNorm)),
- ?assertEqual(SmallNorm,
- list_to_float(digits(SmallNorm))),
- %% large normalized number
- %% 1.79769313486231570815e+308
- <<LargeNorm/float>> = <<127,239,255,255,255,255,255,255>>,
- ?assertEqual("1.7976931348623157e+308",
- digits(LargeNorm)),
- ?assertEqual(LargeNorm,
- list_to_float(digits(LargeNorm))),
- %% issue #10 - mochinum:frexp(math:pow(2, -1074)).
- ?assertEqual("5.0e-324",
- digits(math:pow(2, -1074))),
- ok.
-
-frexp_test() ->
- %% zero
- ?assertEqual({0.0, 0}, frexp(0.0)),
- %% one
- ?assertEqual({0.5, 1}, frexp(1.0)),
- %% negative one
- ?assertEqual({-0.5, 1}, frexp(-1.0)),
- %% small denormalized number
- %% 4.94065645841246544177e-324
- <<SmallDenorm/float>> = <<0,0,0,0,0,0,0,1>>,
- ?assertEqual({0.5, -1073}, frexp(SmallDenorm)),
- %% large denormalized number
- %% 2.22507385850720088902e-308
- <<BigDenorm/float>> = <<0,15,255,255,255,255,255,255>>,
- ?assertEqual(
- {0.99999999999999978, -1022},
- frexp(BigDenorm)),
- %% small normalized number
- %% 2.22507385850720138309e-308
- <<SmallNorm/float>> = <<0,16,0,0,0,0,0,0>>,
- ?assertEqual({0.5, -1021}, frexp(SmallNorm)),
- %% large normalized number
- %% 1.79769313486231570815e+308
- <<LargeNorm/float>> = <<127,239,255,255,255,255,255,255>>,
- ?assertEqual(
- {0.99999999999999989, 1024},
- frexp(LargeNorm)),
- %% issue #10 - mochinum:frexp(math:pow(2, -1074)).
- ?assertEqual(
- {0.5, -1073},
- frexp(math:pow(2, -1074))),
- ok.
-
--endif.
diff --git a/src/rabbit_autoheal.erl b/src/rabbit_autoheal.erl
index db4d41221e..9d6bfff5fc 100644
--- a/src/rabbit_autoheal.erl
+++ b/src/rabbit_autoheal.erl
@@ -180,6 +180,18 @@ node_down(_Node, not_healing) ->
node_down(Node, {winner_waiting, _, Notify}) ->
abort([Node], Notify);
+node_down(Node, {leader_waiting, Node, _Notify}) ->
+ %% The winner went down, we don't know what to do so we simply abort.
+ rabbit_log:info("Autoheal: aborting - winner ~p went down~n", [Node]),
+ not_healing;
+
+node_down(Node, {leader_waiting, _, _} = St) ->
+ %% If it is a partial partition, the winner might continue with the
+ %% healing process. If it is a full partition, the winner will also
+ %% see it and abort. Let's wait for it.
+ rabbit_log:info("Autoheal: ~p went down, waiting for winner decision ~n", [Node]),
+ St;
+
node_down(Node, _State) ->
rabbit_log:info("Autoheal: aborting - ~p went down~n", [Node]),
not_healing.
@@ -218,14 +230,24 @@ handle_msg({become_winner, Losers},
not_healing, _Partitions) ->
rabbit_log:info("Autoheal: I am the winner, waiting for ~p to stop~n",
[Losers]),
- %% The leader said everything was ready - do we agree? If not then
- %% give up.
- Down = Losers -- rabbit_node_monitor:alive_rabbit_nodes(Losers),
- case Down of
- [] -> [send(L, {winner_is, node()}) || L <- Losers],
- {winner_waiting, Losers, Losers};
- _ -> abort(Down, Losers)
- end;
+ stop_partition(Losers);
+
+handle_msg({become_winner, Losers},
+ {winner_waiting, _, Losers}, _Partitions) ->
+ %% The leader has aborted the healing, might have seen us down but
+ %% we didn't see the same. Let's try again as it is the same partition.
+ rabbit_log:info("Autoheal: I am the winner and received a duplicated "
+ "request, waiting again for ~p to stop~n", [Losers]),
+ stop_partition(Losers);
+
+handle_msg({become_winner, _},
+ {winner_waiting, _, Losers}, _Partitions) ->
+ %% Something has happened to the leader, it might have seen us down but we
+ %% are still alive. Partitions have changed, cannot continue.
+ rabbit_log:info("Autoheal: I am the winner and received another healing "
+ "request, partitions have changed. Aborting ~n", [Losers]),
+ winner_finish(Losers),
+ not_healing;
handle_msg({winner_is, Winner}, State = not_healing,
_Partitions) ->
@@ -269,6 +291,14 @@ handle_msg({autoheal_finished, Winner}, not_healing, _Partitions)
%% We are the leader and the winner. The state already transitioned
%% to "not_healing" at the end of the autoheal process.
rabbit_log:info("Autoheal finished according to winner ~p~n", [node()]),
+ not_healing;
+
+handle_msg({autoheal_finished, Winner}, not_healing, _Partitions) ->
+ %% We might have seen the winner down during a partial partition and
+ %% transitioned to not_healing. However, the winner was still able
+ %% to finish. Let it pass.
+ rabbit_log:info("Autoheal finished according to winner ~p."
+ " Unexpected, I might have previously seen the winner down~n", [Winner]),
not_healing.
%%----------------------------------------------------------------------------
@@ -279,7 +309,9 @@ abort(Down, Notify) ->
rabbit_log:info("Autoheal: aborting - ~p down~n", [Down]),
%% Make sure any nodes waiting for us start - it won't necessarily
%% heal the partition but at least they won't get stuck.
- winner_finish(Notify).
+ %% If we are executing this, we are not stopping. Thus, don't wait
+ %% for ourselves!
+ winner_finish(Notify -- [node()]).
winner_finish(Notify) ->
%% There is a race in Mnesia causing a starting loser to hang
@@ -297,32 +329,33 @@ winner_finish(Notify) ->
send(leader(), {autoheal_finished, node()}),
not_healing.
-%% XXX This can enter infinite loop, if mnesia was somehow restarted
-%% outside of our control - i.e. somebody started app back by hand or
-%% completely restarted node. One possible solution would be something
-%% like this (but it needs some more pondering and is left for some
-%% other patch):
-%% - monitor top-level mnesia supervisors of all losers
-%% - notify loosers about the fact that they are indeed loosers
-%% - wait for all monitors to go 'DOWN' (+ maybe some timeout on the whole process)
-%% - do one round of parallel rpc calls to check whether mnesia is still stoppend on all
-%% loosers
-%% - If everything is still stopped, continue autoheall process. Or cancel it otherwise.
-wait_for_mnesia_shutdown([Node | Rest] = AllNodes) ->
- case rpc:call(Node, mnesia, system_info, [is_running]) of
- no ->
- wait_for_mnesia_shutdown(Rest);
- Running when
- Running =:= yes orelse
- Running =:= starting orelse
- Running =:= stopping ->
- timer:sleep(?MNESIA_STOPPED_PING_INTERNAL),
- wait_for_mnesia_shutdown(AllNodes);
- _ ->
- wait_for_mnesia_shutdown(Rest)
- end;
-wait_for_mnesia_shutdown([]) ->
- ok.
+%% This improves the previous implementation, but could still potentially enter an infinity
+%% loop. If it also possible that for when it finishes some of the nodes have been
+%% manually restarted, but we can't do much more (apart from stop them again). So let it
+%% continue and notify all the losers to restart.
+wait_for_mnesia_shutdown(AllNodes) ->
+ Monitors = lists:foldl(fun(Node, Monitors0) ->
+ pmon:monitor({mnesia_sup, Node}, Monitors0)
+ end, pmon:new(), AllNodes),
+ wait_for_supervisors(Monitors).
+
+wait_for_supervisors(Monitors) ->
+ case pmon:is_empty(Monitors) of
+ true ->
+ ok;
+ false ->
+ receive
+ {'DOWN', _MRef, process, {mnesia_sup, _} = I, _Reason} ->
+ wait_for_supervisors(pmon:erase(I, Monitors))
+ after
+ 60000 ->
+ AliveLosers = [Node || {_, Node} <- pmon:monitored(Monitors)],
+ rabbit_log:info("Autoheal: mnesia in nodes ~p is still up, sending "
+ "winner notification again to these ~n", [AliveLosers]),
+ [send(L, {winner_is, node()}) || L <- AliveLosers],
+ wait_for_mnesia_shutdown(AliveLosers)
+ end
+ end.
restart_loser(State, Winner) ->
rabbit_log:warning(
@@ -402,3 +435,13 @@ fmt_error({remote_down, RemoteDown}) ->
rabbit_misc:format("Remote nodes disconnected:~n ~p", [RemoteDown]);
fmt_error({nodes_down, NodesDown}) ->
rabbit_misc:format("Local nodes down: ~p", [NodesDown]).
+
+stop_partition(Losers) ->
+ %% The leader said everything was ready - do we agree? If not then
+ %% give up.
+ Down = Losers -- rabbit_node_monitor:alive_rabbit_nodes(Losers),
+ case Down of
+ [] -> [send(L, {winner_is, node()}) || L <- Losers],
+ {winner_waiting, Losers, Losers};
+ _ -> abort(Down, Losers)
+ end.
diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl
index 0322aacfd1..bea2a3fa96 100644
--- a/src/rabbit_node_monitor.erl
+++ b/src/rabbit_node_monitor.erl
@@ -336,7 +336,17 @@ init([]) ->
process_flag(trap_exit, true),
net_kernel:monitor_nodes(true, [nodedown_reason]),
{ok, _} = mnesia:subscribe(system),
- {ok, ensure_keepalive_timer(#state{monitors = pmon:new(),
+ %% If the node has been restarted, Mnesia can trigger a system notification
+ %% before the monitor subscribes to receive them. To avoid autoheal blocking due to
+ %% the inconsistent database event never arriving, we being monitoring all running
+ %% nodes as early as possible. The rest of the monitoring ops will only be triggered
+ %% when notifications arrive.
+ Nodes = possibly_partitioned_nodes(),
+ startup_log(Nodes),
+ Monitors = lists:foldl(fun(Node, Monitors0) ->
+ pmon:monitor({rabbit, Node}, Monitors0)
+ end, pmon:new(), Nodes),
+ {ok, ensure_keepalive_timer(#state{monitors = Monitors,
subscribers = pmon:new(),
partitions = [],
guid = rabbit_guid:gen(),
@@ -486,20 +496,22 @@ handle_cast({partial_partition_disconnect, Other}, State) ->
%% mnesia propagation.
handle_cast({node_up, Node, NodeType},
State = #state{monitors = Monitors}) ->
- case pmon:is_monitored({rabbit, Node}, Monitors) of
- true -> {noreply, State};
- false -> rabbit_log:info("rabbit on node ~p up~n", [Node]),
- {AllNodes, DiscNodes, RunningNodes} = read_cluster_status(),
- write_cluster_status({add_node(Node, AllNodes),
- case NodeType of
- disc -> add_node(Node, DiscNodes);
- ram -> DiscNodes
- end,
- add_node(Node, RunningNodes)}),
- ok = handle_live_rabbit(Node),
- Monitors1 = pmon:monitor({rabbit, Node}, Monitors),
- {noreply, maybe_autoheal(State#state{monitors = Monitors1})}
- end;
+ rabbit_log:info("rabbit on node ~p up~n", [Node]),
+ {AllNodes, DiscNodes, RunningNodes} = read_cluster_status(),
+ write_cluster_status({add_node(Node, AllNodes),
+ case NodeType of
+ disc -> add_node(Node, DiscNodes);
+ ram -> DiscNodes
+ end,
+ add_node(Node, RunningNodes)}),
+ ok = handle_live_rabbit(Node),
+ Monitors1 = case pmon:is_monitored({rabbit, Node}, Monitors) of
+ true ->
+ Monitors;
+ false ->
+ pmon:monitor({rabbit, Node}, Monitors)
+ end,
+ {noreply, maybe_autoheal(State#state{monitors = Monitors1})};
handle_cast({joined_cluster, Node, NodeType}, State) ->
{AllNodes, DiscNodes, RunningNodes} = read_cluster_status(),
@@ -572,7 +584,7 @@ handle_info({mnesia_system_event,
State1 = case pmon:is_monitored({rabbit, Node}, Monitors) of
true -> State;
false -> State#state{
- monitors = pmon:monitor({rabbit, Node}, Monitors)}
+ monitors = pmon:monitor({rabbit, Node}, Monitors)}
end,
ok = handle_live_rabbit(Node),
Partitions1 = lists:usort([Node | Partitions]),
@@ -873,3 +885,12 @@ alive_rabbit_nodes(Nodes) ->
ping_all() ->
[net_adm:ping(N) || N <- rabbit_mnesia:cluster_nodes(all)],
ok.
+
+possibly_partitioned_nodes() ->
+ alive_rabbit_nodes() -- rabbit_mnesia:cluster_nodes(running).
+
+startup_log([]) ->
+ rabbit_log:info("Starting rabbit_node_monitor~n", []);
+startup_log(Nodes) ->
+ rabbit_log:info("Starting rabbit_node_monitor, might be partitioned from ~p~n",
+ [Nodes]).
diff --git a/test/partitions_SUITE.erl b/test/partitions_SUITE.erl
index 1b901b5940..aa1c1df24f 100644
--- a/test/partitions_SUITE.erl
+++ b/test/partitions_SUITE.erl
@@ -45,6 +45,8 @@ groups() ->
{cluster_size_3, [], [
autoheal,
autoheal_after_pause_if_all_down,
+ autoheal_multiple_partial_partitions,
+ autoheal_unexpected_finish,
ignore,
pause_if_all_down_on_blocked,
pause_if_all_down_on_down,
@@ -307,6 +309,27 @@ do_autoheal(Config) ->
Test([{A, B}, {A, C}, {B, C}]),
ok.
+autoheal_multiple_partial_partitions(Config) ->
+ set_mode(Config, autoheal),
+ [A, B, C] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ block_unblock([{A, B}]),
+ block_unblock([{A, C}]),
+ block_unblock([{A, B}]),
+ block_unblock([{A, C}]),
+ block_unblock([{A, B}]),
+ block_unblock([{A, C}]),
+ [await_listening(N, true) || N <- [A, B, C]],
+ [await_partitions(N, []) || N <- [A, B, C]],
+ ok.
+
+autoheal_unexpected_finish(Config) ->
+ set_mode(Config, autoheal),
+ [A, B, _C] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ Pid = rpc:call(A, erlang, whereis, [rabbit_node_monitor]),
+ Pid ! {autoheal_msg, {autoheal_finished, B}},
+ Pid = rpc:call(A, erlang, whereis, [rabbit_node_monitor]),
+ ok.
+
partial_false_positive(Config) ->
[A, B, C] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
block([{A, B}]),