diff options
| -rw-r--r-- | docs/rabbitmq.config.example | 2 | ||||
| -rwxr-xr-x | scripts/rabbitmq-server-ha.ocf | 169 | ||||
| -rw-r--r-- | src/mochinum.erl | 358 | ||||
| -rw-r--r-- | src/rabbit_autoheal.erl | 113 | ||||
| -rw-r--r-- | src/rabbit_node_monitor.erl | 53 | ||||
| -rw-r--r-- | test/partitions_SUITE.erl | 23 |
6 files changed, 285 insertions, 433 deletions
diff --git a/docs/rabbitmq.config.example b/docs/rabbitmq.config.example index f7477abe61..4d376d953a 100644 --- a/docs/rabbitmq.config.example +++ b/docs/rabbitmq.config.example @@ -161,7 +161,7 @@ %% Set the default AMQP heartbeat delay (in seconds). %% - %% {heartbeat, 600}, + %% {heartbeat, 60}, %% Set the max permissible size of an AMQP frame (in bytes). %% diff --git a/scripts/rabbitmq-server-ha.ocf b/scripts/rabbitmq-server-ha.ocf index 0dd27c72c4..c74525c936 100755 --- a/scripts/rabbitmq-server-ha.ocf +++ b/scripts/rabbitmq-server-ha.ocf @@ -47,6 +47,8 @@ OCF_RESKEY_use_fqdn_default=false OCF_RESKEY_fqdn_prefix_default="" OCF_RESKEY_max_rabbitmqctl_timeouts_default=3 OCF_RESKEY_policy_file_default="/usr/local/sbin/set_rabbitmq_policy" +OCF_RESKEY_rmq_feature_health_check_default=true +OCF_RESKEY_rmq_feature_local_list_queues_default=true : ${HA_LOGTAG="lrmd"} : ${HA_LOGFACILITY="daemon"} @@ -68,6 +70,8 @@ OCF_RESKEY_policy_file_default="/usr/local/sbin/set_rabbitmq_policy" : ${OCF_RESKEY_fqdn_prefix=${OCF_RESKEY_fqdn_prefix_default}} : ${OCF_RESKEY_max_rabbitmqctl_timeouts=${OCF_RESKEY_max_rabbitmqctl_timeouts_default}} : ${OCF_RESKEY_policy_file=${OCF_RESKEY_policy_file_default}} +: ${OCF_RESKEY_rmq_feature_health_check=${OCF_RESKEY_rmq_feature_health_check_default}} +: ${OCF_RESKEY_rmq_feature_local_list_queues=${OCF_RESKEY_rmq_feature_local_list_queues_default}} ####################################################################### @@ -298,6 +302,26 @@ A path to the shell script to setup RabbitMQ policies <content type="string" default="${OCF_RESKEY_policy_file_default}" /> </parameter> +<parameter name="rmq_feature_health_check" unique="0" required="0"> +<longdesc lang="en"> +Since rabbit 3.6.4 list_queues/list_channels-based monitoring should +be replaced with "node_health_check" command, as it creates no network +load at all. +</longdesc> +<shortdesc lang="en">Use node_health_check for monitoring</shortdesc> +<content type="boolean" default="${OCF_RESKEY_rmq_feature_health_check_default}" /> +</parameter> + +<parameter name="rmq_feature_local_list_queues" unique="0" required="0"> +<longdesc lang="en"> +For rabbit version that implements --local flag for list_queues, this +can greatly reduce network overhead in cases when node is +stopped/demoted. +</longdesc> +<shortdesc lang="en">Use --local option for list_queues</shortdesc> +<content type="boolean" default="${OCF_RESKEY_rmq_feature_local_list_queues_default}" /> +</parameter> + $EXTENDED_OCF_PARAMS </parameters> @@ -788,10 +812,45 @@ get_master_name_but() done } +erl_eval() { + local fmt="${1:?}" + shift + + ${OCF_RESKEY_ctl} eval "$(printf "$fmt" "$@")" +} + # Returns 0 if we are clustered with provideded node is_clustered_with() { - get_running_nodes | grep -q $(rabbit_node_name $1); + local LH="${LH}: is_clustered_with: " + local node_name + local rc + node_name=$(rabbit_node_name $1) + + local seen_as_running + seen_as_running=$(erl_eval "lists:member('%s', rabbit_mnesia:cluster_nodes(running))." "$node_name") + rc=$? + if [ "$rc" -ne 0 ]; then + ocf_log err "${LH} Failed to check whether '$node_name' is considered running by us" + # XXX Or should we give remote node benefit of a doubt? + return 1 + elif [ "$seen_as_running" != true ]; then + ocf_log info "${LH} Node $node_name is not running, considering it not clustered with us" + return 1 + fi + + local seen_as_partitioned + seen_as_partitioned=$(erl_eval "lists:member('%s', rabbit_node_monitor:partitions())." "$node_name") + rc=$? + if [ "$rc" -ne 0 ]; then + ocf_log err "${LH} Failed to check whether '$node_name' is partitioned with us" + # XXX Or should we give remote node benefit of a doubt? + return 1 + elif [ "$seen_as_partitioned" != false ]; then + ocf_log info "${LH} Node $node_name is partitioned from us" + return 1 + fi + return $? } @@ -1377,7 +1436,9 @@ check_timeouts() { local timeouts_attr_name=$2 local op_name=$3 - if [ $op_rc -ne 124 -a $op_rc -ne 137 ]; then + # 75 is EX_TEMPFAIL from sysexits, and is used by rabbitmqctl to signal about + # timeout. + if [ $op_rc -ne 124 -a $op_rc -ne 137 -a $op_rc -ne 75 ]; then ocf_update_private_attr $timeouts_attr_name 0 return 0 fi @@ -1401,12 +1462,20 @@ check_timeouts() { } wait_sync() { - wait_time=$1 + local wait_time=$1 + local queues + local opt_arg="" - queues="${COMMAND_TIMEOUT} ${OCF_RESKEY_ctl} list_queues name state" - su_rabbit_cmd -t "${wait_time}" "sh -c \"while ${queues} | grep -q 'syncing,'; \ - do sleep 2; done\"" - return $? + if [ "$OCF_RESKEY_rmq_feature_local_list_queues" = "true" ]; then + opt_arg="--local" + fi + + queues="${COMMAND_TIMEOUT} ${OCF_RESKEY_ctl} list_queues $opt_arg name state" + + su_rabbit_cmd -t "${wait_time}" "sh -c \"while ${queues} | grep -q 'syncing,'; \ + do sleep 2; done\"" + + return $? } get_monitor() { @@ -1516,7 +1585,75 @@ get_monitor() { return $rc fi - # Check if the rabbitmqctl control plane is alive. + # rc can be SUCCESS or RUNNING_MASTER, don't touch it unless there + # is some error uncovered by node_health_check + if ! node_health_check; then + rc=$OCF_ERR_GENERIC + fi + + # If we are the master and healthy, check that we see other cluster members + # Order a member to restart if we don't see it + if [ $rc -eq $OCF_RUNNING_MASTER ] ; then + for node in $(get_all_pacemaker_nodes); do + if ! is_clustered_with $node; then + nowtime=$(now) + + ocf_log warn "${LH} node $node is not connected with us, ordering it to restart." + ocf_update_private_attr 'rabbit-ordered-to-restart' "$nowtime" "$node" + fi + done + fi + + ocf_log info "${LH} get_monitor function ready to return ${rc}" + return $rc +} + +# Check if the rabbitmqctl control plane is alive. +node_health_check() { + local rc + if [ "$OCF_RESKEY_rmq_feature_health_check" = true ]; then + node_health_check_local + rc=$? + else + node_health_check_legacy + rc=$? + fi + return $rc +} + +node_health_check_local() { + local LH="${LH} node_health_check_local():" + local rc + local rc_timeouts + + # Give node_health_check some time to handle timeout by itself. + # By using internal rabbitmqctl timeouts, we allow it to print + # more useful diagnostics + local timeout=$((TIMEOUT_ARG - 2)) + su_rabbit_cmd "${OCF_RESKEY_ctl} node_health_check -t $timeout" + rc=$? + + check_timeouts $rc "rabbit_node_health_check_timeouts" "node_health_check" + rc_timeouts=$? + + if [ "$rc_timeouts" -eq 2 ]; then + master_score 0 + ocf_log info "${LH} node_health_check timed out, retry limit reached" + return $OCF_ERR_GENERIC + elif [ "$rc_timeouts" -eq 1 ]; then + ocf_log info "${LH} node_health_check timed out, going to retry" + return $OCF_SUCCESS + fi + + if [ "$rc" -ne 0 ]; then + ocf_log err "${LH} rabbitmqctl node_health_check exited with errors." + return $OCF_ERR_GENERIC + else + return $OCF_SUCCESS + fi +} + +node_health_check_legacy() { local rc_alive local timeout_alive su_rabbit_cmd "${OCF_RESKEY_ctl} list_channels 2>&1 > /dev/null" @@ -1609,20 +1746,6 @@ get_monitor() { fi fi - # If we are the master and healthy, check that we see other cluster members - # Order a member to restart if we don't see it - if [ $rc -eq $OCF_RUNNING_MASTER ] ; then - for node in $(get_all_pacemaker_nodes); do - if ! is_clustered_with $node; then - nowtime=$(now) - - ocf_log warn "${LH} node $node is not connected with us, ordering it to restart." - ocf_update_private_attr 'rabbit-ordered-to-restart' "$nowtime" "$node" - fi - done - fi - - ocf_log info "${LH} get_monitor function ready to return ${rc}" return $rc } @@ -1711,7 +1834,7 @@ action_start() { return $OCF_SUCCESS fi - local attrs_to_zero="rabbit_list_channels_timeouts rabbit_get_alarms_timeouts rabbit_list_queues_timeouts rabbit_cluster_status_timeouts" + local attrs_to_zero="rabbit_list_channels_timeouts rabbit_get_alarms_timeouts rabbit_list_queues_timeouts rabbit_cluster_status_timeouts rabbit_node_health_check_timeouts" local attr_name_to_reset for attr_name_to_reset in $attrs_to_zero; do ocf_update_private_attr $attr_name_to_reset 0 diff --git a/src/mochinum.erl b/src/mochinum.erl deleted file mode 100644 index 4ea7a22acf..0000000000 --- a/src/mochinum.erl +++ /dev/null @@ -1,358 +0,0 @@ -%% This file is a copy of `mochijson2.erl' from mochiweb, revision -%% d541e9a0f36c00dcadc2e589f20e47fbf46fc76f. For the license, see -%% `LICENSE-MIT-Mochi'. - -%% @copyright 2007 Mochi Media, Inc. -%% @author Bob Ippolito <bob@mochimedia.com> - -%% @doc Useful numeric algorithms for floats that cover some deficiencies -%% in the math module. More interesting is digits/1, which implements -%% the algorithm from: -%% http://www.cs.indiana.edu/~burger/fp/index.html -%% See also "Printing Floating-Point Numbers Quickly and Accurately" -%% in Proceedings of the SIGPLAN '96 Conference on Programming Language -%% Design and Implementation. - --module(mochinum). --author("Bob Ippolito <bob@mochimedia.com>"). --export([digits/1, frexp/1, int_pow/2, int_ceil/1]). - -%% IEEE 754 Float exponent bias --define(FLOAT_BIAS, 1022). --define(MIN_EXP, -1074). --define(BIG_POW, 4503599627370496). - -%% External API - -%% @spec digits(number()) -> string() -%% @doc Returns a string that accurately represents the given integer or float -%% using a conservative amount of digits. Great for generating -%% human-readable output, or compact ASCII serializations for floats. -digits(N) when is_integer(N) -> - integer_to_list(N); -digits(0.0) -> - "0.0"; -digits(Float) -> - {Frac1, Exp1} = frexp_int(Float), - [Place0 | Digits0] = digits1(Float, Exp1, Frac1), - {Place, Digits} = transform_digits(Place0, Digits0), - R = insert_decimal(Place, Digits), - case Float < 0 of - true -> - [$- | R]; - _ -> - R - end. - -%% @spec frexp(F::float()) -> {Frac::float(), Exp::float()} -%% @doc Return the fractional and exponent part of an IEEE 754 double, -%% equivalent to the libc function of the same name. -%% F = Frac * pow(2, Exp). -frexp(F) -> - frexp1(unpack(F)). - -%% @spec int_pow(X::integer(), N::integer()) -> Y::integer() -%% @doc Moderately efficient way to exponentiate integers. -%% int_pow(10, 2) = 100. -int_pow(_X, 0) -> - 1; -int_pow(X, N) when N > 0 -> - int_pow(X, N, 1). - -%% @spec int_ceil(F::float()) -> integer() -%% @doc Return the ceiling of F as an integer. The ceiling is defined as -%% F when F == trunc(F); -%% trunc(F) when F < 0; -%% trunc(F) + 1 when F > 0. -int_ceil(X) -> - T = trunc(X), - case (X - T) of - Pos when Pos > 0 -> T + 1; - _ -> T - end. - - -%% Internal API - -int_pow(X, N, R) when N < 2 -> - R * X; -int_pow(X, N, R) -> - int_pow(X * X, N bsr 1, case N band 1 of 1 -> R * X; 0 -> R end). - -insert_decimal(0, S) -> - "0." ++ S; -insert_decimal(Place, S) when Place > 0 -> - L = length(S), - case Place - L of - 0 -> - S ++ ".0"; - N when N < 0 -> - {S0, S1} = lists:split(L + N, S), - S0 ++ "." ++ S1; - N when N < 6 -> - %% More places than digits - S ++ lists:duplicate(N, $0) ++ ".0"; - _ -> - insert_decimal_exp(Place, S) - end; -insert_decimal(Place, S) when Place > -6 -> - "0." ++ lists:duplicate(abs(Place), $0) ++ S; -insert_decimal(Place, S) -> - insert_decimal_exp(Place, S). - -insert_decimal_exp(Place, S) -> - [C | S0] = S, - S1 = case S0 of - [] -> - "0"; - _ -> - S0 - end, - Exp = case Place < 0 of - true -> - "e-"; - false -> - "e+" - end, - [C] ++ "." ++ S1 ++ Exp ++ integer_to_list(abs(Place - 1)). - - -digits1(Float, Exp, Frac) -> - Round = ((Frac band 1) =:= 0), - case Exp >= 0 of - true -> - BExp = 1 bsl Exp, - case (Frac =/= ?BIG_POW) of - true -> - scale((Frac * BExp * 2), 2, BExp, BExp, - Round, Round, Float); - false -> - scale((Frac * BExp * 4), 4, (BExp * 2), BExp, - Round, Round, Float) - end; - false -> - case (Exp =:= ?MIN_EXP) orelse (Frac =/= ?BIG_POW) of - true -> - scale((Frac * 2), 1 bsl (1 - Exp), 1, 1, - Round, Round, Float); - false -> - scale((Frac * 4), 1 bsl (2 - Exp), 2, 1, - Round, Round, Float) - end - end. - -scale(R, S, MPlus, MMinus, LowOk, HighOk, Float) -> - Est = int_ceil(math:log10(abs(Float)) - 1.0e-10), - %% Note that the scheme implementation uses a 326 element look-up table - %% for int_pow(10, N) where we do not. - case Est >= 0 of - true -> - fixup(R, S * int_pow(10, Est), MPlus, MMinus, Est, - LowOk, HighOk); - false -> - Scale = int_pow(10, -Est), - fixup(R * Scale, S, MPlus * Scale, MMinus * Scale, Est, - LowOk, HighOk) - end. - -fixup(R, S, MPlus, MMinus, K, LowOk, HighOk) -> - TooLow = case HighOk of - true -> - (R + MPlus) >= S; - false -> - (R + MPlus) > S - end, - case TooLow of - true -> - [(K + 1) | generate(R, S, MPlus, MMinus, LowOk, HighOk)]; - false -> - [K | generate(R * 10, S, MPlus * 10, MMinus * 10, LowOk, HighOk)] - end. - -generate(R0, S, MPlus, MMinus, LowOk, HighOk) -> - D = R0 div S, - R = R0 rem S, - TC1 = case LowOk of - true -> - R =< MMinus; - false -> - R < MMinus - end, - TC2 = case HighOk of - true -> - (R + MPlus) >= S; - false -> - (R + MPlus) > S - end, - case TC1 of - false -> - case TC2 of - false -> - [D | generate(R * 10, S, MPlus * 10, MMinus * 10, - LowOk, HighOk)]; - true -> - [D + 1] - end; - true -> - case TC2 of - false -> - [D]; - true -> - case R * 2 < S of - true -> - [D]; - false -> - [D + 1] - end - end - end. - -unpack(Float) -> - <<Sign:1, Exp:11, Frac:52>> = <<Float:64/float>>, - {Sign, Exp, Frac}. - -frexp1({_Sign, 0, 0}) -> - {0.0, 0}; -frexp1({Sign, 0, Frac}) -> - Exp = log2floor(Frac), - <<Frac1:64/float>> = <<Sign:1, ?FLOAT_BIAS:11, (Frac-1):52>>, - {Frac1, -(?FLOAT_BIAS) - 52 + Exp}; -frexp1({Sign, Exp, Frac}) -> - <<Frac1:64/float>> = <<Sign:1, ?FLOAT_BIAS:11, Frac:52>>, - {Frac1, Exp - ?FLOAT_BIAS}. - -log2floor(Int) -> - log2floor(Int, 0). - -log2floor(0, N) -> - N; -log2floor(Int, N) -> - log2floor(Int bsr 1, 1 + N). - - -transform_digits(Place, [0 | Rest]) -> - transform_digits(Place, Rest); -transform_digits(Place, Digits) -> - {Place, [$0 + D || D <- Digits]}. - - -frexp_int(F) -> - case unpack(F) of - {_Sign, 0, Frac} -> - {Frac, ?MIN_EXP}; - {_Sign, Exp, Frac} -> - {Frac + (1 bsl 52), Exp - 53 - ?FLOAT_BIAS} - end. - -%% -%% Tests -%% --ifdef(TEST). --include_lib("eunit/include/eunit.hrl"). - -int_ceil_test() -> - ?assertEqual(1, int_ceil(0.0001)), - ?assertEqual(0, int_ceil(0.0)), - ?assertEqual(1, int_ceil(0.99)), - ?assertEqual(1, int_ceil(1.0)), - ?assertEqual(-1, int_ceil(-1.5)), - ?assertEqual(-2, int_ceil(-2.0)), - ok. - -int_pow_test() -> - ?assertEqual(1, int_pow(1, 1)), - ?assertEqual(1, int_pow(1, 0)), - ?assertEqual(1, int_pow(10, 0)), - ?assertEqual(10, int_pow(10, 1)), - ?assertEqual(100, int_pow(10, 2)), - ?assertEqual(1000, int_pow(10, 3)), - ok. - -digits_test() -> - ?assertEqual("0", - digits(0)), - ?assertEqual("0.0", - digits(0.0)), - ?assertEqual("1.0", - digits(1.0)), - ?assertEqual("-1.0", - digits(-1.0)), - ?assertEqual("0.1", - digits(0.1)), - ?assertEqual("0.01", - digits(0.01)), - ?assertEqual("0.001", - digits(0.001)), - ?assertEqual("1.0e+6", - digits(1000000.0)), - ?assertEqual("0.5", - digits(0.5)), - ?assertEqual("4503599627370496.0", - digits(4503599627370496.0)), - %% small denormalized number - %% 4.94065645841246544177e-324 =:= 5.0e-324 - <<SmallDenorm/float>> = <<0,0,0,0,0,0,0,1>>, - ?assertEqual("5.0e-324", - digits(SmallDenorm)), - ?assertEqual(SmallDenorm, - list_to_float(digits(SmallDenorm))), - %% large denormalized number - %% 2.22507385850720088902e-308 - <<BigDenorm/float>> = <<0,15,255,255,255,255,255,255>>, - ?assertEqual("2.225073858507201e-308", - digits(BigDenorm)), - ?assertEqual(BigDenorm, - list_to_float(digits(BigDenorm))), - %% small normalized number - %% 2.22507385850720138309e-308 - <<SmallNorm/float>> = <<0,16,0,0,0,0,0,0>>, - ?assertEqual("2.2250738585072014e-308", - digits(SmallNorm)), - ?assertEqual(SmallNorm, - list_to_float(digits(SmallNorm))), - %% large normalized number - %% 1.79769313486231570815e+308 - <<LargeNorm/float>> = <<127,239,255,255,255,255,255,255>>, - ?assertEqual("1.7976931348623157e+308", - digits(LargeNorm)), - ?assertEqual(LargeNorm, - list_to_float(digits(LargeNorm))), - %% issue #10 - mochinum:frexp(math:pow(2, -1074)). - ?assertEqual("5.0e-324", - digits(math:pow(2, -1074))), - ok. - -frexp_test() -> - %% zero - ?assertEqual({0.0, 0}, frexp(0.0)), - %% one - ?assertEqual({0.5, 1}, frexp(1.0)), - %% negative one - ?assertEqual({-0.5, 1}, frexp(-1.0)), - %% small denormalized number - %% 4.94065645841246544177e-324 - <<SmallDenorm/float>> = <<0,0,0,0,0,0,0,1>>, - ?assertEqual({0.5, -1073}, frexp(SmallDenorm)), - %% large denormalized number - %% 2.22507385850720088902e-308 - <<BigDenorm/float>> = <<0,15,255,255,255,255,255,255>>, - ?assertEqual( - {0.99999999999999978, -1022}, - frexp(BigDenorm)), - %% small normalized number - %% 2.22507385850720138309e-308 - <<SmallNorm/float>> = <<0,16,0,0,0,0,0,0>>, - ?assertEqual({0.5, -1021}, frexp(SmallNorm)), - %% large normalized number - %% 1.79769313486231570815e+308 - <<LargeNorm/float>> = <<127,239,255,255,255,255,255,255>>, - ?assertEqual( - {0.99999999999999989, 1024}, - frexp(LargeNorm)), - %% issue #10 - mochinum:frexp(math:pow(2, -1074)). - ?assertEqual( - {0.5, -1073}, - frexp(math:pow(2, -1074))), - ok. - --endif. diff --git a/src/rabbit_autoheal.erl b/src/rabbit_autoheal.erl index db4d41221e..9d6bfff5fc 100644 --- a/src/rabbit_autoheal.erl +++ b/src/rabbit_autoheal.erl @@ -180,6 +180,18 @@ node_down(_Node, not_healing) -> node_down(Node, {winner_waiting, _, Notify}) -> abort([Node], Notify); +node_down(Node, {leader_waiting, Node, _Notify}) -> + %% The winner went down, we don't know what to do so we simply abort. + rabbit_log:info("Autoheal: aborting - winner ~p went down~n", [Node]), + not_healing; + +node_down(Node, {leader_waiting, _, _} = St) -> + %% If it is a partial partition, the winner might continue with the + %% healing process. If it is a full partition, the winner will also + %% see it and abort. Let's wait for it. + rabbit_log:info("Autoheal: ~p went down, waiting for winner decision ~n", [Node]), + St; + node_down(Node, _State) -> rabbit_log:info("Autoheal: aborting - ~p went down~n", [Node]), not_healing. @@ -218,14 +230,24 @@ handle_msg({become_winner, Losers}, not_healing, _Partitions) -> rabbit_log:info("Autoheal: I am the winner, waiting for ~p to stop~n", [Losers]), - %% The leader said everything was ready - do we agree? If not then - %% give up. - Down = Losers -- rabbit_node_monitor:alive_rabbit_nodes(Losers), - case Down of - [] -> [send(L, {winner_is, node()}) || L <- Losers], - {winner_waiting, Losers, Losers}; - _ -> abort(Down, Losers) - end; + stop_partition(Losers); + +handle_msg({become_winner, Losers}, + {winner_waiting, _, Losers}, _Partitions) -> + %% The leader has aborted the healing, might have seen us down but + %% we didn't see the same. Let's try again as it is the same partition. + rabbit_log:info("Autoheal: I am the winner and received a duplicated " + "request, waiting again for ~p to stop~n", [Losers]), + stop_partition(Losers); + +handle_msg({become_winner, _}, + {winner_waiting, _, Losers}, _Partitions) -> + %% Something has happened to the leader, it might have seen us down but we + %% are still alive. Partitions have changed, cannot continue. + rabbit_log:info("Autoheal: I am the winner and received another healing " + "request, partitions have changed. Aborting ~n", [Losers]), + winner_finish(Losers), + not_healing; handle_msg({winner_is, Winner}, State = not_healing, _Partitions) -> @@ -269,6 +291,14 @@ handle_msg({autoheal_finished, Winner}, not_healing, _Partitions) %% We are the leader and the winner. The state already transitioned %% to "not_healing" at the end of the autoheal process. rabbit_log:info("Autoheal finished according to winner ~p~n", [node()]), + not_healing; + +handle_msg({autoheal_finished, Winner}, not_healing, _Partitions) -> + %% We might have seen the winner down during a partial partition and + %% transitioned to not_healing. However, the winner was still able + %% to finish. Let it pass. + rabbit_log:info("Autoheal finished according to winner ~p." + " Unexpected, I might have previously seen the winner down~n", [Winner]), not_healing. %%---------------------------------------------------------------------------- @@ -279,7 +309,9 @@ abort(Down, Notify) -> rabbit_log:info("Autoheal: aborting - ~p down~n", [Down]), %% Make sure any nodes waiting for us start - it won't necessarily %% heal the partition but at least they won't get stuck. - winner_finish(Notify). + %% If we are executing this, we are not stopping. Thus, don't wait + %% for ourselves! + winner_finish(Notify -- [node()]). winner_finish(Notify) -> %% There is a race in Mnesia causing a starting loser to hang @@ -297,32 +329,33 @@ winner_finish(Notify) -> send(leader(), {autoheal_finished, node()}), not_healing. -%% XXX This can enter infinite loop, if mnesia was somehow restarted -%% outside of our control - i.e. somebody started app back by hand or -%% completely restarted node. One possible solution would be something -%% like this (but it needs some more pondering and is left for some -%% other patch): -%% - monitor top-level mnesia supervisors of all losers -%% - notify loosers about the fact that they are indeed loosers -%% - wait for all monitors to go 'DOWN' (+ maybe some timeout on the whole process) -%% - do one round of parallel rpc calls to check whether mnesia is still stoppend on all -%% loosers -%% - If everything is still stopped, continue autoheall process. Or cancel it otherwise. -wait_for_mnesia_shutdown([Node | Rest] = AllNodes) -> - case rpc:call(Node, mnesia, system_info, [is_running]) of - no -> - wait_for_mnesia_shutdown(Rest); - Running when - Running =:= yes orelse - Running =:= starting orelse - Running =:= stopping -> - timer:sleep(?MNESIA_STOPPED_PING_INTERNAL), - wait_for_mnesia_shutdown(AllNodes); - _ -> - wait_for_mnesia_shutdown(Rest) - end; -wait_for_mnesia_shutdown([]) -> - ok. +%% This improves the previous implementation, but could still potentially enter an infinity +%% loop. If it also possible that for when it finishes some of the nodes have been +%% manually restarted, but we can't do much more (apart from stop them again). So let it +%% continue and notify all the losers to restart. +wait_for_mnesia_shutdown(AllNodes) -> + Monitors = lists:foldl(fun(Node, Monitors0) -> + pmon:monitor({mnesia_sup, Node}, Monitors0) + end, pmon:new(), AllNodes), + wait_for_supervisors(Monitors). + +wait_for_supervisors(Monitors) -> + case pmon:is_empty(Monitors) of + true -> + ok; + false -> + receive + {'DOWN', _MRef, process, {mnesia_sup, _} = I, _Reason} -> + wait_for_supervisors(pmon:erase(I, Monitors)) + after + 60000 -> + AliveLosers = [Node || {_, Node} <- pmon:monitored(Monitors)], + rabbit_log:info("Autoheal: mnesia in nodes ~p is still up, sending " + "winner notification again to these ~n", [AliveLosers]), + [send(L, {winner_is, node()}) || L <- AliveLosers], + wait_for_mnesia_shutdown(AliveLosers) + end + end. restart_loser(State, Winner) -> rabbit_log:warning( @@ -402,3 +435,13 @@ fmt_error({remote_down, RemoteDown}) -> rabbit_misc:format("Remote nodes disconnected:~n ~p", [RemoteDown]); fmt_error({nodes_down, NodesDown}) -> rabbit_misc:format("Local nodes down: ~p", [NodesDown]). + +stop_partition(Losers) -> + %% The leader said everything was ready - do we agree? If not then + %% give up. + Down = Losers -- rabbit_node_monitor:alive_rabbit_nodes(Losers), + case Down of + [] -> [send(L, {winner_is, node()}) || L <- Losers], + {winner_waiting, Losers, Losers}; + _ -> abort(Down, Losers) + end. diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl index 0322aacfd1..bea2a3fa96 100644 --- a/src/rabbit_node_monitor.erl +++ b/src/rabbit_node_monitor.erl @@ -336,7 +336,17 @@ init([]) -> process_flag(trap_exit, true), net_kernel:monitor_nodes(true, [nodedown_reason]), {ok, _} = mnesia:subscribe(system), - {ok, ensure_keepalive_timer(#state{monitors = pmon:new(), + %% If the node has been restarted, Mnesia can trigger a system notification + %% before the monitor subscribes to receive them. To avoid autoheal blocking due to + %% the inconsistent database event never arriving, we being monitoring all running + %% nodes as early as possible. The rest of the monitoring ops will only be triggered + %% when notifications arrive. + Nodes = possibly_partitioned_nodes(), + startup_log(Nodes), + Monitors = lists:foldl(fun(Node, Monitors0) -> + pmon:monitor({rabbit, Node}, Monitors0) + end, pmon:new(), Nodes), + {ok, ensure_keepalive_timer(#state{monitors = Monitors, subscribers = pmon:new(), partitions = [], guid = rabbit_guid:gen(), @@ -486,20 +496,22 @@ handle_cast({partial_partition_disconnect, Other}, State) -> %% mnesia propagation. handle_cast({node_up, Node, NodeType}, State = #state{monitors = Monitors}) -> - case pmon:is_monitored({rabbit, Node}, Monitors) of - true -> {noreply, State}; - false -> rabbit_log:info("rabbit on node ~p up~n", [Node]), - {AllNodes, DiscNodes, RunningNodes} = read_cluster_status(), - write_cluster_status({add_node(Node, AllNodes), - case NodeType of - disc -> add_node(Node, DiscNodes); - ram -> DiscNodes - end, - add_node(Node, RunningNodes)}), - ok = handle_live_rabbit(Node), - Monitors1 = pmon:monitor({rabbit, Node}, Monitors), - {noreply, maybe_autoheal(State#state{monitors = Monitors1})} - end; + rabbit_log:info("rabbit on node ~p up~n", [Node]), + {AllNodes, DiscNodes, RunningNodes} = read_cluster_status(), + write_cluster_status({add_node(Node, AllNodes), + case NodeType of + disc -> add_node(Node, DiscNodes); + ram -> DiscNodes + end, + add_node(Node, RunningNodes)}), + ok = handle_live_rabbit(Node), + Monitors1 = case pmon:is_monitored({rabbit, Node}, Monitors) of + true -> + Monitors; + false -> + pmon:monitor({rabbit, Node}, Monitors) + end, + {noreply, maybe_autoheal(State#state{monitors = Monitors1})}; handle_cast({joined_cluster, Node, NodeType}, State) -> {AllNodes, DiscNodes, RunningNodes} = read_cluster_status(), @@ -572,7 +584,7 @@ handle_info({mnesia_system_event, State1 = case pmon:is_monitored({rabbit, Node}, Monitors) of true -> State; false -> State#state{ - monitors = pmon:monitor({rabbit, Node}, Monitors)} + monitors = pmon:monitor({rabbit, Node}, Monitors)} end, ok = handle_live_rabbit(Node), Partitions1 = lists:usort([Node | Partitions]), @@ -873,3 +885,12 @@ alive_rabbit_nodes(Nodes) -> ping_all() -> [net_adm:ping(N) || N <- rabbit_mnesia:cluster_nodes(all)], ok. + +possibly_partitioned_nodes() -> + alive_rabbit_nodes() -- rabbit_mnesia:cluster_nodes(running). + +startup_log([]) -> + rabbit_log:info("Starting rabbit_node_monitor~n", []); +startup_log(Nodes) -> + rabbit_log:info("Starting rabbit_node_monitor, might be partitioned from ~p~n", + [Nodes]). diff --git a/test/partitions_SUITE.erl b/test/partitions_SUITE.erl index 1b901b5940..aa1c1df24f 100644 --- a/test/partitions_SUITE.erl +++ b/test/partitions_SUITE.erl @@ -45,6 +45,8 @@ groups() -> {cluster_size_3, [], [ autoheal, autoheal_after_pause_if_all_down, + autoheal_multiple_partial_partitions, + autoheal_unexpected_finish, ignore, pause_if_all_down_on_blocked, pause_if_all_down_on_down, @@ -307,6 +309,27 @@ do_autoheal(Config) -> Test([{A, B}, {A, C}, {B, C}]), ok. +autoheal_multiple_partial_partitions(Config) -> + set_mode(Config, autoheal), + [A, B, C] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + block_unblock([{A, B}]), + block_unblock([{A, C}]), + block_unblock([{A, B}]), + block_unblock([{A, C}]), + block_unblock([{A, B}]), + block_unblock([{A, C}]), + [await_listening(N, true) || N <- [A, B, C]], + [await_partitions(N, []) || N <- [A, B, C]], + ok. + +autoheal_unexpected_finish(Config) -> + set_mode(Config, autoheal), + [A, B, _C] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + Pid = rpc:call(A, erlang, whereis, [rabbit_node_monitor]), + Pid ! {autoheal_msg, {autoheal_finished, B}}, + Pid = rpc:call(A, erlang, whereis, [rabbit_node_monitor]), + ok. + partial_false_positive(Config) -> [A, B, C] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), block([{A, B}]), |
