diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 28 |
1 files changed, 14 insertions, 14 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 0530d650f0..c0e74129d6 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -408,12 +408,6 @@ maybe_send_drained(WasEmpty, State) -> false -> ok end. -maybe_send_drained_cons(C, State) -> - case is_empty(State) of - true -> send_drained(C); - false -> ok - end. - send_drained(C = #cr{ch_pid = ChPid, limiter = Limiter}) -> case rabbit_limiter:drained(Limiter) of {[], Limiter} -> ok; @@ -1166,7 +1160,10 @@ handle_call({basic_consume, NoAck, ChPid, Limiter, AC1 = queue:in(E, State1#q.active_consumers), run_message_queue(State1#q{active_consumers = AC1}) end, - maybe_send_drained_cons(C1, State2), + case is_empty(State2) of + true -> send_drained(lookup_ch(ChPid)); + false -> ok + end, emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume, not NoAck, qname(State2)), reply(ok, State2) @@ -1376,13 +1373,16 @@ handle_cast(stop_mirroring, State = #q{backing_queue = BQ, handle_cast({credit, ChPid, CTag, Credit, Drain}, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> - #cr{limiter = Lim} = ch_record(ChPid), - Lim2 = rabbit_limiter:credit(Lim, CTag, Credit, Drain), - rabbit_channel:send_credit_reply(ChPid, BQ:len(BQS)), - State1 = possibly_unblock( - State, ChPid, fun(C) -> C#cr{limiter = Lim2} end), - maybe_send_drained_cons(lookup_ch(ChPid), State1), - noreply(State1); + Len = BQ:len(BQS), + rabbit_channel:send_credit_reply(ChPid, Len), + C = #cr{limiter = Lim} = lookup_ch(ChPid), + C1 = C#cr{limiter = rabbit_limiter:credit(Lim, CTag, Credit, Drain)}, + noreply(case Drain andalso Len == 0 of + true -> update_ch_record(C1), + send_drained(C1), + State; + false -> possibly_unblock(State, C1) + end); handle_cast(wake_up, State) -> noreply(State). |
