diff options
| -rw-r--r-- | src/rabbit_mirror_queue_coordinator.erl | 12 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 17 |
2 files changed, 24 insertions, 5 deletions
diff --git a/src/rabbit_mirror_queue_coordinator.erl b/src/rabbit_mirror_queue_coordinator.erl index fe1168f120..562f0f0fcf 100644 --- a/src/rabbit_mirror_queue_coordinator.erl +++ b/src/rabbit_mirror_queue_coordinator.erl @@ -368,9 +368,15 @@ handle_cast({gm_deaths, DeadGMPids}, {stop, normal, State} end; -handle_cast(request_depth, State = #state { depth_fun = DepthFun }) -> - ok = DepthFun(), - noreply(State); +handle_cast(request_depth, State = #state { depth_fun = DepthFun, + q = #amqqueue { name = QName, pid = MPid }}) -> + case rabbit_amqqueue:lookup(QName) of + {ok, #amqqueue{ pid = MPid }} -> + ok = DepthFun(), + noreply(State); + _ -> + {stop, shutdown, State} + end; handle_cast({ensure_monitoring, Pids}, State = #state { monitors = Mons }) -> noreply(State #state { monitors = pmon:monitor_all(Pids, Mons) }); diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 12b13c36fd..29ba21d374 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -278,8 +278,21 @@ handle_cast(go, {not_started, Q} = NotStarted) -> handle_cast({run_backing_queue, Mod, Fun}, State) -> noreply(run_backing_queue(Mod, Fun, State)); -handle_cast({gm, Instruction}, State) -> - handle_process_result(process_instruction(Instruction, State)); +handle_cast({gm, Instruction}, State = #state{q = #amqqueue { name = QName }}) -> + case rabbit_amqqueue:lookup(QName) of + {ok, #amqqueue{slave_pids = SPids}} -> + case lists:member(self(), SPids) of + true -> + handle_process_result(process_instruction(Instruction, State)); + false -> + %% Potentially a duplicated slave caused by a partial partition, + %% will stop as a new slave could start unaware of our presence + {stop, shutdown, State} + end; + {error, not_found} -> + %% Would not expect this to happen after fixing #953 + {stop, shutdown, State} + end; handle_cast({deliver, Delivery = #delivery{sender = Sender, flow = Flow}, true}, State) -> |
