summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2014-04-16 07:42:52 +0100
committerMatthias Radestock <matthias@rabbitmq.com>2014-04-16 07:42:52 +0100
commit0736e034a0cdae785aa3268db588d9586deaf93d (patch)
treea91c1d503e516bbdd99b9496355b4626127e730e
parentd322055e28be2bfbee71e9400d2aa55752795cf3 (diff)
downloadrabbitmq-server-git-0736e034a0cdae785aa3268db588d9586deaf93d.tar.gz
drive remove_from_queue with DeadGMPids
...instead of LiveGMPids The latter may be out of date s.t. it contains fewer pids than are actually alive, due to new GMs having sprung into live recently. We'd then, incorrectly, remove the corresponding entries from the queue's mnesia record, causing havoc. DeadGMPids can be out of date too; it may contain fewer pids than have actually died, due to GMs having died more recently. That is harmless though since it never leads us to remove an entry that we shouldn't, and we will learn about any new deaths eventually.
-rw-r--r--src/rabbit_mirror_queue_coordinator.erl10
-rw-r--r--src/rabbit_mirror_queue_misc.erl28
-rw-r--r--src/rabbit_mirror_queue_slave.erl27
3 files changed, 37 insertions, 28 deletions
diff --git a/src/rabbit_mirror_queue_coordinator.erl b/src/rabbit_mirror_queue_coordinator.erl
index 2feeea5a26..71ce512e54 100644
--- a/src/rabbit_mirror_queue_coordinator.erl
+++ b/src/rabbit_mirror_queue_coordinator.erl
@@ -348,11 +348,11 @@ init([#amqqueue { name = QueueName } = Q, GM, DeathFun, DepthFun]) ->
handle_call(get_gm, _From, State = #state { gm = GM }) ->
reply(GM, State).
-handle_cast({gm_deaths, LiveGMPids},
+handle_cast({gm_deaths, DeadGMPids},
State = #state { q = #amqqueue { name = QueueName, pid = MPid } })
when node(MPid) =:= node() ->
case rabbit_mirror_queue_misc:remove_from_queue(
- QueueName, MPid, LiveGMPids) of
+ QueueName, MPid, DeadGMPids) of
{ok, MPid, DeadPids} ->
rabbit_mirror_queue_misc:report_deaths(MPid, true, QueueName,
DeadPids),
@@ -401,10 +401,10 @@ joined([CPid], Members) ->
CPid ! {joined, self(), Members},
ok.
-members_changed([_CPid], _Births, [], _Live) ->
+members_changed([_CPid], _Births, [], _Live) ->
ok;
-members_changed([CPid], _Births, _Deaths, Live) ->
- ok = gen_server2:cast(CPid, {gm_deaths, Live}).
+members_changed([CPid], _Births, Deaths, _Live) ->
+ ok = gen_server2:cast(CPid, {gm_deaths, Deaths}).
handle_msg([CPid], _From, request_depth = Msg) ->
ok = gen_server2:cast(CPid, Msg);
diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl
index ff1d5815a1..2838996ce9 100644
--- a/src/rabbit_mirror_queue_misc.erl
+++ b/src/rabbit_mirror_queue_misc.erl
@@ -73,7 +73,7 @@
%% slave (now master) receives messages it's not ready for (for
%% example, new consumers).
%% Returns {ok, NewMPid, DeadPids}
-remove_from_queue(QueueName, Self, LiveGMPids) ->
+remove_from_queue(QueueName, Self, DeadGMPids) ->
rabbit_misc:execute_mnesia_transaction(
fun () ->
%% Someone else could have deleted the queue before we
@@ -83,16 +83,15 @@ remove_from_queue(QueueName, Self, LiveGMPids) ->
[Q = #amqqueue { pid = QPid,
slave_pids = SPids,
gm_pids = GMPids }] ->
- {GMPids1, Dead} = lists:partition(
+ {Dead, GMPids1} = lists:partition(
fun ({GM, _}) ->
- lists:member(GM, LiveGMPids)
+ lists:member(GM, DeadGMPids)
end, GMPids),
DeadPids = [Pid || {_GM, Pid} <- Dead],
Alive = [QPid | SPids] -- DeadPids,
{QPid1, SPids1} = promote_slave(Alive),
case {{QPid, SPids}, {QPid1, SPids1}} of
{Same, Same} ->
- GMPids = GMPids1, %% ASSERTION
{ok, QPid1, []};
_ when QPid =:= QPid1 orelse QPid1 =:= Self ->
%% Either master hasn't changed, so
@@ -107,12 +106,23 @@ remove_from_queue(QueueName, Self, LiveGMPids) ->
%% then shut it down. So let's check if the new
%% master needs to sync.
maybe_auto_sync(Q1),
- {ok, QPid1, [QPid | SPids] -- Alive};
+ {ok, QPid1, DeadPids};
_ ->
- %% Master has changed, and we're not it,
- %% so leave alone to allow the promoted
- %% slave to find it and make its
- %% promotion atomic.
+ %% Master has changed, and we're not it.
+ %% We still update mnesia here in case
+ %% the slave that is supposed to become
+ %% master dies before it does do so, in
+ %% which case the dead old master might
+ %% otherwise never get removed, which in
+ %% turn might prevent promotion of
+ %% another slave (e.g. us).
+ %%
+ %% Note however that we do not update
+ %% the master pid, for reasons explained
+ %% at the top of the function.
+ Q1 = Q#amqqueue{slave_pids = SPids1,
+ gm_pids = GMPids1},
+ store_updated_slaves(Q1),
{ok, QPid1, []}
end
end
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index e5c3adac06..fbdcb979d2 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -191,11 +191,11 @@ handle_call(go, _From, {not_started, Q} = NotStarted) ->
{error, Error} -> {stop, Error, NotStarted}
end;
-handle_call({gm_deaths, LiveGMPids}, From,
+handle_call({gm_deaths, DeadGMPids}, From,
State = #state { gm = GM, q = Q = #amqqueue {
name = QName, pid = MPid }}) ->
Self = self(),
- case rabbit_mirror_queue_misc:remove_from_queue(QName, Self, LiveGMPids) of
+ case rabbit_mirror_queue_misc:remove_from_queue(QName, Self, DeadGMPids) of
{error, not_found} ->
gen_server2:reply(From, ok),
{stop, normal, State};
@@ -365,7 +365,7 @@ handle_pre_hibernate(State = #state { backing_queue = BQ,
prioritise_call(Msg, _From, _Len, _State) ->
case Msg of
info -> 9;
- {gm_deaths, _Live} -> 5;
+ {gm_deaths, _Dead} -> 5;
_ -> 0
end.
@@ -393,10 +393,17 @@ format_message_queue(Opt, MQ) -> rabbit_misc:format_message_queue(Opt, MQ).
joined([SPid], _Members) -> SPid ! {joined, self()}, ok.
-members_changed([_SPid], _Births, [], _Live) ->
+members_changed([_SPid], _Births, [], _Live) ->
ok;
-members_changed([ SPid], _Births, _Deaths, Live) ->
- inform_deaths(SPid, Live).
+members_changed([ SPid], _Births, Deaths, _Live) ->
+ case rabbit_misc:with_exit_handler(
+ rabbit_misc:const(ok),
+ fun() ->
+ gen_server2:call(SPid, {gm_deaths, Deaths}, infinity)
+ end) of
+ ok -> ok;
+ {promote, CPid} -> {become, rabbit_mirror_queue_coordinator, [CPid]}
+ end.
handle_msg([_SPid], _From, request_depth) ->
%% This is only of value to the master
@@ -421,14 +428,6 @@ handle_msg([SPid], _From, {sync_start, Ref, Syncer, SPids}) ->
handle_msg([SPid], _From, Msg) ->
ok = gen_server2:cast(SPid, {gm, Msg}).
-inform_deaths(SPid, Live) ->
- case rabbit_misc:with_exit_handler(
- rabbit_misc:const(ok),
- fun() -> gen_server2:call(SPid, {gm_deaths, Live}, infinity) end) of
- ok -> ok;
- {promote, CPid} -> {become, rabbit_mirror_queue_coordinator, [CPid]}
- end.
-
%% ---------------------------------------------------------------------------
%% Others
%% ---------------------------------------------------------------------------