summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2014-01-22 15:23:32 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2014-01-22 15:23:32 +0000
commite6c9d5f2d9547753275c9c6e297db44a5f1725d9 (patch)
tree298150db58ae3ed90d46bf8c3804f511ef64c729 /src
parent140f3de392804f2f069411317c3325dacb59fe3a (diff)
parentb8cc1fcd86c9e293a9a6930b5ec96ce7da03f745 (diff)
downloadrabbitmq-server-git-e6c9d5f2d9547753275c9c6e297db44a5f1725d9.tar.gz
merge default into bug24297
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_queue_consumers.erl18
1 files changed, 9 insertions, 9 deletions
diff --git a/src/rabbit_queue_consumers.erl b/src/rabbit_queue_consumers.erl
index ca47b43442..bf3d857d36 100644
--- a/src/rabbit_queue_consumers.erl
+++ b/src/rabbit_queue_consumers.erl
@@ -141,24 +141,24 @@ add(ChPid, CTag, NoAck, LimiterPid, LimiterActive, Args, IsEmpty,
args = Args},
State#state{consumers = add_consumer({ChPid, Consumer}, Consumers)}.
-remove(ChPid, ConsumerTag, State = #state{consumers = Consumers}) ->
+remove(ChPid, CTag, State = #state{consumers = Consumers}) ->
case lookup_ch(ChPid) of
not_found ->
not_found;
C = #cr{consumer_count = Count,
limiter = Limiter,
blocked_consumers = Blocked} ->
- Blocked1 = remove_consumer(ChPid, ConsumerTag, Blocked),
+ Blocked1 = remove_consumer(ChPid, CTag, Blocked),
Limiter1 = case Count of
1 -> rabbit_limiter:deactivate(Limiter);
_ -> Limiter
end,
- Limiter2 = rabbit_limiter:forget_consumer(Limiter1, ConsumerTag),
+ Limiter2 = rabbit_limiter:forget_consumer(Limiter1, CTag),
update_ch_record(C#cr{consumer_count = Count - 1,
limiter = Limiter2,
blocked_consumers = Blocked1}),
State#state{consumers =
- remove_consumer(ChPid, ConsumerTag, Consumers)}
+ remove_consumer(ChPid, CTag, Consumers)}
end.
erase_ch(ChPid, State = #state{consumers = Consumers}) ->
@@ -217,14 +217,14 @@ deliver_to_consumer(FetchFun, E = {ChPid, Consumer}, QName) ->
end.
deliver_to_consumer(FetchFun,
- #consumer{tag = ConsumerTag,
+ #consumer{tag = CTag,
ack_required = AckRequired},
C = #cr{ch_pid = ChPid,
acktags = ChAckTags,
unsent_message_count = Count},
QName) ->
{{Message, IsDelivered, AckTag}, R} = FetchFun(AckRequired),
- rabbit_channel:deliver(ChPid, ConsumerTag, AckRequired,
+ rabbit_channel:deliver(ChPid, CTag, AckRequired,
{QName, self(), AckTag, IsDelivered, Message}),
ChAckTags1 = case AckRequired of
true -> queue:in(AckTag, ChAckTags);
@@ -423,9 +423,9 @@ add_consumer({ChPid, Consumer = #consumer{args = Args}}, Queue) ->
end,
priority_queue:in({ChPid, Consumer}, Priority, Queue).
-remove_consumer(ChPid, ConsumerTag, Queue) ->
- priority_queue:filter(fun ({CP, #consumer{tag = CTag}}) ->
- (CP /= ChPid) or (CTag /= ConsumerTag)
+remove_consumer(ChPid, CTag, Queue) ->
+ priority_queue:filter(fun ({CP, #consumer{tag = CT}}) ->
+ (CP /= ChPid) or (CT /= CTag)
end, Queue).
remove_consumers(ChPid, Queue) ->