summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_mirror_queue_coordinator.erl2
-rw-r--r--src/rabbit_mirror_queue_misc.erl11
-rw-r--r--src/rabbit_mirror_queue_slave.erl4
3 files changed, 8 insertions, 9 deletions
diff --git a/src/rabbit_mirror_queue_coordinator.erl b/src/rabbit_mirror_queue_coordinator.erl
index 6cd71fc314..16690693f1 100644
--- a/src/rabbit_mirror_queue_coordinator.erl
+++ b/src/rabbit_mirror_queue_coordinator.erl
@@ -349,7 +349,7 @@ handle_call(get_gm, _From, State = #state { gm = GM }) ->
handle_cast({gm_deaths, Deaths},
State = #state { q = #amqqueue { name = QueueName, pid = MPid } })
when node(MPid) =:= node() ->
- case rabbit_mirror_queue_misc:remove_from_queue(QueueName, Deaths) of
+ case rabbit_mirror_queue_misc:remove_from_queue(QueueName, MPid, Deaths) of
{ok, MPid, DeadPids} ->
rabbit_mirror_queue_misc:report_deaths(MPid, true, QueueName,
DeadPids),
diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl
index 901f33b152..8a216494cc 100644
--- a/src/rabbit_mirror_queue_misc.erl
+++ b/src/rabbit_mirror_queue_misc.erl
@@ -16,7 +16,7 @@
-module(rabbit_mirror_queue_misc).
--export([remove_from_queue/2, on_node_up/0, add_mirrors/2, add_mirror/2,
+-export([remove_from_queue/3, on_node_up/0, add_mirrors/2, add_mirror/2,
report_deaths/4, store_updated_slaves/1, suggested_queue_nodes/1,
is_mirrored/1, update_mirrors/2]).
@@ -29,8 +29,8 @@
-ifdef(use_specs).
--spec(remove_from_queue/2 ::
- (rabbit_amqqueue:name(), [pid()])
+-spec(remove_from_queue/3 ::
+ (rabbit_amqqueue:name(), pid(), [pid()])
-> {'ok', pid(), [pid()]} | {'error', 'not_found'}).
-spec(on_node_up/0 :: () -> 'ok').
-spec(add_mirrors/2 :: (rabbit_amqqueue:name(), [node()]) -> 'ok').
@@ -57,8 +57,7 @@
%% slave (now master) receives messages it's not ready for (for
%% example, new consumers).
%% Returns {ok, NewMPid, DeadPids}
-
-remove_from_queue(QueueName, DeadGMPids) ->
+remove_from_queue(QueueName, Self, DeadGMPids) ->
rabbit_misc:execute_mnesia_transaction(
fun () ->
%% Someone else could have deleted the queue before we
@@ -79,7 +78,7 @@ remove_from_queue(QueueName, DeadGMPids) ->
{Same, Same} ->
GMPids = GMPids1, %% ASSERTION
{ok, QPid1, []};
- _ when QPid =:= QPid1 orelse node(QPid1) =:= node() ->
+ _ when QPid =:= QPid1 orelse QPid1 =:= Self ->
%% Either master hasn't changed, so
%% we're ok to update mnesia; or we have
%% become the master.
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index afb8573896..3d8bd8b408 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -178,12 +178,12 @@ handle_call({deliver, Delivery, true}, From, State) ->
handle_call({gm_deaths, Deaths}, From,
State = #state { q = Q = #amqqueue { name = QName, pid = MPid }}) ->
- case rabbit_mirror_queue_misc:remove_from_queue(QName, Deaths) of
+ Self = self(),
+ case rabbit_mirror_queue_misc:remove_from_queue(QName, Self, Deaths) of
{error, not_found} ->
gen_server2:reply(From, ok),
{stop, normal, State};
{ok, Pid, DeadPids} ->
- Self = self(),
rabbit_mirror_queue_misc:report_deaths(Self, false, QName,
DeadPids),
case Pid of