summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue_process.erl18
1 files changed, 10 insertions, 8 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index d932d1b670..35f1949321 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -401,13 +401,15 @@ erase_ch_record(#cr{ch_pid = ChPid,
erase({ch, ChPid}),
ok.
-maybe_send_drained(#q{backing_queue = BQ,
- backing_queue_state = BQS}) ->
- case BQ:len(BQS) of
- 0 -> [maybe_send_drained(C) || C <- all_ch_record()];
- _ -> ok
- end;
-maybe_send_drained(C = #cr{ch_pid = ChPid, limiter = Limiter}) ->
+maybe_send_drained(#q{backing_queue = BQ, backing_queue_state = BQS}) ->
+ case BQ:is_empty(BQS) of
+ true -> send_drained();
+ false -> ok
+ end.
+
+send_drained() -> [send_drained(C) || C <- all_ch_record()].
+
+send_drained(C = #cr{ch_pid = ChPid, limiter = Limiter}) ->
case rabbit_limiter:drained(Limiter) of
{[], Limiter} ->
ok;
@@ -435,6 +437,7 @@ is_ch_blocked(#cr{unsent_message_count = Count, is_limit_active = Limited}) ->
Limited orelse Count >= ?UNSENT_MESSAGE_LIMIT.
deliver_msgs_to_consumers(_DeliverFun, true, State) ->
+ send_drained(),
{true, State};
deliver_msgs_to_consumers(DeliverFun, false,
State = #q{active_consumers = ActiveConsumers}) ->
@@ -491,7 +494,6 @@ deliver_msg_to_consumer(DeliverFun, NewLimiter,
update_ch_record(C#cr{acktags = ChAckTags1,
limiter = NewLimiter,
unsent_message_count = Count + 1}),
- maybe_send_drained(State1),
{Stop, State1}.
deliver_from_queue_deliver(AckRequired, State) ->