summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2014-01-09 14:38:48 +0000
committerSimon MacMullen <simon@rabbitmq.com>2014-01-09 14:38:48 +0000
commit90510a5b8fa65650139a75345b55fe86a0259058 (patch)
treee52211064e8b58ba54e82533083fa38427b1b5de /src
parentf807378722d273a0375bbd71e98b566816577cae (diff)
downloadrabbitmq-server-git-90510a5b8fa65650139a75345b55fe86a0259058.tar.gz
Upon unblocking, unblock, and run message queue.
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue_process.erl11
-rw-r--r--src/rabbit_limiter.erl18
-rw-r--r--src/rabbit_queue_consumers.erl16
3 files changed, 27 insertions, 18 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index cc511396c3..b220272c57 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -634,10 +634,13 @@ backing_queue_timeout(State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
State#q{backing_queue_state = BQ:timeout(BQS)}.
-subtract_acks(ChPid, CTag, AckTags, State, Fun) ->
- case rabbit_queue_consumers:subtract_acks(ChPid, CTag, AckTags) of
- not_found -> State;
- ok -> Fun(State)
+subtract_acks(ChPid, CTag, AckTags, State = #q{consumers = Consumers}, Fun) ->
+ case rabbit_queue_consumers:subtract_acks(
+ ChPid, CTag, AckTags, Consumers) of
+ not_found -> State;
+ unchanged -> Fun(State);
+ {unblocked, Consumers1} -> State1 = State#q{consumers = Consumers1},
+ run_message_queue(true, Fun(State1))
end.
message_properties(Message, Confirm, #q{ttl = TTL}) ->
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
index d0dccd035e..2686579ef0 100644
--- a/src/rabbit_limiter.erl
+++ b/src/rabbit_limiter.erl
@@ -290,15 +290,17 @@ set_consumer_prefetch(Limiter = #qstate{credits = Credits}, CTag, Credit) ->
Limiter#qstate{credits = Credits1}.
ack_from_queue(Limiter = #qstate{credits = Credits}, CTag, Credit) ->
- Credits1 = case gb_trees:lookup(CTag, Credits) of
- {value, C = #credit{mode = auto,
- credit = Credit0}} ->
- gb_trees:enter(CTag, C#credit{credit = Credit0 + Credit},
- Credits);
- _ ->
- Credits
+ {Credits1, Unblocked} =
+ case gb_trees:lookup(CTag, Credits) of
+ {value, C = #credit{mode = auto,
+ credit = Credit0}} ->
+ {gb_trees:enter(
+ CTag, C#credit{credit = Credit0 + Credit}, Credits),
+ Credit0 =:= 0};
+ _ ->
+ {Credits, false}
end,
- Limiter#qstate{credits = Credits1}.
+ {Limiter#qstate{credits = Credits1}, Unblocked}.
drained(Limiter = #qstate{credits = Credits}) ->
{CTagCredits, Credits2} =
diff --git a/src/rabbit_queue_consumers.erl b/src/rabbit_queue_consumers.erl
index 9dafa71a32..3ae29d30ad 100644
--- a/src/rabbit_queue_consumers.erl
+++ b/src/rabbit_queue_consumers.erl
@@ -18,7 +18,7 @@
-export([new/0, max_active_priority/1, inactive/1, all/1, count/0,
unacknowledged_message_count/0, add/9, remove/3, erase_ch/2,
- send_drained/0, deliver/3, record_ack/3, subtract_acks/3,
+ send_drained/0, deliver/3, record_ack/3, subtract_acks/4,
possibly_unblock/3,
resume_fun/0, notify_sent_fun/1, activate_limit_fun/0, credit_fun/4,
utilisation/1]).
@@ -244,16 +244,20 @@ record_ack(ChPid, LimiterPid, AckTag) ->
update_ch_record(C#cr{acktags = queue:in(AckTag, ChAckTags)}),
ok.
-subtract_acks(ChPid, CTag, AckTags) ->
+subtract_acks(ChPid, CTag, AckTags, State) ->
case lookup_ch(ChPid) of
not_found ->
not_found;
C = #cr{acktags = ChAckTags, limiter = Lim} ->
- Lim2 = rabbit_limiter:ack_from_queue(Lim, CTag, length(AckTags)),
+ {Lim2, Unblocked} =
+ rabbit_limiter:ack_from_queue(Lim, CTag, length(AckTags)),
AckTags2 = subtract_acks0(AckTags, [], ChAckTags),
- update_ch_record(C#cr{acktags = AckTags2,
- limiter = Lim2}),
- ok
+ C2 = C#cr{acktags = AckTags2, limiter = Lim2},
+ case Unblocked of
+ true -> unblock(C2, State);
+ false -> update_ch_record(C2),
+ unchanged
+ end
end.
subtract_acks0([], [], AckQ) ->