diff options
| author | Matthias Radestock <matthias@rabbitmq.com> | 2013-02-12 16:50:25 +0000 |
|---|---|---|
| committer | Matthias Radestock <matthias@rabbitmq.com> | 2013-02-12 16:50:25 +0000 |
| commit | 16c725b4bff4b00193eeb843bd5da328b6430f7e (patch) | |
| tree | 4e90b05fe86141f53a8de63e84a3601906c826eb /src | |
| parent | c0d67a8548145383d3f62076e855ba97209d916f (diff) | |
| download | rabbitmq-server-git-16c725b4bff4b00193eeb843bd5da328b6430f7e.tar.gz | |
simplify & optimise maybe_send_drained
- use BQ:is_empty instead of BQ:len
- make use of Stop flag
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 18 |
1 files changed, 10 insertions, 8 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index d932d1b670..35f1949321 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -401,13 +401,15 @@ erase_ch_record(#cr{ch_pid = ChPid, erase({ch, ChPid}), ok. -maybe_send_drained(#q{backing_queue = BQ, - backing_queue_state = BQS}) -> - case BQ:len(BQS) of - 0 -> [maybe_send_drained(C) || C <- all_ch_record()]; - _ -> ok - end; -maybe_send_drained(C = #cr{ch_pid = ChPid, limiter = Limiter}) -> +maybe_send_drained(#q{backing_queue = BQ, backing_queue_state = BQS}) -> + case BQ:is_empty(BQS) of + true -> send_drained(); + false -> ok + end. + +send_drained() -> [send_drained(C) || C <- all_ch_record()]. + +send_drained(C = #cr{ch_pid = ChPid, limiter = Limiter}) -> case rabbit_limiter:drained(Limiter) of {[], Limiter} -> ok; @@ -435,6 +437,7 @@ is_ch_blocked(#cr{unsent_message_count = Count, is_limit_active = Limited}) -> Limited orelse Count >= ?UNSENT_MESSAGE_LIMIT. deliver_msgs_to_consumers(_DeliverFun, true, State) -> + send_drained(), {true, State}; deliver_msgs_to_consumers(DeliverFun, false, State = #q{active_consumers = ActiveConsumers}) -> @@ -491,7 +494,6 @@ deliver_msg_to_consumer(DeliverFun, NewLimiter, update_ch_record(C#cr{acktags = ChAckTags1, limiter = NewLimiter, unsent_message_count = Count + 1}), - maybe_send_drained(State1), {Stop, State1}. deliver_from_queue_deliver(AckRequired, State) -> |
