summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_amqqueue_process.erl22
1 files changed, 13 insertions, 9 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 78b0d23d47..829798c7d3 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -401,13 +401,16 @@ erase_ch_record(#cr{ch_pid = ChPid,
erase({ch, ChPid}),
ok.
-maybe_send_drained(#q{backing_queue = BQ, backing_queue_state = BQS}) ->
+maybe_send_drained(true, _State) ->
+ ok;
+
+maybe_send_drained(false, #q{backing_queue = BQ, backing_queue_state = BQS}) ->
case BQ:is_empty(BQS) of
true -> send_drained();
false -> ok
end.
-maybe_send_drained(C, #q{backing_queue = BQ, backing_queue_state = BQS}) ->
+maybe_send_drained_cons(C, #q{backing_queue = BQ, backing_queue_state = BQS}) ->
case BQ:is_empty(BQS) of
true -> send_drained(C);
false -> ok
@@ -609,14 +612,14 @@ requeue_and_run(AckTags, State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
{_MsgIds, BQS1} = BQ:requeue(AckTags, BQS),
State1 = drop_expired_msgs(State#q{backing_queue_state = BQS1}),
- maybe_send_drained(State1),
+ maybe_send_drained(BQ:is_empty(BQS), State1),
run_message_queue(State1).
fetch(AckRequired, State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
{Result, BQS1} = BQ:fetch(AckRequired, BQS),
State1 = drop_expired_msgs(State#q{backing_queue_state = BQS1}),
- maybe_send_drained(State1),
+ maybe_send_drained(Result =:= empty, State1),
{Result, State1}.
ack(AckTags, ChPid, State) ->
@@ -1168,7 +1171,7 @@ handle_call({basic_consume, NoAck, ChPid, Limiter,
AC1 = queue:in(E, State1#q.active_consumers),
run_message_queue(State1#q{active_consumers = AC1})
end,
- maybe_send_drained(C1, State2),
+ maybe_send_drained_cons(C1, State2),
emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume,
not NoAck, qname(State2)),
reply(ok, State2)
@@ -1219,7 +1222,7 @@ handle_call(purge, _From, State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
{Count, BQS1} = BQ:purge(BQS),
State1 = State#q{backing_queue_state = BQS1},
- maybe_send_drained(State1),
+ maybe_send_drained(Count =:= 0, State1),
reply({ok, Count}, State1);
handle_call({requeue, AckTags, ChPid}, From, State) ->
@@ -1383,7 +1386,7 @@ handle_cast({credit, ChPid, CTag, Credit, Drain},
rabbit_channel:send_credit_reply(ChPid, BQ:len(BQS)),
State1 = possibly_unblock(
State, ChPid, fun(C) -> C#cr{limiter = Lim2} end),
- maybe_send_drained(lookup_ch(ChPid), State1),
+ maybe_send_drained_cons(lookup_ch(ChPid), State1),
noreply(State1);
handle_cast(wake_up, State) ->
@@ -1404,9 +1407,10 @@ handle_info(maybe_expire, State) ->
false -> noreply(ensure_expiry_timer(State))
end;
-handle_info(drop_expired, State) ->
+handle_info(drop_expired, State = #q{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
State1 = drop_expired_msgs(State#q{ttl_timer_ref = undefined}),
- maybe_send_drained(State1),
+ maybe_send_drained(BQ:is_empty(BQS), State1),
noreply(State1);
handle_info(emit_stats, State) ->