summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_mirror_queue_coordinator.erl12
-rw-r--r--src/rabbit_mirror_queue_slave.erl17
2 files changed, 24 insertions, 5 deletions
diff --git a/src/rabbit_mirror_queue_coordinator.erl b/src/rabbit_mirror_queue_coordinator.erl
index fe1168f120..562f0f0fcf 100644
--- a/src/rabbit_mirror_queue_coordinator.erl
+++ b/src/rabbit_mirror_queue_coordinator.erl
@@ -368,9 +368,15 @@ handle_cast({gm_deaths, DeadGMPids},
{stop, normal, State}
end;
-handle_cast(request_depth, State = #state { depth_fun = DepthFun }) ->
- ok = DepthFun(),
- noreply(State);
+handle_cast(request_depth, State = #state { depth_fun = DepthFun,
+ q = #amqqueue { name = QName, pid = MPid }}) ->
+ case rabbit_amqqueue:lookup(QName) of
+ {ok, #amqqueue{ pid = MPid }} ->
+ ok = DepthFun(),
+ noreply(State);
+ _ ->
+ {stop, shutdown, State}
+ end;
handle_cast({ensure_monitoring, Pids}, State = #state { monitors = Mons }) ->
noreply(State #state { monitors = pmon:monitor_all(Pids, Mons) });
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 12b13c36fd..29ba21d374 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -278,8 +278,21 @@ handle_cast(go, {not_started, Q} = NotStarted) ->
handle_cast({run_backing_queue, Mod, Fun}, State) ->
noreply(run_backing_queue(Mod, Fun, State));
-handle_cast({gm, Instruction}, State) ->
- handle_process_result(process_instruction(Instruction, State));
+handle_cast({gm, Instruction}, State = #state{q = #amqqueue { name = QName }}) ->
+ case rabbit_amqqueue:lookup(QName) of
+ {ok, #amqqueue{slave_pids = SPids}} ->
+ case lists:member(self(), SPids) of
+ true ->
+ handle_process_result(process_instruction(Instruction, State));
+ false ->
+ %% Potentially a duplicated slave caused by a partial partition,
+ %% will stop as a new slave could start unaware of our presence
+ {stop, shutdown, State}
+ end;
+ {error, not_found} ->
+ %% Would not expect this to happen after fixing #953
+ {stop, shutdown, State}
+ end;
handle_cast({deliver, Delivery = #delivery{sender = Sender, flow = Flow}, true},
State) ->