summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2012-01-12 16:23:09 +0000
committerSimon MacMullen <simon@rabbitmq.com>2012-01-12 16:23:09 +0000
commitf5885abb20e2409ab64d392e0846cb9b7c7509f4 (patch)
treed4e4b10c511fda04c28dd0a0f356c334b5119595
parentcdff3eca6098564c9100e0b156847bef283867bd (diff)
downloadrabbitmq-server-git-f5885abb20e2409ab64d392e0846cb9b7c7509f4.tar.gz
Flow control: channel -> queue slave
-rw-r--r--src/rabbit_mirror_queue_slave.erl8
1 files changed, 5 insertions, 3 deletions
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index d68063db80..9f6773b2a1 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -208,8 +208,9 @@ handle_cast({run_backing_queue, Mod, Fun}, State) ->
handle_cast({gm, Instruction}, State) ->
handle_process_result(process_instruction(Instruction, State));
-handle_cast({deliver, Delivery = #delivery {}}, State) ->
+handle_cast({deliver, Delivery = #delivery{sender = Sender}}, State) ->
%% Asynchronous, non-"mandatory", non-"immediate" deliver mode.
+ rabbit_flow:ack(Sender),
noreply(maybe_enqueue_message(Delivery, true, State));
handle_cast({set_maximum_since_use, Age}, State) ->
@@ -447,7 +448,7 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName },
%% Everything that we're monitoring, we need to ensure our new
%% coordinator is monitoring.
- MonitoringPids = [begin true = erlang:demonitor(MRef),
+ MonitoringPids = [begin put({ch_publisher, Pid}, MRef),
Pid
end || {Pid, MRef} <- dict:to_list(KS)],
ok = rabbit_mirror_queue_coordinator:ensure_monitoring(
@@ -601,7 +602,8 @@ ensure_monitoring(ChPid, State = #state { known_senders = KS }) ->
local_sender_death(ChPid, State = #state { known_senders = KS }) ->
ok = case dict:is_key(ChPid, KS) of
false -> ok;
- true -> confirm_sender_death(ChPid)
+ true -> rabbit_flow:sender_down(ChPid),
+ confirm_sender_death(ChPid)
end,
State.