summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichael Klishin <mklishin@pivotal.io>2016-09-21 17:24:53 +0300
committerMichael Klishin <mklishin@pivotal.io>2016-09-21 17:24:53 +0300
commitddecc0416338a881d24beed556de5e774081195d (patch)
treec957845e0aa8192410b3bcd7408e85057cfca15a
parentbe9a3f7f84d3f6c680d3019360c5458f946aa462 (diff)
parent0e7e75d3e3a85965fa1848073ab82d511c4c8da5 (diff)
downloadrabbitmq-server-git-ddecc0416338a881d24beed556de5e774081195d.tar.gz
Merge branch 'rabbitmq-server-944' into stable
-rw-r--r--src/rabbit_mirror_queue_coordinator.erl12
-rw-r--r--src/rabbit_mirror_queue_slave.erl17
2 files changed, 24 insertions, 5 deletions
diff --git a/src/rabbit_mirror_queue_coordinator.erl b/src/rabbit_mirror_queue_coordinator.erl
index fe1168f120..562f0f0fcf 100644
--- a/src/rabbit_mirror_queue_coordinator.erl
+++ b/src/rabbit_mirror_queue_coordinator.erl
@@ -368,9 +368,15 @@ handle_cast({gm_deaths, DeadGMPids},
{stop, normal, State}
end;
-handle_cast(request_depth, State = #state { depth_fun = DepthFun }) ->
- ok = DepthFun(),
- noreply(State);
+handle_cast(request_depth, State = #state { depth_fun = DepthFun,
+ q = #amqqueue { name = QName, pid = MPid }}) ->
+ case rabbit_amqqueue:lookup(QName) of
+ {ok, #amqqueue{ pid = MPid }} ->
+ ok = DepthFun(),
+ noreply(State);
+ _ ->
+ {stop, shutdown, State}
+ end;
handle_cast({ensure_monitoring, Pids}, State = #state { monitors = Mons }) ->
noreply(State #state { monitors = pmon:monitor_all(Pids, Mons) });
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 2cb4455180..4770018f9e 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -256,8 +256,21 @@ handle_cast(go, {not_started, Q} = NotStarted) ->
handle_cast({run_backing_queue, Mod, Fun}, State) ->
noreply(run_backing_queue(Mod, Fun, State));
-handle_cast({gm, Instruction}, State) ->
- handle_process_result(process_instruction(Instruction, State));
+handle_cast({gm, Instruction}, State = #state{q = #amqqueue { name = QName }}) ->
+ case rabbit_amqqueue:lookup(QName) of
+ {ok, #amqqueue{slave_pids = SPids}} ->
+ case lists:member(self(), SPids) of
+ true ->
+ handle_process_result(process_instruction(Instruction, State));
+ false ->
+ %% Potentially a duplicated slave caused by a partial partition,
+ %% will stop as a new slave could start unaware of our presence
+ {stop, shutdown, State}
+ end;
+ {error, not_found} ->
+ %% Would not expect this to happen after fixing #953
+ {stop, shutdown, State}
+ end;
handle_cast({deliver, Delivery = #delivery{sender = Sender, flow = Flow}, true},
State) ->