diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_channel.erl | 72 |
1 files changed, 42 insertions, 30 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index dae0612e03..6e1f5060ee 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -174,10 +174,21 @@ -define(INFO_KEYS, ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid]). --define(INCR_STATS(Incs, Measure, State), +-define(INCR_STATS(Type, Key, Inc, Measure, State), case rabbit_event:stats_level(State, #ch.stats_timer) of - fine -> incr_stats(Incs, Measure); - _ -> ok + fine -> + rabbit_core_metrics:channel_stats(Type, Measure, {self(), Key}, Inc), + %% Keys in the process dictionary are used to clean up the core metrics + put({Type, Key}, none); + _ -> + ok + end). + +-define(INCR_STATS(Type, Key, Inc, Measure), + begin + rabbit_core_metrics:channel_stats(Type, Measure, {self(), Key}, Inc), + %% Keys in the process dictionary are used to clean up the core metrics + put({Type, Key}, none) end). %%---------------------------------------------------------------------------- @@ -1658,7 +1669,7 @@ basic_return(#basic_message{exchange_name = ExchangeName, content = Content}, State = #ch{protocol = Protocol, writer_pid = WriterPid}, Reason) -> - ?INCR_STATS([{exchange_stats, ExchangeName, 1}], return_unroutable, State), + ?INCR_STATS(exchange_stats, ExchangeName, 1, return_unroutable, State), {_Close, ReplyCode, ReplyText} = Protocol:lookup_amqp_exception(Reason), ok = rabbit_writer:send_command( WriterPid, @@ -1695,14 +1706,14 @@ record_sent(ConsumerTag, AckRequired, user = #user{username = Username}, conn_name = ConnName, channel = ChannelNum}) -> - ?INCR_STATS([{queue_stats, QName, 1}], case {ConsumerTag, AckRequired} of - {none, true} -> get; - {none, false} -> get_no_ack; - {_ , true} -> deliver; - {_ , false} -> deliver_no_ack - end, State), + ?INCR_STATS(queue_stats, QName, 1, case {ConsumerTag, AckRequired} of + {none, true} -> get; + {none, false} -> get_no_ack; + {_ , true} -> deliver; + {_ , false} -> deliver_no_ack + end, State), case Redelivered of - true -> ?INCR_STATS([{queue_stats, QName, 1}], redeliver, State); + true -> ?INCR_STATS(queue_stats, QName, 1, redeliver, State); false -> ok end, rabbit_trace:tap_out(Msg, ConnName, ChannelNum, Username, TraceState), @@ -1747,11 +1758,11 @@ ack(Acked, State = #ch{queue_names = QNames}) -> foreach_per_queue( fun (QPid, MsgIds) -> ok = rabbit_amqqueue:ack(QPid, MsgIds, self()), - ?INCR_STATS(case dict:find(QPid, QNames) of - {ok, QName} -> Count = length(MsgIds), - [{queue_stats, QName, Count}]; - error -> [] - end, ack, State) + case dict:find(QPid, QNames) of + {ok, QName} -> Count = length(MsgIds), + ?INCR_STATS(queue_stats, QName, Count, ack, State); + error -> ok + end end, Acked), ok = notify_limiter(State#ch.limiter, Acked). @@ -1816,7 +1827,7 @@ deliver_to_queues({#delivery{message = #basic_message{exchange_name = XName}, confirm = false, mandatory = false}, []}, State) -> %% optimisation - ?INCR_STATS([{exchange_stats, XName, 1}], publish, State), + ?INCR_STATS(exchange_stats, XName, 1, publish, State), State; deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{ exchange_name = XName}, @@ -1853,11 +1864,15 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{ Message, State1), State3 = process_routing_confirm( Confirm, DeliveredQPids, MsgSeqNo, XName, State2), - ?INCR_STATS([{exchange_stats, XName, 1} | - [{queue_exchange_stats, {QName, XName}, 1} || - QPid <- DeliveredQPids, - {ok, QName} <- [dict:find(QPid, QNames1)]]], - publish, State3), + case rabbit_event:stats_level(State3, #ch.stats_timer) of + fine -> + ?INCR_STATS(exchange_stats, XName, 1, publish), + [?INCR_STATS(queue_exchange_stats, {QName, XName}, 1, publish) || + QPid <- DeliveredQPids, + {ok, QName} <- [dict:find(QPid, QNames1)]]; + _ -> + ok + end, State3. process_routing_mandatory(false, _, _MsgSeqNo, _Msg, State) -> @@ -1900,7 +1915,7 @@ send_confirms(State = #ch{tx = none, confirmed = C}) -> ok -> MsgSeqNos = lists:foldl( fun ({MsgSeqNo, XName}, MSNs) -> - ?INCR_STATS([{exchange_stats, XName, 1}], + ?INCR_STATS(exchange_stats, XName, 1, confirm, State), [MsgSeqNo | MSNs] end, [], lists:append(C)), @@ -1997,17 +2012,14 @@ i(Item, _) -> name(#ch{conn_name = ConnName, channel = Channel}) -> list_to_binary(rabbit_misc:format("~s (~p)", [ConnName, Channel])). -incr_stats(Incs, Measure) -> - [begin - rabbit_core_metrics:channel_stats(Type, Measure, {self(), Key}, Inc), - %% Keys in the process dictionary are used to clean up the core metrics - put({Type, Key}, none) - end || {Type, Key, Inc} <- Incs]. - emit_stats(State) -> emit_stats(State, []). emit_stats(State, Extra) -> [{reductions, Red} | Coarse0] = infos(?STATISTICS_KEYS, State), + %% First metric must be `idle_since` (if available), as expected by + %% `rabbit_mgmt_format:format_channel_stats`. This is a performance + %% optimisation that avoids traversing the whole list when only + %% one element has to be formatted. rabbit_core_metrics:channel_stats(self(), Extra ++ Coarse0), rabbit_core_metrics:channel_stats(reductions, self(), Red). |
