summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_queue_consumers.erl24
1 files changed, 12 insertions, 12 deletions
diff --git a/src/rabbit_queue_consumers.erl b/src/rabbit_queue_consumers.erl
index 9f2f4f6a0d..bea7e0d00d 100644
--- a/src/rabbit_queue_consumers.erl
+++ b/src/rabbit_queue_consumers.erl
@@ -118,7 +118,7 @@ count() -> lists:sum([Count || #cr{consumer_count = Count} <- all_ch_record()]).
unacknowledged_message_count() ->
lists:sum([queue:len(C#cr.acktags) || C <- all_ch_record()]).
-add(ChPid, ConsumerTag, NoAck, LimiterPid, LimiterActive, Args, IsEmpty,
+add(ChPid, CTag, NoAck, LimiterPid, LimiterActive, Args, IsEmpty,
State = #state{consumers = Consumers}) ->
C = #cr{consumer_count = Count,
limiter = Limiter} = ch_record(ChPid, LimiterPid),
@@ -130,31 +130,31 @@ add(ChPid, ConsumerTag, NoAck, LimiterPid, LimiterActive, Args, IsEmpty,
update_ch_record(case parse_credit_args(Args) of
none -> C1;
{Crd, Drain} -> credit_and_drain(
- C1, ConsumerTag, Crd, Drain, IsEmpty)
+ C1, CTag, Crd, Drain, IsEmpty)
end),
- Consumer = #consumer{tag = ConsumerTag,
+ Consumer = #consumer{tag = CTag,
ack_required = not NoAck,
args = Args},
State#state{consumers = add_consumer({ChPid, Consumer}, Consumers)}.
-remove(ChPid, ConsumerTag, State = #state{consumers = Consumers}) ->
+remove(ChPid, CTag, State = #state{consumers = Consumers}) ->
case lookup_ch(ChPid) of
not_found ->
not_found;
C = #cr{consumer_count = Count,
limiter = Limiter,
blocked_consumers = Blocked} ->
- Blocked1 = remove_consumer(ChPid, ConsumerTag, Blocked),
+ Blocked1 = remove_consumer(ChPid, CTag, Blocked),
Limiter1 = case Count of
1 -> rabbit_limiter:deactivate(Limiter);
_ -> Limiter
end,
- Limiter2 = rabbit_limiter:forget_consumer(Limiter1, ConsumerTag),
+ Limiter2 = rabbit_limiter:forget_consumer(Limiter1, CTag),
update_ch_record(C#cr{consumer_count = Count - 1,
limiter = Limiter2,
blocked_consumers = Blocked1}),
State#state{consumers =
- remove_consumer(ChPid, ConsumerTag, Consumers)}
+ remove_consumer(ChPid, CTag, Consumers)}
end.
erase_ch(ChPid, State = #state{consumers = Consumers}) ->
@@ -213,14 +213,14 @@ deliver_to_consumer(FetchFun, E = {ChPid, Consumer}, QName) ->
end.
deliver_to_consumer(FetchFun,
- #consumer{tag = ConsumerTag,
+ #consumer{tag = CTag,
ack_required = AckRequired},
C = #cr{ch_pid = ChPid,
acktags = ChAckTags,
unsent_message_count = Count},
QName) ->
{{Message, IsDelivered, AckTag}, R} = FetchFun(AckRequired),
- rabbit_channel:deliver(ChPid, ConsumerTag, AckRequired,
+ rabbit_channel:deliver(ChPid, CTag, AckRequired,
{QName, self(), AckTag, IsDelivered, Message}),
ChAckTags1 = case AckRequired of
true -> queue:in(AckTag, ChAckTags);
@@ -407,9 +407,9 @@ add_consumer({ChPid, Consumer = #consumer{args = Args}}, Queue) ->
end,
priority_queue:in({ChPid, Consumer}, Priority, Queue).
-remove_consumer(ChPid, ConsumerTag, Queue) ->
- priority_queue:filter(fun ({CP, #consumer{tag = CTag}}) ->
- (CP /= ChPid) or (CTag /= ConsumerTag)
+remove_consumer(ChPid, CTag, Queue) ->
+ priority_queue:filter(fun ({CP, #consumer{tag = CT}}) ->
+ (CP /= ChPid) or (CT /= CTag)
end, Queue).
remove_consumers(ChPid, Queue) ->