summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/gm.erl53
-rw-r--r--src/mochinum.erl358
-rw-r--r--src/rabbit.app.src3
-rw-r--r--src/rabbit_amqqueue_process.erl101
-rw-r--r--src/rabbit_autoheal.erl102
-rw-r--r--src/rabbit_cli.erl55
-rw-r--r--src/rabbit_control_main.erl30
-rw-r--r--src/rabbit_mirror_queue_coordinator.erl21
-rw-r--r--src/rabbit_mirror_queue_master.erl66
-rw-r--r--src/rabbit_mirror_queue_misc.erl31
-rw-r--r--src/rabbit_mirror_queue_slave.erl29
-rw-r--r--src/rabbit_mirror_queue_sync.erl12
-rw-r--r--src/rabbit_mnesia.erl2
-rw-r--r--src/rabbit_node_monitor.erl53
-rw-r--r--src/rabbit_policy.erl4
-rw-r--r--src/rabbit_queue_location_validator.erl4
-rw-r--r--src/rabbit_upgrade_functions.erl19
-rw-r--r--src/rabbit_variable_queue.erl2
18 files changed, 425 insertions, 520 deletions
diff --git a/src/gm.erl b/src/gm.erl
index 176e14537f..74e19ee6fd 100644
--- a/src/gm.erl
+++ b/src/gm.erl
@@ -611,7 +611,7 @@ handle_call({add_on_right, NewMember}, _From,
handle_callback_result({Result, {ok, Group}, State1})
catch
lost_membership ->
- {stop, normal, State}
+ {stop, shutdown, State}
end.
%% add_on_right causes a catchup to be sent immediately from the left,
@@ -646,7 +646,7 @@ handle_cast({?TAG, ReqVer, Msg},
Result, fun handle_msg_true/3, fun handle_msg_false/3, Msg, State1))
catch
lost_membership ->
- {stop, normal, State}
+ {stop, shutdown, State}
end;
handle_cast({broadcast, _Msg, _SizeHint},
@@ -675,16 +675,21 @@ handle_cast(join, State = #state { self = Self,
module = Module,
callback_args = Args,
txn_executor = TxnFun }) ->
- View = join_group(Self, GroupName, TxnFun),
- MembersState =
- case alive_view_members(View) of
- [Self] -> blank_member_state();
- _ -> undefined
- end,
- State1 = check_neighbours(State #state { view = View,
- members_state = MembersState }),
- handle_callback_result(
- {Module:joined(Args, get_pids(all_known_members(View))), State1});
+ try
+ View = join_group(Self, GroupName, TxnFun),
+ MembersState =
+ case alive_view_members(View) of
+ [Self] -> blank_member_state();
+ _ -> undefined
+ end,
+ State1 = check_neighbours(State #state { view = View,
+ members_state = MembersState }),
+ handle_callback_result(
+ {Module:joined(Args, get_pids(all_known_members(View))), State1})
+ catch
+ lost_membership ->
+ {stop, shutdown, State}
+ end;
handle_cast({validate_members, OldMembers},
State = #state { view = View,
@@ -756,13 +761,18 @@ handle_info({'DOWN', MRef, process, _Pid, Reason},
end
catch
lost_membership ->
- {stop, normal, State}
- end.
+ {stop, shutdown, State}
+ end;
+handle_info(_, State) ->
+ %% Discard any unexpected messages, such as late replies from neighbour_call/2
+ %% TODO: For #gm_group{} related info messages, it could be worthwhile to
+ %% change_view/2, as this might reflect an alteration in the gm group, meaning
+ %% we now need to update our state. see rabbitmq-server#914.
+ noreply(State).
terminate(Reason, #state { module = Module, callback_args = Args }) ->
Module:handle_terminate(Args, Reason).
-
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
@@ -865,7 +875,7 @@ handle_msg({activity, Left, Activity},
Result, fun activity_true/3, fun activity_false/3, Activity3, State2)
catch
lost_membership ->
- {{stop, normal}, State}
+ {{stop, shutdown}, State}
end;
handle_msg({activity, _NotLeft, _Activity}, State) ->
@@ -1131,7 +1141,7 @@ record_dead_member_in_group(Self, Member, GroupName, TxnFun, Verify) ->
true ->
check_membership(Self, read_group(GroupName));
false ->
- read_group(GroupName)
+ check_group(read_group(GroupName))
end,
case lists:splitwith(
fun (Member1) -> Member1 =/= Member end, Members) of
@@ -1593,7 +1603,9 @@ check_membership(Self, #gm_group{members = M} = Group) ->
Group;
false ->
throw(lost_membership)
- end.
+ end;
+check_membership(_Self, {error, not_found}) ->
+ throw(lost_membership).
check_membership(GroupName) ->
case dirty_read_group(GroupName) of
@@ -1607,3 +1619,8 @@ check_membership(GroupName) ->
{error, not_found} ->
throw(lost_membership)
end.
+
+check_group({error, not_found}) ->
+ throw(lost_membership);
+check_group(Any) ->
+ Any.
diff --git a/src/mochinum.erl b/src/mochinum.erl
deleted file mode 100644
index 4ea7a22acf..0000000000
--- a/src/mochinum.erl
+++ /dev/null
@@ -1,358 +0,0 @@
-%% This file is a copy of `mochijson2.erl' from mochiweb, revision
-%% d541e9a0f36c00dcadc2e589f20e47fbf46fc76f. For the license, see
-%% `LICENSE-MIT-Mochi'.
-
-%% @copyright 2007 Mochi Media, Inc.
-%% @author Bob Ippolito <bob@mochimedia.com>
-
-%% @doc Useful numeric algorithms for floats that cover some deficiencies
-%% in the math module. More interesting is digits/1, which implements
-%% the algorithm from:
-%% http://www.cs.indiana.edu/~burger/fp/index.html
-%% See also "Printing Floating-Point Numbers Quickly and Accurately"
-%% in Proceedings of the SIGPLAN '96 Conference on Programming Language
-%% Design and Implementation.
-
--module(mochinum).
--author("Bob Ippolito <bob@mochimedia.com>").
--export([digits/1, frexp/1, int_pow/2, int_ceil/1]).
-
-%% IEEE 754 Float exponent bias
--define(FLOAT_BIAS, 1022).
--define(MIN_EXP, -1074).
--define(BIG_POW, 4503599627370496).
-
-%% External API
-
-%% @spec digits(number()) -> string()
-%% @doc Returns a string that accurately represents the given integer or float
-%% using a conservative amount of digits. Great for generating
-%% human-readable output, or compact ASCII serializations for floats.
-digits(N) when is_integer(N) ->
- integer_to_list(N);
-digits(0.0) ->
- "0.0";
-digits(Float) ->
- {Frac1, Exp1} = frexp_int(Float),
- [Place0 | Digits0] = digits1(Float, Exp1, Frac1),
- {Place, Digits} = transform_digits(Place0, Digits0),
- R = insert_decimal(Place, Digits),
- case Float < 0 of
- true ->
- [$- | R];
- _ ->
- R
- end.
-
-%% @spec frexp(F::float()) -> {Frac::float(), Exp::float()}
-%% @doc Return the fractional and exponent part of an IEEE 754 double,
-%% equivalent to the libc function of the same name.
-%% F = Frac * pow(2, Exp).
-frexp(F) ->
- frexp1(unpack(F)).
-
-%% @spec int_pow(X::integer(), N::integer()) -> Y::integer()
-%% @doc Moderately efficient way to exponentiate integers.
-%% int_pow(10, 2) = 100.
-int_pow(_X, 0) ->
- 1;
-int_pow(X, N) when N > 0 ->
- int_pow(X, N, 1).
-
-%% @spec int_ceil(F::float()) -> integer()
-%% @doc Return the ceiling of F as an integer. The ceiling is defined as
-%% F when F == trunc(F);
-%% trunc(F) when F &lt; 0;
-%% trunc(F) + 1 when F &gt; 0.
-int_ceil(X) ->
- T = trunc(X),
- case (X - T) of
- Pos when Pos > 0 -> T + 1;
- _ -> T
- end.
-
-
-%% Internal API
-
-int_pow(X, N, R) when N < 2 ->
- R * X;
-int_pow(X, N, R) ->
- int_pow(X * X, N bsr 1, case N band 1 of 1 -> R * X; 0 -> R end).
-
-insert_decimal(0, S) ->
- "0." ++ S;
-insert_decimal(Place, S) when Place > 0 ->
- L = length(S),
- case Place - L of
- 0 ->
- S ++ ".0";
- N when N < 0 ->
- {S0, S1} = lists:split(L + N, S),
- S0 ++ "." ++ S1;
- N when N < 6 ->
- %% More places than digits
- S ++ lists:duplicate(N, $0) ++ ".0";
- _ ->
- insert_decimal_exp(Place, S)
- end;
-insert_decimal(Place, S) when Place > -6 ->
- "0." ++ lists:duplicate(abs(Place), $0) ++ S;
-insert_decimal(Place, S) ->
- insert_decimal_exp(Place, S).
-
-insert_decimal_exp(Place, S) ->
- [C | S0] = S,
- S1 = case S0 of
- [] ->
- "0";
- _ ->
- S0
- end,
- Exp = case Place < 0 of
- true ->
- "e-";
- false ->
- "e+"
- end,
- [C] ++ "." ++ S1 ++ Exp ++ integer_to_list(abs(Place - 1)).
-
-
-digits1(Float, Exp, Frac) ->
- Round = ((Frac band 1) =:= 0),
- case Exp >= 0 of
- true ->
- BExp = 1 bsl Exp,
- case (Frac =/= ?BIG_POW) of
- true ->
- scale((Frac * BExp * 2), 2, BExp, BExp,
- Round, Round, Float);
- false ->
- scale((Frac * BExp * 4), 4, (BExp * 2), BExp,
- Round, Round, Float)
- end;
- false ->
- case (Exp =:= ?MIN_EXP) orelse (Frac =/= ?BIG_POW) of
- true ->
- scale((Frac * 2), 1 bsl (1 - Exp), 1, 1,
- Round, Round, Float);
- false ->
- scale((Frac * 4), 1 bsl (2 - Exp), 2, 1,
- Round, Round, Float)
- end
- end.
-
-scale(R, S, MPlus, MMinus, LowOk, HighOk, Float) ->
- Est = int_ceil(math:log10(abs(Float)) - 1.0e-10),
- %% Note that the scheme implementation uses a 326 element look-up table
- %% for int_pow(10, N) where we do not.
- case Est >= 0 of
- true ->
- fixup(R, S * int_pow(10, Est), MPlus, MMinus, Est,
- LowOk, HighOk);
- false ->
- Scale = int_pow(10, -Est),
- fixup(R * Scale, S, MPlus * Scale, MMinus * Scale, Est,
- LowOk, HighOk)
- end.
-
-fixup(R, S, MPlus, MMinus, K, LowOk, HighOk) ->
- TooLow = case HighOk of
- true ->
- (R + MPlus) >= S;
- false ->
- (R + MPlus) > S
- end,
- case TooLow of
- true ->
- [(K + 1) | generate(R, S, MPlus, MMinus, LowOk, HighOk)];
- false ->
- [K | generate(R * 10, S, MPlus * 10, MMinus * 10, LowOk, HighOk)]
- end.
-
-generate(R0, S, MPlus, MMinus, LowOk, HighOk) ->
- D = R0 div S,
- R = R0 rem S,
- TC1 = case LowOk of
- true ->
- R =< MMinus;
- false ->
- R < MMinus
- end,
- TC2 = case HighOk of
- true ->
- (R + MPlus) >= S;
- false ->
- (R + MPlus) > S
- end,
- case TC1 of
- false ->
- case TC2 of
- false ->
- [D | generate(R * 10, S, MPlus * 10, MMinus * 10,
- LowOk, HighOk)];
- true ->
- [D + 1]
- end;
- true ->
- case TC2 of
- false ->
- [D];
- true ->
- case R * 2 < S of
- true ->
- [D];
- false ->
- [D + 1]
- end
- end
- end.
-
-unpack(Float) ->
- <<Sign:1, Exp:11, Frac:52>> = <<Float:64/float>>,
- {Sign, Exp, Frac}.
-
-frexp1({_Sign, 0, 0}) ->
- {0.0, 0};
-frexp1({Sign, 0, Frac}) ->
- Exp = log2floor(Frac),
- <<Frac1:64/float>> = <<Sign:1, ?FLOAT_BIAS:11, (Frac-1):52>>,
- {Frac1, -(?FLOAT_BIAS) - 52 + Exp};
-frexp1({Sign, Exp, Frac}) ->
- <<Frac1:64/float>> = <<Sign:1, ?FLOAT_BIAS:11, Frac:52>>,
- {Frac1, Exp - ?FLOAT_BIAS}.
-
-log2floor(Int) ->
- log2floor(Int, 0).
-
-log2floor(0, N) ->
- N;
-log2floor(Int, N) ->
- log2floor(Int bsr 1, 1 + N).
-
-
-transform_digits(Place, [0 | Rest]) ->
- transform_digits(Place, Rest);
-transform_digits(Place, Digits) ->
- {Place, [$0 + D || D <- Digits]}.
-
-
-frexp_int(F) ->
- case unpack(F) of
- {_Sign, 0, Frac} ->
- {Frac, ?MIN_EXP};
- {_Sign, Exp, Frac} ->
- {Frac + (1 bsl 52), Exp - 53 - ?FLOAT_BIAS}
- end.
-
-%%
-%% Tests
-%%
--ifdef(TEST).
--include_lib("eunit/include/eunit.hrl").
-
-int_ceil_test() ->
- ?assertEqual(1, int_ceil(0.0001)),
- ?assertEqual(0, int_ceil(0.0)),
- ?assertEqual(1, int_ceil(0.99)),
- ?assertEqual(1, int_ceil(1.0)),
- ?assertEqual(-1, int_ceil(-1.5)),
- ?assertEqual(-2, int_ceil(-2.0)),
- ok.
-
-int_pow_test() ->
- ?assertEqual(1, int_pow(1, 1)),
- ?assertEqual(1, int_pow(1, 0)),
- ?assertEqual(1, int_pow(10, 0)),
- ?assertEqual(10, int_pow(10, 1)),
- ?assertEqual(100, int_pow(10, 2)),
- ?assertEqual(1000, int_pow(10, 3)),
- ok.
-
-digits_test() ->
- ?assertEqual("0",
- digits(0)),
- ?assertEqual("0.0",
- digits(0.0)),
- ?assertEqual("1.0",
- digits(1.0)),
- ?assertEqual("-1.0",
- digits(-1.0)),
- ?assertEqual("0.1",
- digits(0.1)),
- ?assertEqual("0.01",
- digits(0.01)),
- ?assertEqual("0.001",
- digits(0.001)),
- ?assertEqual("1.0e+6",
- digits(1000000.0)),
- ?assertEqual("0.5",
- digits(0.5)),
- ?assertEqual("4503599627370496.0",
- digits(4503599627370496.0)),
- %% small denormalized number
- %% 4.94065645841246544177e-324 =:= 5.0e-324
- <<SmallDenorm/float>> = <<0,0,0,0,0,0,0,1>>,
- ?assertEqual("5.0e-324",
- digits(SmallDenorm)),
- ?assertEqual(SmallDenorm,
- list_to_float(digits(SmallDenorm))),
- %% large denormalized number
- %% 2.22507385850720088902e-308
- <<BigDenorm/float>> = <<0,15,255,255,255,255,255,255>>,
- ?assertEqual("2.225073858507201e-308",
- digits(BigDenorm)),
- ?assertEqual(BigDenorm,
- list_to_float(digits(BigDenorm))),
- %% small normalized number
- %% 2.22507385850720138309e-308
- <<SmallNorm/float>> = <<0,16,0,0,0,0,0,0>>,
- ?assertEqual("2.2250738585072014e-308",
- digits(SmallNorm)),
- ?assertEqual(SmallNorm,
- list_to_float(digits(SmallNorm))),
- %% large normalized number
- %% 1.79769313486231570815e+308
- <<LargeNorm/float>> = <<127,239,255,255,255,255,255,255>>,
- ?assertEqual("1.7976931348623157e+308",
- digits(LargeNorm)),
- ?assertEqual(LargeNorm,
- list_to_float(digits(LargeNorm))),
- %% issue #10 - mochinum:frexp(math:pow(2, -1074)).
- ?assertEqual("5.0e-324",
- digits(math:pow(2, -1074))),
- ok.
-
-frexp_test() ->
- %% zero
- ?assertEqual({0.0, 0}, frexp(0.0)),
- %% one
- ?assertEqual({0.5, 1}, frexp(1.0)),
- %% negative one
- ?assertEqual({-0.5, 1}, frexp(-1.0)),
- %% small denormalized number
- %% 4.94065645841246544177e-324
- <<SmallDenorm/float>> = <<0,0,0,0,0,0,0,1>>,
- ?assertEqual({0.5, -1073}, frexp(SmallDenorm)),
- %% large denormalized number
- %% 2.22507385850720088902e-308
- <<BigDenorm/float>> = <<0,15,255,255,255,255,255,255>>,
- ?assertEqual(
- {0.99999999999999978, -1022},
- frexp(BigDenorm)),
- %% small normalized number
- %% 2.22507385850720138309e-308
- <<SmallNorm/float>> = <<0,16,0,0,0,0,0,0>>,
- ?assertEqual({0.5, -1021}, frexp(SmallNorm)),
- %% large normalized number
- %% 1.79769313486231570815e+308
- <<LargeNorm/float>> = <<127,239,255,255,255,255,255,255>>,
- ?assertEqual(
- {0.99999999999999989, 1024},
- frexp(LargeNorm)),
- %% issue #10 - mochinum:frexp(math:pow(2, -1074)).
- ?assertEqual(
- {0.5, -1073},
- frexp(math:pow(2, -1074))),
- ok.
-
--endif.
diff --git a/src/rabbit.app.src b/src/rabbit.app.src
index 572c1f6bc6..872336bd8e 100644
--- a/src/rabbit.app.src
+++ b/src/rabbit.app.src
@@ -95,7 +95,8 @@
{msg_store_credit_disc_bound, {2000, 500}},
{msg_store_io_batch_size, 2048},
%% see rabbitmq-server#143
- {credit_flow_default_credit, {200, 50}},
+ %% and rabbitmq-server#949
+ {credit_flow_default_credit, {200, 100}},
%% see rabbitmq-server#248
%% and rabbitmq-server#667
{channel_operation_timeout, 15000}
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index da84c612d9..4f8581f78a 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -85,6 +85,7 @@
%% e.g. message expiration messages from previously set up timers
%% that may or may not be still valid
args_policy_version,
+ mirroring_policy_version = 0,
%% running | flow | idle
status
}).
@@ -1017,7 +1018,17 @@ prioritise_info(Msg, _Len, #q{q = #amqqueue{exclusive_owner = DownPid}}) ->
end.
handle_call({init, Recover}, From, State) ->
- init_it(Recover, From, State);
+ try
+ init_it(Recover, From, State)
+ catch
+ {coordinator_not_started, Reason} ->
+ %% The GM can shutdown before the coordinator has started up
+ %% (lost membership or missing group), thus the start_link of
+ %% the coordinator returns {error, shutdown} as rabbit_amqqueue_process
+ %% is trapping exists. The master captures this return value and
+ %% throws the current exception.
+ {stop, Reason, State}
+ end;
handle_call(info, _From, State) ->
reply(infos(info_keys(), State), State);
@@ -1168,7 +1179,17 @@ handle_call(cancel_sync_mirrors, _From, State) ->
reply({ok, not_syncing}, State).
handle_cast(init, State) ->
- init_it({no_barrier, non_clean_shutdown}, none, State);
+ try
+ init_it({no_barrier, non_clean_shutdown}, none, State)
+ catch
+ {coordinator_not_started, Reason} ->
+ %% The GM can shutdown before the coordinator has started up
+ %% (lost membership or missing group), thus the start_link of
+ %% the coordinator returns {error, shutdown} as rabbit_amqqueue_process
+ %% is trapping exists. The master captures this return value and
+ %% throws the current exception.
+ {stop, Reason, State}
+ end;
handle_cast({run_backing_queue, Mod, Fun},
State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
@@ -1235,22 +1256,15 @@ handle_cast({set_maximum_since_use, Age}, State) ->
ok = file_handle_cache:set_maximum_since_use(Age),
noreply(State);
-handle_cast(start_mirroring, State = #q{backing_queue = BQ,
- backing_queue_state = BQS}) ->
- %% lookup again to get policy for init_with_existing_bq
- {ok, Q} = rabbit_amqqueue:lookup(qname(State)),
- true = BQ =/= rabbit_mirror_queue_master, %% assertion
- BQ1 = rabbit_mirror_queue_master,
- BQS1 = BQ1:init_with_existing_bq(Q, BQ, BQS),
- noreply(State#q{backing_queue = BQ1,
- backing_queue_state = BQS1});
-
-handle_cast(stop_mirroring, State = #q{backing_queue = BQ,
- backing_queue_state = BQS}) ->
- BQ = rabbit_mirror_queue_master, %% assertion
- {BQ1, BQS1} = BQ:stop_mirroring(BQS),
- noreply(State#q{backing_queue = BQ1,
- backing_queue_state = BQS1});
+handle_cast(update_mirroring, State = #q{q = Q,
+ mirroring_policy_version = Version}) ->
+ case needs_update_mirroring(Q, Version) of
+ false ->
+ noreply(State);
+ {Policy, NewVersion} ->
+ State1 = State#q{mirroring_policy_version = NewVersion},
+ noreply(update_mirroring(Policy, State1))
+ end;
handle_cast({credit, ChPid, CTag, Credit, Drain},
State = #q{consumers = Consumers,
@@ -1393,3 +1407,54 @@ handle_pre_hibernate(State = #q{backing_queue = BQ,
{hibernate, stop_rate_timer(State1)}.
format_message_queue(Opt, MQ) -> rabbit_misc:format_message_queue(Opt, MQ).
+
+needs_update_mirroring(Q, Version) ->
+ {ok, UpQ} = rabbit_amqqueue:lookup(Q#amqqueue.name),
+ DBVersion = UpQ#amqqueue.policy_version,
+ case DBVersion > Version of
+ true -> {rabbit_policy:get(<<"ha-mode">>, UpQ), DBVersion};
+ false -> false
+ end.
+
+update_mirroring(Policy, State = #q{backing_queue = BQ}) ->
+ case update_to(Policy, BQ) of
+ start_mirroring ->
+ start_mirroring(State);
+ stop_mirroring ->
+ stop_mirroring(State);
+ ignore ->
+ State;
+ update_ha_mode ->
+ update_ha_mode(State)
+ end.
+
+update_to(undefined, rabbit_mirror_queue_master) ->
+ stop_mirroring;
+update_to(_, rabbit_mirror_queue_master) ->
+ update_ha_mode;
+update_to(undefined, BQ) when BQ =/= rabbit_mirror_queue_master ->
+ ignore;
+update_to(_, BQ) when BQ =/= rabbit_mirror_queue_master ->
+ start_mirroring.
+
+start_mirroring(State = #q{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
+ %% lookup again to get policy for init_with_existing_bq
+ {ok, Q} = rabbit_amqqueue:lookup(qname(State)),
+ true = BQ =/= rabbit_mirror_queue_master, %% assertion
+ BQ1 = rabbit_mirror_queue_master,
+ BQS1 = BQ1:init_with_existing_bq(Q, BQ, BQS),
+ State#q{backing_queue = BQ1,
+ backing_queue_state = BQS1}.
+
+stop_mirroring(State = #q{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
+ BQ = rabbit_mirror_queue_master, %% assertion
+ {BQ1, BQS1} = BQ:stop_mirroring(BQS),
+ State#q{backing_queue = BQ1,
+ backing_queue_state = BQS1}.
+
+update_ha_mode(State) ->
+ {ok, Q} = rabbit_amqqueue:lookup(qname(State)),
+ ok = rabbit_mirror_queue_misc:update_mirrors(Q),
+ State.
diff --git a/src/rabbit_autoheal.erl b/src/rabbit_autoheal.erl
index 5865ba8227..3adcc09692 100644
--- a/src/rabbit_autoheal.erl
+++ b/src/rabbit_autoheal.erl
@@ -180,6 +180,18 @@ node_down(_Node, not_healing) ->
node_down(Node, {winner_waiting, _, Notify}) ->
abort([Node], Notify);
+node_down(Node, {leader_waiting, Node, _Notify}) ->
+ %% The winner went down, we don't know what to do so we simply abort.
+ rabbit_log:info("Autoheal: aborting - winner ~p went down~n", [Node]),
+ not_healing;
+
+node_down(Node, {leader_waiting, _, _} = St) ->
+ %% If it is a partial partition, the winner might continue with the
+ %% healing process. If it is a full partition, the winner will also
+ %% see it and abort. Let's wait for it.
+ rabbit_log:info("Autoheal: ~p went down, waiting for winner decision ~n", [Node]),
+ St;
+
node_down(Node, _State) ->
rabbit_log:info("Autoheal: aborting - ~p went down~n", [Node]),
not_healing.
@@ -218,14 +230,24 @@ handle_msg({become_winner, Losers},
not_healing, _Partitions) ->
rabbit_log:info("Autoheal: I am the winner, waiting for ~p to stop~n",
[Losers]),
- %% The leader said everything was ready - do we agree? If not then
- %% give up.
- Down = Losers -- rabbit_node_monitor:alive_rabbit_nodes(Losers),
- case Down of
- [] -> [send(L, {winner_is, node()}) || L <- Losers],
- {winner_waiting, Losers, Losers};
- _ -> abort(Down, Losers)
- end;
+ stop_partition(Losers);
+
+handle_msg({become_winner, Losers},
+ {winner_waiting, _, Losers}, _Partitions) ->
+ %% The leader has aborted the healing, might have seen us down but
+ %% we didn't see the same. Let's try again as it is the same partition.
+ rabbit_log:info("Autoheal: I am the winner and received a duplicated "
+ "request, waiting again for ~p to stop~n", [Losers]),
+ stop_partition(Losers);
+
+handle_msg({become_winner, _},
+ {winner_waiting, _, Losers}, _Partitions) ->
+ %% Something has happened to the leader, it might have seen us down but we
+ %% are still alive. Partitions have changed, cannot continue.
+ rabbit_log:info("Autoheal: I am the winner and received another healing "
+ "request, partitions have changed to ~p. Aborting ~n", [Losers]),
+ winner_finish(Losers),
+ not_healing;
handle_msg({winner_is, Winner}, State = not_healing,
_Partitions) ->
@@ -269,6 +291,14 @@ handle_msg({autoheal_finished, Winner}, not_healing, _Partitions)
%% We are the leader and the winner. The state already transitioned
%% to "not_healing" at the end of the autoheal process.
rabbit_log:info("Autoheal finished according to winner ~p~n", [node()]),
+ not_healing;
+
+handle_msg({autoheal_finished, Winner}, not_healing, _Partitions) ->
+ %% We might have seen the winner down during a partial partition and
+ %% transitioned to not_healing. However, the winner was still able
+ %% to finish. Let it pass.
+ rabbit_log:info("Autoheal finished according to winner ~p."
+ " Unexpected, I might have previously seen the winner down~n", [Winner]),
not_healing.
%%----------------------------------------------------------------------------
@@ -279,7 +309,9 @@ abort(Down, Notify) ->
rabbit_log:info("Autoheal: aborting - ~p down~n", [Down]),
%% Make sure any nodes waiting for us start - it won't necessarily
%% heal the partition but at least they won't get stuck.
- winner_finish(Notify).
+ %% If we are executing this, we are not stopping. Thus, don't wait
+ %% for ourselves!
+ winner_finish(Notify -- [node()]).
winner_finish(Notify) ->
%% There is a race in Mnesia causing a starting loser to hang
@@ -297,21 +329,33 @@ winner_finish(Notify) ->
send(leader(), {autoheal_finished, node()}),
not_healing.
-wait_for_mnesia_shutdown([Node | Rest] = AllNodes) ->
- case rpc:call(Node, mnesia, system_info, [is_running]) of
- no ->
- wait_for_mnesia_shutdown(Rest);
- Running when
- Running =:= yes orelse
- Running =:= starting orelse
- Running =:= stopping ->
- timer:sleep(?MNESIA_STOPPED_PING_INTERNAL),
- wait_for_mnesia_shutdown(AllNodes);
- _ ->
- wait_for_mnesia_shutdown(Rest)
- end;
-wait_for_mnesia_shutdown([]) ->
- ok.
+%% This improves the previous implementation, but could still potentially enter an infinity
+%% loop. If it also possible that for when it finishes some of the nodes have been
+%% manually restarted, but we can't do much more (apart from stop them again). So let it
+%% continue and notify all the losers to restart.
+wait_for_mnesia_shutdown(AllNodes) ->
+ Monitors = lists:foldl(fun(Node, Monitors0) ->
+ pmon:monitor({mnesia_sup, Node}, Monitors0)
+ end, pmon:new(), AllNodes),
+ wait_for_supervisors(Monitors).
+
+wait_for_supervisors(Monitors) ->
+ case pmon:is_empty(Monitors) of
+ true ->
+ ok;
+ false ->
+ receive
+ {'DOWN', _MRef, process, {mnesia_sup, _} = I, _Reason} ->
+ wait_for_supervisors(pmon:erase(I, Monitors))
+ after
+ 60000 ->
+ AliveLosers = [Node || {_, Node} <- pmon:monitored(Monitors)],
+ rabbit_log:info("Autoheal: mnesia in nodes ~p is still up, sending "
+ "winner notification again to these ~n", [AliveLosers]),
+ [send(L, {winner_is, node()}) || L <- AliveLosers],
+ wait_for_mnesia_shutdown(AliveLosers)
+ end
+ end.
restart_loser(State, Winner) ->
rabbit_log:warning(
@@ -391,3 +435,13 @@ fmt_error({remote_down, RemoteDown}) ->
rabbit_misc:format("Remote nodes disconnected:~n ~p", [RemoteDown]);
fmt_error({nodes_down, NodesDown}) ->
rabbit_misc:format("Local nodes down: ~p", [NodesDown]).
+
+stop_partition(Losers) ->
+ %% The leader said everything was ready - do we agree? If not then
+ %% give up.
+ Down = Losers -- rabbit_node_monitor:alive_rabbit_nodes(Losers),
+ case Down of
+ [] -> [send(L, {winner_is, node()}) || L <- Losers],
+ {winner_waiting, Losers, Losers};
+ _ -> abort(Down, Losers)
+ end.
diff --git a/src/rabbit_cli.erl b/src/rabbit_cli.erl
index 6b35482217..c0e5c93247 100644
--- a/src/rabbit_cli.erl
+++ b/src/rabbit_cli.erl
@@ -18,7 +18,7 @@
-include("rabbit_cli.hrl").
-export([main/3, start_distribution/0, start_distribution/1,
- parse_arguments/4, filter_opts/2,
+ parse_arguments/4, mutually_exclusive_flags/3,
rpc_call/4, rpc_call/5, rpc_call/7]).
%%----------------------------------------------------------------------------
@@ -42,8 +42,7 @@
[{string(), optdef()}], string(), [string()]) ->
parse_result().
--spec filter_opts([{option_name(), option_value()}], [option_name()]) ->
- [boolean()].
+-spec mutually_exclusive_flags([{option_name(), option_value()}], term(), [{option_name(), term()}]) -> {ok, term()} | {error, string()}.
-spec rpc_call(node(), atom(), atom(), [any()]) -> any().
-spec rpc_call(node(), atom(), atom(), [any()], number()) -> any().
@@ -147,7 +146,7 @@ main(ParseFun, DoFun, UsageMod) ->
start_distribution_anon(0, LastError) ->
{error, LastError};
start_distribution_anon(TriesLeft, _) ->
- NameCandidate = list_to_atom(rabbit_misc:format("rabbitmq-cli-~2..0b", [rand_compat:uniform(100)])),
+ NameCandidate = generate_cli_node_name(),
case net_kernel:start([NameCandidate, name_type()]) of
{ok, _} = Result ->
Result;
@@ -155,7 +154,7 @@ start_distribution_anon(TriesLeft, _) ->
start_distribution_anon(TriesLeft - 1, Reason)
end.
-%% Tries to start distribution with randonm name choosen from limited list of candidates - to
+%% Tries to start distribution with random name choosen from limited list of candidates - to
%% prevent atom table pollution on target nodes.
start_distribution() ->
rabbit_nodes:ensure_epmd(),
@@ -171,6 +170,22 @@ name_type() ->
_ -> shortnames
end.
+generate_cli_node_name() ->
+ Base = rabbit_misc:format("rabbitmq-cli-~2..0b", [rand_compat:uniform(100)]),
+ NameAsList =
+ case {name_type(), inet_db:res_option(domain)} of
+ {longnames, []} ->
+ %% Distribution will fail to start if it's unable to
+ %% determine FQDN of a node (with at least one dot in
+ %% a name).
+ %% CLI is always an initiator of connection, so it
+ %% doesn't matter if the name will not resolve.
+ Base ++ "@" ++ inet_db:gethostname() ++ ".no-domain";
+ _ ->
+ Base
+ end,
+ list_to_atom(NameAsList).
+
usage(Mod) ->
usage(Mod, ?EX_USAGE).
@@ -250,20 +265,22 @@ process_opts(Defs, C, [A | As], Found, KVs, Outs) ->
{none, _, _} -> no_command
end.
-%% When we have a set of flags that are used for filtering, we want by
-%% default to include every such option in our output. But if a user
-%% explicitly specified any such flag, we want to include only items
-%% which he has requested.
-filter_opts(CurrentOptionValues, AllOptionNames) ->
- Explicit = lists:map(fun(OptName) ->
- proplists:get_bool(OptName, CurrentOptionValues)
- end,
- AllOptionNames),
- case lists:member(true, Explicit) of
- true ->
- Explicit;
- false ->
- lists:duplicate(length(AllOptionNames), true)
+mutually_exclusive_flags(CurrentOptionValues, Default, FlagsAndValues) ->
+ PresentFlags = lists:filtermap(fun({OptName, _} = _O) ->
+ proplists:get_bool(OptName, CurrentOptionValues)
+ end,
+ FlagsAndValues),
+ case PresentFlags of
+ [] ->
+ {ok, Default};
+ [{_, Value}] ->
+ {ok, Value};
+ _ ->
+ Names = [ [$', N, $'] || {N, _} <- PresentFlags ],
+ CommaSeparated = string:join(lists:droplast(Names), ", "),
+ AndOneMore = lists:last(Names),
+ Msg = io_lib:format("Options ~s and ~s are mutually exclusive", [CommaSeparated, AndOneMore]),
+ {error, lists:flatten(Msg)}
end.
%%----------------------------------------------------------------------------
diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl
index d2f0e8bcb0..92898c2a2c 100644
--- a/src/rabbit_control_main.erl
+++ b/src/rabbit_control_main.erl
@@ -74,7 +74,7 @@
{clear_policy, [?VHOST_DEF]},
{list_policies, [?VHOST_DEF]},
- {list_queues, [?VHOST_DEF, ?OFFLINE_DEF, ?ONLINE_DEF]},
+ {list_queues, [?VHOST_DEF, ?OFFLINE_DEF, ?ONLINE_DEF, ?LOCAL_DEF]},
{list_exchanges, [?VHOST_DEF]},
{list_bindings, [?VHOST_DEF]},
{list_connections, [?VHOST_DEF]},
@@ -632,12 +632,19 @@ action(list_user_permissions, Node, Args = [_Username], _Opts, Inform, Timeout)
true);
action(list_queues, Node, Args, Opts, Inform, Timeout) ->
- [Online, Offline] = rabbit_cli:filter_opts(Opts, [?ONLINE_OPT, ?OFFLINE_OPT]),
- Inform("Listing queues", []),
- VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
- ArgAtoms = default_if_empty(Args, [name, messages]),
- call(Node, {rabbit_amqqueue, info_all, [VHostArg, ArgAtoms, Online, Offline]},
- ArgAtoms, Timeout);
+ case rabbit_cli:mutually_exclusive_flags(
+ Opts, all, [{?ONLINE_OPT, online}
+ ,{?OFFLINE_OPT, offline}
+ ,{?LOCAL_OPT, local}]) of
+ {ok, Filter} ->
+ Inform("Listing queues", []),
+ VHostArg = list_to_binary(proplists:get_value(?VHOST_OPT, Opts)),
+ ArgAtoms = default_if_empty(Args, [name, messages]),
+ call(Node, {rabbit_amqqueue, info_all, [VHostArg, ArgAtoms, Filter]},
+ ArgAtoms, Timeout);
+ {error, ErrStr} ->
+ {error_string, ErrStr}
+ end;
action(list_exchanges, Node, Args, Opts, Inform, Timeout) ->
Inform("Listing exchanges", []),
@@ -953,6 +960,9 @@ nodes_in_cluster(Node) ->
unsafe_rpc(Node, rabbit_mnesia, cluster_nodes, [running]).
alarms_by_node(Name) ->
- Status = unsafe_rpc(Name, rabbit, status, []),
- {_, As} = lists:keyfind(alarms, 1, Status),
- {Name, As}.
+ case rpc_call(Name, rabbit, status, []) of
+ {badrpc,nodedown} -> {Name, [nodedown]};
+ Status ->
+ {_, As} = lists:keyfind(alarms, 1, Status),
+ {Name, As}
+ end.
diff --git a/src/rabbit_mirror_queue_coordinator.erl b/src/rabbit_mirror_queue_coordinator.erl
index 221f11f18a..562f0f0fcf 100644
--- a/src/rabbit_mirror_queue_coordinator.erl
+++ b/src/rabbit_mirror_queue_coordinator.erl
@@ -355,13 +355,28 @@ handle_cast({gm_deaths, DeadGMPids},
DeadPids),
rabbit_mirror_queue_misc:add_mirrors(QueueName, ExtraNodes, async),
noreply(State);
+ {ok, _MPid0, DeadPids, _ExtraNodes} ->
+ %% see rabbitmq-server#914;
+ %% Different slave is now master, stop current coordinator normally.
+ %% Initiating queue is now slave and the least we could do is report
+ %% deaths which we 'think' we saw.
+ %% NOTE: Reported deaths here, could be inconsistant.
+ rabbit_mirror_queue_misc:report_deaths(MPid, false, QueueName,
+ DeadPids),
+ {stop, shutdown, State};
{error, not_found} ->
{stop, normal, State}
end;
-handle_cast(request_depth, State = #state { depth_fun = DepthFun }) ->
- ok = DepthFun(),
- noreply(State);
+handle_cast(request_depth, State = #state { depth_fun = DepthFun,
+ q = #amqqueue { name = QName, pid = MPid }}) ->
+ case rabbit_amqqueue:lookup(QName) of
+ {ok, #amqqueue{ pid = MPid }} ->
+ ok = DepthFun(),
+ noreply(State);
+ _ ->
+ {stop, shutdown, State}
+ end;
handle_cast({ensure_monitoring, Pids}, State = #state { monitors = Mons }) ->
noreply(State #state { monitors = pmon:monitor_all(Pids, Mons) });
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index d78f6180e7..d82cdf336a 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -101,35 +101,43 @@ init(Q, Recover, AsyncCallback) ->
State.
init_with_existing_bq(Q = #amqqueue{name = QName}, BQ, BQS) ->
- {ok, CPid} = rabbit_mirror_queue_coordinator:start_link(
- Q, undefined, sender_death_fun(), depth_fun()),
- GM = rabbit_mirror_queue_coordinator:get_gm(CPid),
- Self = self(),
- ok = rabbit_misc:execute_mnesia_transaction(
- fun () ->
- [Q1 = #amqqueue{gm_pids = GMPids}]
- = mnesia:read({rabbit_queue, QName}),
- ok = rabbit_amqqueue:store_queue(
- Q1#amqqueue{gm_pids = [{GM, Self} | GMPids],
- state = live})
- end),
- {_MNode, SNodes} = rabbit_mirror_queue_misc:suggested_queue_nodes(Q),
- %% We need synchronous add here (i.e. do not return until the
- %% slave is running) so that when queue declaration is finished
- %% all slaves are up; we don't want to end up with unsynced slaves
- %% just by declaring a new queue. But add can't be synchronous all
- %% the time as it can be called by slaves and that's
- %% deadlock-prone.
- rabbit_mirror_queue_misc:add_mirrors(QName, SNodes, sync),
- #state { name = QName,
- gm = GM,
- coordinator = CPid,
- backing_queue = BQ,
- backing_queue_state = BQS,
- seen_status = dict:new(),
- confirmed = [],
- known_senders = sets:new(),
- wait_timeout = rabbit_misc:get_env(rabbit, slave_wait_timeout, 15000) }.
+ case rabbit_mirror_queue_coordinator:start_link(
+ Q, undefined, sender_death_fun(), depth_fun()) of
+ {ok, CPid} ->
+ GM = rabbit_mirror_queue_coordinator:get_gm(CPid),
+ Self = self(),
+ ok = rabbit_misc:execute_mnesia_transaction(
+ fun () ->
+ [Q1 = #amqqueue{gm_pids = GMPids}]
+ = mnesia:read({rabbit_queue, QName}),
+ ok = rabbit_amqqueue:store_queue(
+ Q1#amqqueue{gm_pids = [{GM, Self} | GMPids],
+ state = live})
+ end),
+ {_MNode, SNodes} = rabbit_mirror_queue_misc:suggested_queue_nodes(Q),
+ %% We need synchronous add here (i.e. do not return until the
+ %% slave is running) so that when queue declaration is finished
+ %% all slaves are up; we don't want to end up with unsynced slaves
+ %% just by declaring a new queue. But add can't be synchronous all
+ %% the time as it can be called by slaves and that's
+ %% deadlock-prone.
+ rabbit_mirror_queue_misc:add_mirrors(QName, SNodes, sync),
+ #state { name = QName,
+ gm = GM,
+ coordinator = CPid,
+ backing_queue = BQ,
+ backing_queue_state = BQS,
+ seen_status = dict:new(),
+ confirmed = [],
+ known_senders = sets:new(),
+ wait_timeout = rabbit_misc:get_env(rabbit, slave_wait_timeout, 15000) };
+ {error, Reason} ->
+ %% The GM can shutdown before the coordinator has started up
+ %% (lost membership or missing group), thus the start_link of
+ %% the coordinator returns {error, shutdown} as rabbit_amqqueue_process
+ % is trapping exists
+ throw({coordinator_not_started, Reason})
+ end.
stop_mirroring(State = #state { coordinator = CPid,
backing_queue = BQ,
diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl
index 83350920e6..375a0366dd 100644
--- a/src/rabbit_mirror_queue_misc.erl
+++ b/src/rabbit_mirror_queue_misc.erl
@@ -20,7 +20,7 @@
-export([remove_from_queue/3, on_node_up/0, add_mirrors/3,
report_deaths/4, store_updated_slaves/1,
initial_queue_node/2, suggested_queue_nodes/1,
- is_mirrored/1, update_mirrors/2, validate_policy/1,
+ is_mirrored/1, update_mirrors/2, update_mirrors/1, validate_policy/1,
maybe_auto_sync/1, maybe_drop_master_after_sync/1,
sync_batch_size/1, log_info/3, log_warning/3]).
@@ -76,7 +76,7 @@ remove_from_queue(QueueName, Self, DeadGMPids) ->
rabbit_misc:execute_mnesia_transaction(
fun () ->
%% Someone else could have deleted the queue before we
- %% get here.
+ %% get here. Or, gm group could've altered. see rabbitmq-server#914
case mnesia:read({rabbit_queue, QueueName}) of
[] -> {error, not_found};
[Q = #amqqueue { pid = QPid,
@@ -90,7 +90,16 @@ remove_from_queue(QueueName, Self, DeadGMPids) ->
AlivePids = [Pid || {_GM, Pid} <- AliveGM],
Alive = [Pid || Pid <- [QPid | SPids],
lists:member(Pid, AlivePids)],
- {QPid1, SPids1} = promote_slave(Alive),
+ {QPid1, SPids1} = case Alive of
+ [] ->
+ %% GM altered, & if all pids are
+ %% perceived as dead, rather do
+ %% do nothing here, & trust the
+ %% promoted slave to have updated
+ %% mnesia during the alteration.
+ {QPid, SPids};
+ _ -> promote_slave(Alive)
+ end,
Extra =
case {{QPid, SPids}, {QPid1, SPids1}} of
{Same, Same} ->
@@ -98,7 +107,8 @@ remove_from_queue(QueueName, Self, DeadGMPids) ->
_ when QPid =:= QPid1 orelse QPid1 =:= Self ->
%% Either master hasn't changed, so
%% we're ok to update mnesia; or we have
- %% become the master.
+ %% become the master. If gm altered,
+ %% we have no choice but to proceed.
Q1 = Q#amqqueue{pid = QPid1,
slave_pids = SPids1,
gm_pids = AliveGM},
@@ -392,15 +402,12 @@ update_mirrors(OldQ = #amqqueue{pid = QPid},
NewQ = #amqqueue{pid = QPid}) ->
case {is_mirrored(OldQ), is_mirrored(NewQ)} of
{false, false} -> ok;
- {true, false} -> rabbit_amqqueue:stop_mirroring(QPid);
- {false, true} -> rabbit_amqqueue:start_mirroring(QPid);
- {true, true} -> update_mirrors0(OldQ, NewQ)
+ _ -> rabbit_amqqueue:update_mirroring(QPid)
end.
-update_mirrors0(OldQ = #amqqueue{name = QName},
- NewQ = #amqqueue{name = QName}) ->
- {OldMNode, OldSNodes, _} = actual_queue_nodes(OldQ),
- {NewMNode, NewSNodes} = suggested_queue_nodes(NewQ),
+update_mirrors(Q = #amqqueue{name = QName}) ->
+ {OldMNode, OldSNodes, _} = actual_queue_nodes(Q),
+ {NewMNode, NewSNodes} = suggested_queue_nodes(Q),
OldNodes = [OldMNode | OldSNodes],
NewNodes = [NewMNode | NewSNodes],
%% When a mirror dies, remove_from_queue/2 might have to add new
@@ -414,7 +421,7 @@ update_mirrors0(OldQ = #amqqueue{name = QName},
drop_mirrors(QName, OldNodes -- NewNodes),
%% This is for the case where no extra nodes were added but we changed to
%% a policy requiring auto-sync.
- maybe_auto_sync(NewQ),
+ maybe_auto_sync(Q),
ok.
%% The arrival of a newly synced slave may cause the master to die if
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 6f46cdc698..4770018f9e 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -225,9 +225,15 @@ handle_call({gm_deaths, DeadGMPids}, From,
_ ->
%% master has changed to not us
gen_server2:reply(From, ok),
- %% assertion, we don't need to add_mirrors/2 in this
- %% branch, see last clause in remove_from_queue/2
- [] = ExtraNodes,
+ %% see rabbitmq-server#914;
+ %% It's not always guaranteed that we won't have ExtraNodes.
+ %% If gm alters, master can change to not us with extra nodes,
+ %% in which case we attempt to add mirrors on those nodes.
+ case ExtraNodes of
+ [] -> void;
+ _ -> rabbit_mirror_queue_misc:add_mirrors(
+ QName, ExtraNodes, async)
+ end,
%% Since GM is by nature lazy we need to make sure
%% there is some traffic when a master dies, to
%% make sure all slaves get informed of the
@@ -250,8 +256,21 @@ handle_cast(go, {not_started, Q} = NotStarted) ->
handle_cast({run_backing_queue, Mod, Fun}, State) ->
noreply(run_backing_queue(Mod, Fun, State));
-handle_cast({gm, Instruction}, State) ->
- handle_process_result(process_instruction(Instruction, State));
+handle_cast({gm, Instruction}, State = #state{q = #amqqueue { name = QName }}) ->
+ case rabbit_amqqueue:lookup(QName) of
+ {ok, #amqqueue{slave_pids = SPids}} ->
+ case lists:member(self(), SPids) of
+ true ->
+ handle_process_result(process_instruction(Instruction, State));
+ false ->
+ %% Potentially a duplicated slave caused by a partial partition,
+ %% will stop as a new slave could start unaware of our presence
+ {stop, shutdown, State}
+ end;
+ {error, not_found} ->
+ %% Would not expect this to happen after fixing #953
+ {stop, shutdown, State}
+ end;
handle_cast({deliver, Delivery = #delivery{sender = Sender, flow = Flow}, true},
State) ->
diff --git a/src/rabbit_mirror_queue_sync.erl b/src/rabbit_mirror_queue_sync.erl
index 54f0855fce..c438e91a3f 100644
--- a/src/rabbit_mirror_queue_sync.erl
+++ b/src/rabbit_mirror_queue_sync.erl
@@ -248,9 +248,15 @@ syncer_loop(Ref, MPid, SPids) ->
syncer_loop(Ref, MPid, SPids);
{msgs, Ref, Msgs} ->
SPids1 = wait_for_credit(SPids),
- broadcast(SPids1, {sync_msgs, Ref, Msgs}),
- MPid ! {next, Ref},
- syncer_loop(Ref, MPid, SPids1);
+ case SPids1 of
+ [] ->
+ % Die silently because there are no slaves left.
+ ok;
+ _ ->
+ broadcast(SPids1, {sync_msgs, Ref, Msgs}),
+ MPid ! {next, Ref},
+ syncer_loop(Ref, MPid, SPids1)
+ end;
{cancel, Ref} ->
%% We don't tell the slaves we will die - so when we do
%% they interpret that as a failure, which is what we
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index 26a864f0f5..596eb62b03 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -58,7 +58,7 @@
%% Main interface
-spec init() -> 'ok'.
-spec join_cluster(node(), node_type())
- -> 'ok' | {'ok', 'already_member'}.
+ -> ok | {ok, already_member} | {error, {inconsistent_cluster, string()}}.
-spec reset() -> 'ok'.
-spec force_reset() -> 'ok'.
-spec update_cluster_nodes(node()) -> 'ok'.
diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl
index 0322aacfd1..bea2a3fa96 100644
--- a/src/rabbit_node_monitor.erl
+++ b/src/rabbit_node_monitor.erl
@@ -336,7 +336,17 @@ init([]) ->
process_flag(trap_exit, true),
net_kernel:monitor_nodes(true, [nodedown_reason]),
{ok, _} = mnesia:subscribe(system),
- {ok, ensure_keepalive_timer(#state{monitors = pmon:new(),
+ %% If the node has been restarted, Mnesia can trigger a system notification
+ %% before the monitor subscribes to receive them. To avoid autoheal blocking due to
+ %% the inconsistent database event never arriving, we being monitoring all running
+ %% nodes as early as possible. The rest of the monitoring ops will only be triggered
+ %% when notifications arrive.
+ Nodes = possibly_partitioned_nodes(),
+ startup_log(Nodes),
+ Monitors = lists:foldl(fun(Node, Monitors0) ->
+ pmon:monitor({rabbit, Node}, Monitors0)
+ end, pmon:new(), Nodes),
+ {ok, ensure_keepalive_timer(#state{monitors = Monitors,
subscribers = pmon:new(),
partitions = [],
guid = rabbit_guid:gen(),
@@ -486,20 +496,22 @@ handle_cast({partial_partition_disconnect, Other}, State) ->
%% mnesia propagation.
handle_cast({node_up, Node, NodeType},
State = #state{monitors = Monitors}) ->
- case pmon:is_monitored({rabbit, Node}, Monitors) of
- true -> {noreply, State};
- false -> rabbit_log:info("rabbit on node ~p up~n", [Node]),
- {AllNodes, DiscNodes, RunningNodes} = read_cluster_status(),
- write_cluster_status({add_node(Node, AllNodes),
- case NodeType of
- disc -> add_node(Node, DiscNodes);
- ram -> DiscNodes
- end,
- add_node(Node, RunningNodes)}),
- ok = handle_live_rabbit(Node),
- Monitors1 = pmon:monitor({rabbit, Node}, Monitors),
- {noreply, maybe_autoheal(State#state{monitors = Monitors1})}
- end;
+ rabbit_log:info("rabbit on node ~p up~n", [Node]),
+ {AllNodes, DiscNodes, RunningNodes} = read_cluster_status(),
+ write_cluster_status({add_node(Node, AllNodes),
+ case NodeType of
+ disc -> add_node(Node, DiscNodes);
+ ram -> DiscNodes
+ end,
+ add_node(Node, RunningNodes)}),
+ ok = handle_live_rabbit(Node),
+ Monitors1 = case pmon:is_monitored({rabbit, Node}, Monitors) of
+ true ->
+ Monitors;
+ false ->
+ pmon:monitor({rabbit, Node}, Monitors)
+ end,
+ {noreply, maybe_autoheal(State#state{monitors = Monitors1})};
handle_cast({joined_cluster, Node, NodeType}, State) ->
{AllNodes, DiscNodes, RunningNodes} = read_cluster_status(),
@@ -572,7 +584,7 @@ handle_info({mnesia_system_event,
State1 = case pmon:is_monitored({rabbit, Node}, Monitors) of
true -> State;
false -> State#state{
- monitors = pmon:monitor({rabbit, Node}, Monitors)}
+ monitors = pmon:monitor({rabbit, Node}, Monitors)}
end,
ok = handle_live_rabbit(Node),
Partitions1 = lists:usort([Node | Partitions]),
@@ -873,3 +885,12 @@ alive_rabbit_nodes(Nodes) ->
ping_all() ->
[net_adm:ping(N) || N <- rabbit_mnesia:cluster_nodes(all)],
ok.
+
+possibly_partitioned_nodes() ->
+ alive_rabbit_nodes() -- rabbit_mnesia:cluster_nodes(running).
+
+startup_log([]) ->
+ rabbit_log:info("Starting rabbit_node_monitor~n", []);
+startup_log(Nodes) ->
+ rabbit_log:info("Starting rabbit_node_monitor, might be partitioned from ~p~n",
+ [Nodes]).
diff --git a/src/rabbit_policy.erl b/src/rabbit_policy.erl
index eb8cf63327..a9caadf972 100644
--- a/src/rabbit_policy.erl
+++ b/src/rabbit_policy.erl
@@ -276,7 +276,9 @@ update_queue(Q = #amqqueue{name = QName, policy = OldPolicy}, Policies) ->
NewPolicy -> case rabbit_amqqueue:update(
QName, fun(Q1) ->
rabbit_queue_decorator:set(
- Q1#amqqueue{policy = NewPolicy})
+ Q1#amqqueue{policy = NewPolicy,
+ policy_version =
+ Q1#amqqueue.policy_version + 1 })
end) of
#amqqueue{} = Q1 -> {Q, Q1};
not_found -> {Q, Q }
diff --git a/src/rabbit_queue_location_validator.erl b/src/rabbit_queue_location_validator.erl
index 44394a962c..c5aad50e64 100644
--- a/src/rabbit_queue_location_validator.erl
+++ b/src/rabbit_queue_location_validator.erl
@@ -26,7 +26,9 @@
{mfa, {rabbit_registry, register,
[policy_validator,
<<"queue-master-locator">>,
- ?MODULE]}}]}).
+ ?MODULE]}},
+ {requires, rabbit_registry},
+ {enables, recovery}]}).
validate_policy(KeyList) ->
case proplists:lookup(<<"queue-master-locator">> , KeyList) of
diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl
index a49f7a5893..1b647f9c05 100644
--- a/src/rabbit_upgrade_functions.erl
+++ b/src/rabbit_upgrade_functions.erl
@@ -52,6 +52,7 @@
-rabbit_upgrade({down_slave_nodes, mnesia, [queue_decorators]}).
-rabbit_upgrade({queue_state, mnesia, [down_slave_nodes]}).
-rabbit_upgrade({recoverable_slaves, mnesia, [queue_state]}).
+-rabbit_upgrade({policy_version, mnesia, [recoverable_slaves]}).
-rabbit_upgrade({user_password_hashing, mnesia, [hash_passwords]}).
%% -------------------------------------------------------------------
@@ -443,6 +444,24 @@ recoverable_slaves(Table) ->
sync_slave_pids, recoverable_slaves, policy, gm_pids, decorators,
state]).
+policy_version() ->
+ ok = policy_version(rabbit_queue),
+ ok = policy_version(rabbit_durable_queue).
+
+policy_version(Table) ->
+ transform(
+ Table,
+ fun ({amqqueue, Name, Durable, AutoDelete, ExclusiveOwner, Arguments,
+ Pid, SlavePids, SyncSlavePids, DSN, Policy, GmPids, Decorators,
+ State}) ->
+ {amqqueue, Name, Durable, AutoDelete, ExclusiveOwner, Arguments,
+ Pid, SlavePids, SyncSlavePids, DSN, Policy, GmPids, Decorators,
+ State, 0}
+ end,
+ [name, durable, auto_delete, exclusive_owner, arguments, pid, slave_pids,
+ sync_slave_pids, recoverable_slaves, policy, gm_pids, decorators, state,
+ policy_version]).
+
%% Prior to 3.6.0, passwords were hashed using MD5, this populates
%% existing records with said default. Users created with 3.6.0+ will
%% have internal_user.hashing_algorithm populated by the internal
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 9ad752a174..297df086ad 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -939,7 +939,7 @@ convert_to_lazy(State) ->
%% is not in a proper state for a lazy BQ (unless all
%% messages have been paged to disk already).
wait_for_msg_store_credit(),
- convert_to_lazy(State1)
+ convert_to_lazy(resume(State1))
end.
wait_for_msg_store_credit() ->