diff options
| -rw-r--r-- | src/rabbit_queue_consumers.erl | 24 |
1 files changed, 12 insertions, 12 deletions
diff --git a/src/rabbit_queue_consumers.erl b/src/rabbit_queue_consumers.erl index 9f2f4f6a0d..bea7e0d00d 100644 --- a/src/rabbit_queue_consumers.erl +++ b/src/rabbit_queue_consumers.erl @@ -118,7 +118,7 @@ count() -> lists:sum([Count || #cr{consumer_count = Count} <- all_ch_record()]). unacknowledged_message_count() -> lists:sum([queue:len(C#cr.acktags) || C <- all_ch_record()]). -add(ChPid, ConsumerTag, NoAck, LimiterPid, LimiterActive, Args, IsEmpty, +add(ChPid, CTag, NoAck, LimiterPid, LimiterActive, Args, IsEmpty, State = #state{consumers = Consumers}) -> C = #cr{consumer_count = Count, limiter = Limiter} = ch_record(ChPid, LimiterPid), @@ -130,31 +130,31 @@ add(ChPid, ConsumerTag, NoAck, LimiterPid, LimiterActive, Args, IsEmpty, update_ch_record(case parse_credit_args(Args) of none -> C1; {Crd, Drain} -> credit_and_drain( - C1, ConsumerTag, Crd, Drain, IsEmpty) + C1, CTag, Crd, Drain, IsEmpty) end), - Consumer = #consumer{tag = ConsumerTag, + Consumer = #consumer{tag = CTag, ack_required = not NoAck, args = Args}, State#state{consumers = add_consumer({ChPid, Consumer}, Consumers)}. -remove(ChPid, ConsumerTag, State = #state{consumers = Consumers}) -> +remove(ChPid, CTag, State = #state{consumers = Consumers}) -> case lookup_ch(ChPid) of not_found -> not_found; C = #cr{consumer_count = Count, limiter = Limiter, blocked_consumers = Blocked} -> - Blocked1 = remove_consumer(ChPid, ConsumerTag, Blocked), + Blocked1 = remove_consumer(ChPid, CTag, Blocked), Limiter1 = case Count of 1 -> rabbit_limiter:deactivate(Limiter); _ -> Limiter end, - Limiter2 = rabbit_limiter:forget_consumer(Limiter1, ConsumerTag), + Limiter2 = rabbit_limiter:forget_consumer(Limiter1, CTag), update_ch_record(C#cr{consumer_count = Count - 1, limiter = Limiter2, blocked_consumers = Blocked1}), State#state{consumers = - remove_consumer(ChPid, ConsumerTag, Consumers)} + remove_consumer(ChPid, CTag, Consumers)} end. erase_ch(ChPid, State = #state{consumers = Consumers}) -> @@ -213,14 +213,14 @@ deliver_to_consumer(FetchFun, E = {ChPid, Consumer}, QName) -> end. deliver_to_consumer(FetchFun, - #consumer{tag = ConsumerTag, + #consumer{tag = CTag, ack_required = AckRequired}, C = #cr{ch_pid = ChPid, acktags = ChAckTags, unsent_message_count = Count}, QName) -> {{Message, IsDelivered, AckTag}, R} = FetchFun(AckRequired), - rabbit_channel:deliver(ChPid, ConsumerTag, AckRequired, + rabbit_channel:deliver(ChPid, CTag, AckRequired, {QName, self(), AckTag, IsDelivered, Message}), ChAckTags1 = case AckRequired of true -> queue:in(AckTag, ChAckTags); @@ -407,9 +407,9 @@ add_consumer({ChPid, Consumer = #consumer{args = Args}}, Queue) -> end, priority_queue:in({ChPid, Consumer}, Priority, Queue). -remove_consumer(ChPid, ConsumerTag, Queue) -> - priority_queue:filter(fun ({CP, #consumer{tag = CTag}}) -> - (CP /= ChPid) or (CTag /= ConsumerTag) +remove_consumer(ChPid, CTag, Queue) -> + priority_queue:filter(fun ({CP, #consumer{tag = CT}}) -> + (CP /= ChPid) or (CT /= CTag) end, Queue). remove_consumers(ChPid, Queue) -> |
