diff options
| author | Michael Klishin <mklishin@pivotal.io> | 2016-09-21 17:24:53 +0300 |
|---|---|---|
| committer | Michael Klishin <mklishin@pivotal.io> | 2016-09-21 17:24:53 +0300 |
| commit | ddecc0416338a881d24beed556de5e774081195d (patch) | |
| tree | c957845e0aa8192410b3bcd7408e85057cfca15a | |
| parent | be9a3f7f84d3f6c680d3019360c5458f946aa462 (diff) | |
| parent | 0e7e75d3e3a85965fa1848073ab82d511c4c8da5 (diff) | |
| download | rabbitmq-server-git-ddecc0416338a881d24beed556de5e774081195d.tar.gz | |
Merge branch 'rabbitmq-server-944' into stable
| -rw-r--r-- | src/rabbit_mirror_queue_coordinator.erl | 12 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 17 |
2 files changed, 24 insertions, 5 deletions
diff --git a/src/rabbit_mirror_queue_coordinator.erl b/src/rabbit_mirror_queue_coordinator.erl index fe1168f120..562f0f0fcf 100644 --- a/src/rabbit_mirror_queue_coordinator.erl +++ b/src/rabbit_mirror_queue_coordinator.erl @@ -368,9 +368,15 @@ handle_cast({gm_deaths, DeadGMPids}, {stop, normal, State} end; -handle_cast(request_depth, State = #state { depth_fun = DepthFun }) -> - ok = DepthFun(), - noreply(State); +handle_cast(request_depth, State = #state { depth_fun = DepthFun, + q = #amqqueue { name = QName, pid = MPid }}) -> + case rabbit_amqqueue:lookup(QName) of + {ok, #amqqueue{ pid = MPid }} -> + ok = DepthFun(), + noreply(State); + _ -> + {stop, shutdown, State} + end; handle_cast({ensure_monitoring, Pids}, State = #state { monitors = Mons }) -> noreply(State #state { monitors = pmon:monitor_all(Pids, Mons) }); diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 2cb4455180..4770018f9e 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -256,8 +256,21 @@ handle_cast(go, {not_started, Q} = NotStarted) -> handle_cast({run_backing_queue, Mod, Fun}, State) -> noreply(run_backing_queue(Mod, Fun, State)); -handle_cast({gm, Instruction}, State) -> - handle_process_result(process_instruction(Instruction, State)); +handle_cast({gm, Instruction}, State = #state{q = #amqqueue { name = QName }}) -> + case rabbit_amqqueue:lookup(QName) of + {ok, #amqqueue{slave_pids = SPids}} -> + case lists:member(self(), SPids) of + true -> + handle_process_result(process_instruction(Instruction, State)); + false -> + %% Potentially a duplicated slave caused by a partial partition, + %% will stop as a new slave could start unaware of our presence + {stop, shutdown, State} + end; + {error, not_found} -> + %% Would not expect this to happen after fixing #953 + {stop, shutdown, State} + end; handle_cast({deliver, Delivery = #delivery{sender = Sender, flow = Flow}, true}, State) -> |
