diff options
| -rw-r--r-- | src/rabbit_channel.erl | 19 |
1 files changed, 9 insertions, 10 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index a8f3f88c26..1b74b655f5 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -1811,7 +1811,7 @@ internal_reject(Requeue, Acked, Limiter, ok = notify_limiter(Limiter, Acked), State#ch{queue_states = QueueStates}. -record_sent(ConsumerTag, AckRequired, +record_sent(Type, Tag, AckRequired, Msg = {QName, QPid, MsgId, Redelivered, _Message}, State = #ch{unacked_message_q = UAMQ, next_tag = DeliveryTag, @@ -1819,12 +1819,11 @@ record_sent(ConsumerTag, AckRequired, user = #user{username = Username}, conn_name = ConnName, channel = ChannelNum}) -> - ?INCR_STATS(queue_stats, QName, 1, case {ConsumerTag, AckRequired} of - {_, true} when is_integer(ConsumerTag) -> get; - {_, false} when is_integer(ConsumerTag) -> get_no_ack; - %% Authentic consumer tag, this is a delivery - {_ , true} -> deliver; - {_ , false} -> deliver_no_ack + ?INCR_STATS(queue_stats, QName, 1, case {Type, AckRequired} of + {get, true} -> get; + {get, false} -> get_no_ack; + {deliver, true} -> deliver; + {deliver, false} -> deliver_no_ack end, State), case Redelivered of true -> ?INCR_STATS(queue_stats, QName, 1, redeliver, State); @@ -1832,7 +1831,7 @@ record_sent(ConsumerTag, AckRequired, end, rabbit_trace:tap_out(Msg, ConnName, ChannelNum, Username, TraceState), UAMQ1 = case AckRequired of - true -> queue:in({DeliveryTag, ConsumerTag, {QPid, MsgId}}, + true -> queue:in({DeliveryTag, Tag, {QPid, MsgId}}, UAMQ); false -> UAMQ end, @@ -2457,7 +2456,7 @@ handle_deliver(ConsumerTag, AckRequired, ok = rabbit_writer:send_command(WriterPid, Deliver, Content) end, rabbit_basic:maybe_gc_large_msg(Content), - record_sent(ConsumerTag, AckRequired, Msg, State). + record_sent(deliver, ConsumerTag, AckRequired, Msg, State). handle_basic_get(WriterPid, DeliveryTag, NoAck, MessageCount, Msg = {QName, QPid, _MsgId, Redelivered, @@ -2473,7 +2472,7 @@ handle_basic_get(WriterPid, DeliveryTag, NoAck, MessageCount, message_count = MessageCount}, Content), State1 = track_delivering_queue(NoAck, QPid, QName, State), - {noreply, record_sent(DeliveryTag, not(NoAck), Msg, State1)}. + {noreply, record_sent(get, DeliveryTag, not(NoAck), Msg, State1)}. init_queue_cleanup_timer(State) -> {ok, Interval} = application:get_env(rabbit, channel_queue_cleanup_interval), |
