diff options
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 22 |
1 files changed, 13 insertions, 9 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 78b0d23d47..829798c7d3 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -401,13 +401,16 @@ erase_ch_record(#cr{ch_pid = ChPid, erase({ch, ChPid}), ok. -maybe_send_drained(#q{backing_queue = BQ, backing_queue_state = BQS}) -> +maybe_send_drained(true, _State) -> + ok; + +maybe_send_drained(false, #q{backing_queue = BQ, backing_queue_state = BQS}) -> case BQ:is_empty(BQS) of true -> send_drained(); false -> ok end. -maybe_send_drained(C, #q{backing_queue = BQ, backing_queue_state = BQS}) -> +maybe_send_drained_cons(C, #q{backing_queue = BQ, backing_queue_state = BQS}) -> case BQ:is_empty(BQS) of true -> send_drained(C); false -> ok @@ -609,14 +612,14 @@ requeue_and_run(AckTags, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> {_MsgIds, BQS1} = BQ:requeue(AckTags, BQS), State1 = drop_expired_msgs(State#q{backing_queue_state = BQS1}), - maybe_send_drained(State1), + maybe_send_drained(BQ:is_empty(BQS), State1), run_message_queue(State1). fetch(AckRequired, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> {Result, BQS1} = BQ:fetch(AckRequired, BQS), State1 = drop_expired_msgs(State#q{backing_queue_state = BQS1}), - maybe_send_drained(State1), + maybe_send_drained(Result =:= empty, State1), {Result, State1}. ack(AckTags, ChPid, State) -> @@ -1168,7 +1171,7 @@ handle_call({basic_consume, NoAck, ChPid, Limiter, AC1 = queue:in(E, State1#q.active_consumers), run_message_queue(State1#q{active_consumers = AC1}) end, - maybe_send_drained(C1, State2), + maybe_send_drained_cons(C1, State2), emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume, not NoAck, qname(State2)), reply(ok, State2) @@ -1219,7 +1222,7 @@ handle_call(purge, _From, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> {Count, BQS1} = BQ:purge(BQS), State1 = State#q{backing_queue_state = BQS1}, - maybe_send_drained(State1), + maybe_send_drained(Count =:= 0, State1), reply({ok, Count}, State1); handle_call({requeue, AckTags, ChPid}, From, State) -> @@ -1383,7 +1386,7 @@ handle_cast({credit, ChPid, CTag, Credit, Drain}, rabbit_channel:send_credit_reply(ChPid, BQ:len(BQS)), State1 = possibly_unblock( State, ChPid, fun(C) -> C#cr{limiter = Lim2} end), - maybe_send_drained(lookup_ch(ChPid), State1), + maybe_send_drained_cons(lookup_ch(ChPid), State1), noreply(State1); handle_cast(wake_up, State) -> @@ -1404,9 +1407,10 @@ handle_info(maybe_expire, State) -> false -> noreply(ensure_expiry_timer(State)) end; -handle_info(drop_expired, State) -> +handle_info(drop_expired, State = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> State1 = drop_expired_msgs(State#q{ttl_timer_ref = undefined}), - maybe_send_drained(State1), + maybe_send_drained(BQ:is_empty(BQS), State1), noreply(State1); handle_info(emit_stats, State) -> |
