diff options
| -rw-r--r-- | src/rabbit_mirror_queue_coordinator.erl | 10 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_misc.erl | 28 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 27 |
3 files changed, 37 insertions, 28 deletions
diff --git a/src/rabbit_mirror_queue_coordinator.erl b/src/rabbit_mirror_queue_coordinator.erl index 2feeea5a26..71ce512e54 100644 --- a/src/rabbit_mirror_queue_coordinator.erl +++ b/src/rabbit_mirror_queue_coordinator.erl @@ -348,11 +348,11 @@ init([#amqqueue { name = QueueName } = Q, GM, DeathFun, DepthFun]) -> handle_call(get_gm, _From, State = #state { gm = GM }) -> reply(GM, State). -handle_cast({gm_deaths, LiveGMPids}, +handle_cast({gm_deaths, DeadGMPids}, State = #state { q = #amqqueue { name = QueueName, pid = MPid } }) when node(MPid) =:= node() -> case rabbit_mirror_queue_misc:remove_from_queue( - QueueName, MPid, LiveGMPids) of + QueueName, MPid, DeadGMPids) of {ok, MPid, DeadPids} -> rabbit_mirror_queue_misc:report_deaths(MPid, true, QueueName, DeadPids), @@ -401,10 +401,10 @@ joined([CPid], Members) -> CPid ! {joined, self(), Members}, ok. -members_changed([_CPid], _Births, [], _Live) -> +members_changed([_CPid], _Births, [], _Live) -> ok; -members_changed([CPid], _Births, _Deaths, Live) -> - ok = gen_server2:cast(CPid, {gm_deaths, Live}). +members_changed([CPid], _Births, Deaths, _Live) -> + ok = gen_server2:cast(CPid, {gm_deaths, Deaths}). handle_msg([CPid], _From, request_depth = Msg) -> ok = gen_server2:cast(CPid, Msg); diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index ff1d5815a1..2838996ce9 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -73,7 +73,7 @@ %% slave (now master) receives messages it's not ready for (for %% example, new consumers). %% Returns {ok, NewMPid, DeadPids} -remove_from_queue(QueueName, Self, LiveGMPids) -> +remove_from_queue(QueueName, Self, DeadGMPids) -> rabbit_misc:execute_mnesia_transaction( fun () -> %% Someone else could have deleted the queue before we @@ -83,16 +83,15 @@ remove_from_queue(QueueName, Self, LiveGMPids) -> [Q = #amqqueue { pid = QPid, slave_pids = SPids, gm_pids = GMPids }] -> - {GMPids1, Dead} = lists:partition( + {Dead, GMPids1} = lists:partition( fun ({GM, _}) -> - lists:member(GM, LiveGMPids) + lists:member(GM, DeadGMPids) end, GMPids), DeadPids = [Pid || {_GM, Pid} <- Dead], Alive = [QPid | SPids] -- DeadPids, {QPid1, SPids1} = promote_slave(Alive), case {{QPid, SPids}, {QPid1, SPids1}} of {Same, Same} -> - GMPids = GMPids1, %% ASSERTION {ok, QPid1, []}; _ when QPid =:= QPid1 orelse QPid1 =:= Self -> %% Either master hasn't changed, so @@ -107,12 +106,23 @@ remove_from_queue(QueueName, Self, LiveGMPids) -> %% then shut it down. So let's check if the new %% master needs to sync. maybe_auto_sync(Q1), - {ok, QPid1, [QPid | SPids] -- Alive}; + {ok, QPid1, DeadPids}; _ -> - %% Master has changed, and we're not it, - %% so leave alone to allow the promoted - %% slave to find it and make its - %% promotion atomic. + %% Master has changed, and we're not it. + %% We still update mnesia here in case + %% the slave that is supposed to become + %% master dies before it does do so, in + %% which case the dead old master might + %% otherwise never get removed, which in + %% turn might prevent promotion of + %% another slave (e.g. us). + %% + %% Note however that we do not update + %% the master pid, for reasons explained + %% at the top of the function. + Q1 = Q#amqqueue{slave_pids = SPids1, + gm_pids = GMPids1}, + store_updated_slaves(Q1), {ok, QPid1, []} end end diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index e5c3adac06..fbdcb979d2 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -191,11 +191,11 @@ handle_call(go, _From, {not_started, Q} = NotStarted) -> {error, Error} -> {stop, Error, NotStarted} end; -handle_call({gm_deaths, LiveGMPids}, From, +handle_call({gm_deaths, DeadGMPids}, From, State = #state { gm = GM, q = Q = #amqqueue { name = QName, pid = MPid }}) -> Self = self(), - case rabbit_mirror_queue_misc:remove_from_queue(QName, Self, LiveGMPids) of + case rabbit_mirror_queue_misc:remove_from_queue(QName, Self, DeadGMPids) of {error, not_found} -> gen_server2:reply(From, ok), {stop, normal, State}; @@ -365,7 +365,7 @@ handle_pre_hibernate(State = #state { backing_queue = BQ, prioritise_call(Msg, _From, _Len, _State) -> case Msg of info -> 9; - {gm_deaths, _Live} -> 5; + {gm_deaths, _Dead} -> 5; _ -> 0 end. @@ -393,10 +393,17 @@ format_message_queue(Opt, MQ) -> rabbit_misc:format_message_queue(Opt, MQ). joined([SPid], _Members) -> SPid ! {joined, self()}, ok. -members_changed([_SPid], _Births, [], _Live) -> +members_changed([_SPid], _Births, [], _Live) -> ok; -members_changed([ SPid], _Births, _Deaths, Live) -> - inform_deaths(SPid, Live). +members_changed([ SPid], _Births, Deaths, _Live) -> + case rabbit_misc:with_exit_handler( + rabbit_misc:const(ok), + fun() -> + gen_server2:call(SPid, {gm_deaths, Deaths}, infinity) + end) of + ok -> ok; + {promote, CPid} -> {become, rabbit_mirror_queue_coordinator, [CPid]} + end. handle_msg([_SPid], _From, request_depth) -> %% This is only of value to the master @@ -421,14 +428,6 @@ handle_msg([SPid], _From, {sync_start, Ref, Syncer, SPids}) -> handle_msg([SPid], _From, Msg) -> ok = gen_server2:cast(SPid, {gm, Msg}). -inform_deaths(SPid, Live) -> - case rabbit_misc:with_exit_handler( - rabbit_misc:const(ok), - fun() -> gen_server2:call(SPid, {gm_deaths, Live}, infinity) end) of - ok -> ok; - {promote, CPid} -> {become, rabbit_mirror_queue_coordinator, [CPid]} - end. - %% --------------------------------------------------------------------------- %% Others %% --------------------------------------------------------------------------- |
