diff options
| author | Simon MacMullen <simon@rabbitmq.com> | 2014-01-09 14:38:48 +0000 |
|---|---|---|
| committer | Simon MacMullen <simon@rabbitmq.com> | 2014-01-09 14:38:48 +0000 |
| commit | 90510a5b8fa65650139a75345b55fe86a0259058 (patch) | |
| tree | e52211064e8b58ba54e82533083fa38427b1b5de /src | |
| parent | f807378722d273a0375bbd71e98b566816577cae (diff) | |
| download | rabbitmq-server-git-90510a5b8fa65650139a75345b55fe86a0259058.tar.gz | |
Upon unblocking, unblock, and run message queue.
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 11 | ||||
| -rw-r--r-- | src/rabbit_limiter.erl | 18 | ||||
| -rw-r--r-- | src/rabbit_queue_consumers.erl | 16 |
3 files changed, 27 insertions, 18 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index cc511396c3..b220272c57 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -634,10 +634,13 @@ backing_queue_timeout(State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> State#q{backing_queue_state = BQ:timeout(BQS)}. -subtract_acks(ChPid, CTag, AckTags, State, Fun) -> - case rabbit_queue_consumers:subtract_acks(ChPid, CTag, AckTags) of - not_found -> State; - ok -> Fun(State) +subtract_acks(ChPid, CTag, AckTags, State = #q{consumers = Consumers}, Fun) -> + case rabbit_queue_consumers:subtract_acks( + ChPid, CTag, AckTags, Consumers) of + not_found -> State; + unchanged -> Fun(State); + {unblocked, Consumers1} -> State1 = State#q{consumers = Consumers1}, + run_message_queue(true, Fun(State1)) end. message_properties(Message, Confirm, #q{ttl = TTL}) -> diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index d0dccd035e..2686579ef0 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -290,15 +290,17 @@ set_consumer_prefetch(Limiter = #qstate{credits = Credits}, CTag, Credit) -> Limiter#qstate{credits = Credits1}. ack_from_queue(Limiter = #qstate{credits = Credits}, CTag, Credit) -> - Credits1 = case gb_trees:lookup(CTag, Credits) of - {value, C = #credit{mode = auto, - credit = Credit0}} -> - gb_trees:enter(CTag, C#credit{credit = Credit0 + Credit}, - Credits); - _ -> - Credits + {Credits1, Unblocked} = + case gb_trees:lookup(CTag, Credits) of + {value, C = #credit{mode = auto, + credit = Credit0}} -> + {gb_trees:enter( + CTag, C#credit{credit = Credit0 + Credit}, Credits), + Credit0 =:= 0}; + _ -> + {Credits, false} end, - Limiter#qstate{credits = Credits1}. + {Limiter#qstate{credits = Credits1}, Unblocked}. drained(Limiter = #qstate{credits = Credits}) -> {CTagCredits, Credits2} = diff --git a/src/rabbit_queue_consumers.erl b/src/rabbit_queue_consumers.erl index 9dafa71a32..3ae29d30ad 100644 --- a/src/rabbit_queue_consumers.erl +++ b/src/rabbit_queue_consumers.erl @@ -18,7 +18,7 @@ -export([new/0, max_active_priority/1, inactive/1, all/1, count/0, unacknowledged_message_count/0, add/9, remove/3, erase_ch/2, - send_drained/0, deliver/3, record_ack/3, subtract_acks/3, + send_drained/0, deliver/3, record_ack/3, subtract_acks/4, possibly_unblock/3, resume_fun/0, notify_sent_fun/1, activate_limit_fun/0, credit_fun/4, utilisation/1]). @@ -244,16 +244,20 @@ record_ack(ChPid, LimiterPid, AckTag) -> update_ch_record(C#cr{acktags = queue:in(AckTag, ChAckTags)}), ok. -subtract_acks(ChPid, CTag, AckTags) -> +subtract_acks(ChPid, CTag, AckTags, State) -> case lookup_ch(ChPid) of not_found -> not_found; C = #cr{acktags = ChAckTags, limiter = Lim} -> - Lim2 = rabbit_limiter:ack_from_queue(Lim, CTag, length(AckTags)), + {Lim2, Unblocked} = + rabbit_limiter:ack_from_queue(Lim, CTag, length(AckTags)), AckTags2 = subtract_acks0(AckTags, [], ChAckTags), - update_ch_record(C#cr{acktags = AckTags2, - limiter = Lim2}), - ok + C2 = C#cr{acktags = AckTags2, limiter = Lim2}, + case Unblocked of + true -> unblock(C2, State); + false -> update_ch_record(C2), + unchanged + end end. subtract_acks0([], [], AckQ) -> |
