diff options
| author | Simon MacMullen <simon@rabbitmq.com> | 2012-01-12 16:23:09 +0000 |
|---|---|---|
| committer | Simon MacMullen <simon@rabbitmq.com> | 2012-01-12 16:23:09 +0000 |
| commit | f5885abb20e2409ab64d392e0846cb9b7c7509f4 (patch) | |
| tree | d4e4b10c511fda04c28dd0a0f356c334b5119595 | |
| parent | cdff3eca6098564c9100e0b156847bef283867bd (diff) | |
| download | rabbitmq-server-git-f5885abb20e2409ab64d392e0846cb9b7c7509f4.tar.gz | |
Flow control: channel -> queue slave
| -rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 8 |
1 files changed, 5 insertions, 3 deletions
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index d68063db80..9f6773b2a1 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -208,8 +208,9 @@ handle_cast({run_backing_queue, Mod, Fun}, State) -> handle_cast({gm, Instruction}, State) -> handle_process_result(process_instruction(Instruction, State)); -handle_cast({deliver, Delivery = #delivery {}}, State) -> +handle_cast({deliver, Delivery = #delivery{sender = Sender}}, State) -> %% Asynchronous, non-"mandatory", non-"immediate" deliver mode. + rabbit_flow:ack(Sender), noreply(maybe_enqueue_message(Delivery, true, State)); handle_cast({set_maximum_since_use, Age}, State) -> @@ -447,7 +448,7 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName }, %% Everything that we're monitoring, we need to ensure our new %% coordinator is monitoring. - MonitoringPids = [begin true = erlang:demonitor(MRef), + MonitoringPids = [begin put({ch_publisher, Pid}, MRef), Pid end || {Pid, MRef} <- dict:to_list(KS)], ok = rabbit_mirror_queue_coordinator:ensure_monitoring( @@ -601,7 +602,8 @@ ensure_monitoring(ChPid, State = #state { known_senders = KS }) -> local_sender_death(ChPid, State = #state { known_senders = KS }) -> ok = case dict:is_key(ChPid, KS) of false -> ok; - true -> confirm_sender_death(ChPid) + true -> rabbit_flow:sender_down(ChPid), + confirm_sender_death(ChPid) end, State. |
