diff options
| author | Matthias Radestock <matthias@rabbitmq.com> | 2014-01-22 15:23:32 +0000 |
|---|---|---|
| committer | Matthias Radestock <matthias@rabbitmq.com> | 2014-01-22 15:23:32 +0000 |
| commit | e6c9d5f2d9547753275c9c6e297db44a5f1725d9 (patch) | |
| tree | 298150db58ae3ed90d46bf8c3804f511ef64c729 /src | |
| parent | 140f3de392804f2f069411317c3325dacb59fe3a (diff) | |
| parent | b8cc1fcd86c9e293a9a6930b5ec96ce7da03f745 (diff) | |
| download | rabbitmq-server-git-e6c9d5f2d9547753275c9c6e297db44a5f1725d9.tar.gz | |
merge default into bug24297
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_queue_consumers.erl | 18 |
1 files changed, 9 insertions, 9 deletions
diff --git a/src/rabbit_queue_consumers.erl b/src/rabbit_queue_consumers.erl index ca47b43442..bf3d857d36 100644 --- a/src/rabbit_queue_consumers.erl +++ b/src/rabbit_queue_consumers.erl @@ -141,24 +141,24 @@ add(ChPid, CTag, NoAck, LimiterPid, LimiterActive, Args, IsEmpty, args = Args}, State#state{consumers = add_consumer({ChPid, Consumer}, Consumers)}. -remove(ChPid, ConsumerTag, State = #state{consumers = Consumers}) -> +remove(ChPid, CTag, State = #state{consumers = Consumers}) -> case lookup_ch(ChPid) of not_found -> not_found; C = #cr{consumer_count = Count, limiter = Limiter, blocked_consumers = Blocked} -> - Blocked1 = remove_consumer(ChPid, ConsumerTag, Blocked), + Blocked1 = remove_consumer(ChPid, CTag, Blocked), Limiter1 = case Count of 1 -> rabbit_limiter:deactivate(Limiter); _ -> Limiter end, - Limiter2 = rabbit_limiter:forget_consumer(Limiter1, ConsumerTag), + Limiter2 = rabbit_limiter:forget_consumer(Limiter1, CTag), update_ch_record(C#cr{consumer_count = Count - 1, limiter = Limiter2, blocked_consumers = Blocked1}), State#state{consumers = - remove_consumer(ChPid, ConsumerTag, Consumers)} + remove_consumer(ChPid, CTag, Consumers)} end. erase_ch(ChPid, State = #state{consumers = Consumers}) -> @@ -217,14 +217,14 @@ deliver_to_consumer(FetchFun, E = {ChPid, Consumer}, QName) -> end. deliver_to_consumer(FetchFun, - #consumer{tag = ConsumerTag, + #consumer{tag = CTag, ack_required = AckRequired}, C = #cr{ch_pid = ChPid, acktags = ChAckTags, unsent_message_count = Count}, QName) -> {{Message, IsDelivered, AckTag}, R} = FetchFun(AckRequired), - rabbit_channel:deliver(ChPid, ConsumerTag, AckRequired, + rabbit_channel:deliver(ChPid, CTag, AckRequired, {QName, self(), AckTag, IsDelivered, Message}), ChAckTags1 = case AckRequired of true -> queue:in(AckTag, ChAckTags); @@ -423,9 +423,9 @@ add_consumer({ChPid, Consumer = #consumer{args = Args}}, Queue) -> end, priority_queue:in({ChPid, Consumer}, Priority, Queue). -remove_consumer(ChPid, ConsumerTag, Queue) -> - priority_queue:filter(fun ({CP, #consumer{tag = CTag}}) -> - (CP /= ChPid) or (CTag /= ConsumerTag) +remove_consumer(ChPid, CTag, Queue) -> + priority_queue:filter(fun ({CP, #consumer{tag = CT}}) -> + (CP /= ChPid) or (CT /= CTag) end, Queue). remove_consumers(ChPid, Queue) -> |
