summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_amqqueue_process.erl14
1 files changed, 5 insertions, 9 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 829798c7d3..5fd3377ad5 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -403,10 +403,9 @@ erase_ch_record(#cr{ch_pid = ChPid,
maybe_send_drained(true, _State) ->
ok;
-
maybe_send_drained(false, #q{backing_queue = BQ, backing_queue_state = BQS}) ->
case BQ:is_empty(BQS) of
- true -> send_drained();
+ true -> [send_drained(C) || C <- all_ch_record()];
false -> ok
end.
@@ -416,15 +415,12 @@ maybe_send_drained_cons(C, #q{backing_queue = BQ, backing_queue_state = BQS}) ->
false -> ok
end.
-send_drained() -> [send_drained(C) || C <- all_ch_record()].
-
send_drained(C = #cr{ch_pid = ChPid, limiter = Limiter}) ->
case rabbit_limiter:drained(Limiter) of
- {[], Limiter} ->
- ok;
- {CTagCredit, Limiter2} ->
- rabbit_channel:send_drained(ChPid, CTagCredit),
- update_ch_record(C#cr{limiter = Limiter2})
+ {[], Limiter} -> ok;
+ {CTagCredit, Limiter2} -> rabbit_channel:send_drained(
+ ChPid, CTagCredit),
+ update_ch_record(C#cr{limiter = Limiter2})
end.
update_consumer_count(C = #cr{consumer_count = 0, limiter = Limiter}, +1) ->