summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_mirror_queue_coordinator.erl10
-rw-r--r--src/rabbit_mirror_queue_misc.erl28
-rw-r--r--src/rabbit_mirror_queue_slave.erl27
3 files changed, 37 insertions, 28 deletions
diff --git a/src/rabbit_mirror_queue_coordinator.erl b/src/rabbit_mirror_queue_coordinator.erl
index 2feeea5a26..71ce512e54 100644
--- a/src/rabbit_mirror_queue_coordinator.erl
+++ b/src/rabbit_mirror_queue_coordinator.erl
@@ -348,11 +348,11 @@ init([#amqqueue { name = QueueName } = Q, GM, DeathFun, DepthFun]) ->
handle_call(get_gm, _From, State = #state { gm = GM }) ->
reply(GM, State).
-handle_cast({gm_deaths, LiveGMPids},
+handle_cast({gm_deaths, DeadGMPids},
State = #state { q = #amqqueue { name = QueueName, pid = MPid } })
when node(MPid) =:= node() ->
case rabbit_mirror_queue_misc:remove_from_queue(
- QueueName, MPid, LiveGMPids) of
+ QueueName, MPid, DeadGMPids) of
{ok, MPid, DeadPids} ->
rabbit_mirror_queue_misc:report_deaths(MPid, true, QueueName,
DeadPids),
@@ -401,10 +401,10 @@ joined([CPid], Members) ->
CPid ! {joined, self(), Members},
ok.
-members_changed([_CPid], _Births, [], _Live) ->
+members_changed([_CPid], _Births, [], _Live) ->
ok;
-members_changed([CPid], _Births, _Deaths, Live) ->
- ok = gen_server2:cast(CPid, {gm_deaths, Live}).
+members_changed([CPid], _Births, Deaths, _Live) ->
+ ok = gen_server2:cast(CPid, {gm_deaths, Deaths}).
handle_msg([CPid], _From, request_depth = Msg) ->
ok = gen_server2:cast(CPid, Msg);
diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl
index ff1d5815a1..2838996ce9 100644
--- a/src/rabbit_mirror_queue_misc.erl
+++ b/src/rabbit_mirror_queue_misc.erl
@@ -73,7 +73,7 @@
%% slave (now master) receives messages it's not ready for (for
%% example, new consumers).
%% Returns {ok, NewMPid, DeadPids}
-remove_from_queue(QueueName, Self, LiveGMPids) ->
+remove_from_queue(QueueName, Self, DeadGMPids) ->
rabbit_misc:execute_mnesia_transaction(
fun () ->
%% Someone else could have deleted the queue before we
@@ -83,16 +83,15 @@ remove_from_queue(QueueName, Self, LiveGMPids) ->
[Q = #amqqueue { pid = QPid,
slave_pids = SPids,
gm_pids = GMPids }] ->
- {GMPids1, Dead} = lists:partition(
+ {Dead, GMPids1} = lists:partition(
fun ({GM, _}) ->
- lists:member(GM, LiveGMPids)
+ lists:member(GM, DeadGMPids)
end, GMPids),
DeadPids = [Pid || {_GM, Pid} <- Dead],
Alive = [QPid | SPids] -- DeadPids,
{QPid1, SPids1} = promote_slave(Alive),
case {{QPid, SPids}, {QPid1, SPids1}} of
{Same, Same} ->
- GMPids = GMPids1, %% ASSERTION
{ok, QPid1, []};
_ when QPid =:= QPid1 orelse QPid1 =:= Self ->
%% Either master hasn't changed, so
@@ -107,12 +106,23 @@ remove_from_queue(QueueName, Self, LiveGMPids) ->
%% then shut it down. So let's check if the new
%% master needs to sync.
maybe_auto_sync(Q1),
- {ok, QPid1, [QPid | SPids] -- Alive};
+ {ok, QPid1, DeadPids};
_ ->
- %% Master has changed, and we're not it,
- %% so leave alone to allow the promoted
- %% slave to find it and make its
- %% promotion atomic.
+ %% Master has changed, and we're not it.
+ %% We still update mnesia here in case
+ %% the slave that is supposed to become
+ %% master dies before it does do so, in
+ %% which case the dead old master might
+ %% otherwise never get removed, which in
+ %% turn might prevent promotion of
+ %% another slave (e.g. us).
+ %%
+ %% Note however that we do not update
+ %% the master pid, for reasons explained
+ %% at the top of the function.
+ Q1 = Q#amqqueue{slave_pids = SPids1,
+ gm_pids = GMPids1},
+ store_updated_slaves(Q1),
{ok, QPid1, []}
end
end
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index e5c3adac06..fbdcb979d2 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -191,11 +191,11 @@ handle_call(go, _From, {not_started, Q} = NotStarted) ->
{error, Error} -> {stop, Error, NotStarted}
end;
-handle_call({gm_deaths, LiveGMPids}, From,
+handle_call({gm_deaths, DeadGMPids}, From,
State = #state { gm = GM, q = Q = #amqqueue {
name = QName, pid = MPid }}) ->
Self = self(),
- case rabbit_mirror_queue_misc:remove_from_queue(QName, Self, LiveGMPids) of
+ case rabbit_mirror_queue_misc:remove_from_queue(QName, Self, DeadGMPids) of
{error, not_found} ->
gen_server2:reply(From, ok),
{stop, normal, State};
@@ -365,7 +365,7 @@ handle_pre_hibernate(State = #state { backing_queue = BQ,
prioritise_call(Msg, _From, _Len, _State) ->
case Msg of
info -> 9;
- {gm_deaths, _Live} -> 5;
+ {gm_deaths, _Dead} -> 5;
_ -> 0
end.
@@ -393,10 +393,17 @@ format_message_queue(Opt, MQ) -> rabbit_misc:format_message_queue(Opt, MQ).
joined([SPid], _Members) -> SPid ! {joined, self()}, ok.
-members_changed([_SPid], _Births, [], _Live) ->
+members_changed([_SPid], _Births, [], _Live) ->
ok;
-members_changed([ SPid], _Births, _Deaths, Live) ->
- inform_deaths(SPid, Live).
+members_changed([ SPid], _Births, Deaths, _Live) ->
+ case rabbit_misc:with_exit_handler(
+ rabbit_misc:const(ok),
+ fun() ->
+ gen_server2:call(SPid, {gm_deaths, Deaths}, infinity)
+ end) of
+ ok -> ok;
+ {promote, CPid} -> {become, rabbit_mirror_queue_coordinator, [CPid]}
+ end.
handle_msg([_SPid], _From, request_depth) ->
%% This is only of value to the master
@@ -421,14 +428,6 @@ handle_msg([SPid], _From, {sync_start, Ref, Syncer, SPids}) ->
handle_msg([SPid], _From, Msg) ->
ok = gen_server2:cast(SPid, {gm, Msg}).
-inform_deaths(SPid, Live) ->
- case rabbit_misc:with_exit_handler(
- rabbit_misc:const(ok),
- fun() -> gen_server2:call(SPid, {gm_deaths, Live}, infinity) end) of
- ok -> ok;
- {promote, CPid} -> {become, rabbit_mirror_queue_coordinator, [CPid]}
- end.
-
%% ---------------------------------------------------------------------------
%% Others
%% ---------------------------------------------------------------------------