summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_channel.erl72
1 files changed, 42 insertions, 30 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index dae0612e03..6e1f5060ee 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -174,10 +174,21 @@
-define(INFO_KEYS, ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid]).
--define(INCR_STATS(Incs, Measure, State),
+-define(INCR_STATS(Type, Key, Inc, Measure, State),
case rabbit_event:stats_level(State, #ch.stats_timer) of
- fine -> incr_stats(Incs, Measure);
- _ -> ok
+ fine ->
+ rabbit_core_metrics:channel_stats(Type, Measure, {self(), Key}, Inc),
+ %% Keys in the process dictionary are used to clean up the core metrics
+ put({Type, Key}, none);
+ _ ->
+ ok
+ end).
+
+-define(INCR_STATS(Type, Key, Inc, Measure),
+ begin
+ rabbit_core_metrics:channel_stats(Type, Measure, {self(), Key}, Inc),
+ %% Keys in the process dictionary are used to clean up the core metrics
+ put({Type, Key}, none)
end).
%%----------------------------------------------------------------------------
@@ -1658,7 +1669,7 @@ basic_return(#basic_message{exchange_name = ExchangeName,
content = Content},
State = #ch{protocol = Protocol, writer_pid = WriterPid},
Reason) ->
- ?INCR_STATS([{exchange_stats, ExchangeName, 1}], return_unroutable, State),
+ ?INCR_STATS(exchange_stats, ExchangeName, 1, return_unroutable, State),
{_Close, ReplyCode, ReplyText} = Protocol:lookup_amqp_exception(Reason),
ok = rabbit_writer:send_command(
WriterPid,
@@ -1695,14 +1706,14 @@ record_sent(ConsumerTag, AckRequired,
user = #user{username = Username},
conn_name = ConnName,
channel = ChannelNum}) ->
- ?INCR_STATS([{queue_stats, QName, 1}], case {ConsumerTag, AckRequired} of
- {none, true} -> get;
- {none, false} -> get_no_ack;
- {_ , true} -> deliver;
- {_ , false} -> deliver_no_ack
- end, State),
+ ?INCR_STATS(queue_stats, QName, 1, case {ConsumerTag, AckRequired} of
+ {none, true} -> get;
+ {none, false} -> get_no_ack;
+ {_ , true} -> deliver;
+ {_ , false} -> deliver_no_ack
+ end, State),
case Redelivered of
- true -> ?INCR_STATS([{queue_stats, QName, 1}], redeliver, State);
+ true -> ?INCR_STATS(queue_stats, QName, 1, redeliver, State);
false -> ok
end,
rabbit_trace:tap_out(Msg, ConnName, ChannelNum, Username, TraceState),
@@ -1747,11 +1758,11 @@ ack(Acked, State = #ch{queue_names = QNames}) ->
foreach_per_queue(
fun (QPid, MsgIds) ->
ok = rabbit_amqqueue:ack(QPid, MsgIds, self()),
- ?INCR_STATS(case dict:find(QPid, QNames) of
- {ok, QName} -> Count = length(MsgIds),
- [{queue_stats, QName, Count}];
- error -> []
- end, ack, State)
+ case dict:find(QPid, QNames) of
+ {ok, QName} -> Count = length(MsgIds),
+ ?INCR_STATS(queue_stats, QName, Count, ack, State);
+ error -> ok
+ end
end, Acked),
ok = notify_limiter(State#ch.limiter, Acked).
@@ -1816,7 +1827,7 @@ deliver_to_queues({#delivery{message = #basic_message{exchange_name = XName},
confirm = false,
mandatory = false},
[]}, State) -> %% optimisation
- ?INCR_STATS([{exchange_stats, XName, 1}], publish, State),
+ ?INCR_STATS(exchange_stats, XName, 1, publish, State),
State;
deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{
exchange_name = XName},
@@ -1853,11 +1864,15 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{
Message, State1),
State3 = process_routing_confirm( Confirm, DeliveredQPids, MsgSeqNo,
XName, State2),
- ?INCR_STATS([{exchange_stats, XName, 1} |
- [{queue_exchange_stats, {QName, XName}, 1} ||
- QPid <- DeliveredQPids,
- {ok, QName} <- [dict:find(QPid, QNames1)]]],
- publish, State3),
+ case rabbit_event:stats_level(State3, #ch.stats_timer) of
+ fine ->
+ ?INCR_STATS(exchange_stats, XName, 1, publish),
+ [?INCR_STATS(queue_exchange_stats, {QName, XName}, 1, publish) ||
+ QPid <- DeliveredQPids,
+ {ok, QName} <- [dict:find(QPid, QNames1)]];
+ _ ->
+ ok
+ end,
State3.
process_routing_mandatory(false, _, _MsgSeqNo, _Msg, State) ->
@@ -1900,7 +1915,7 @@ send_confirms(State = #ch{tx = none, confirmed = C}) ->
ok -> MsgSeqNos =
lists:foldl(
fun ({MsgSeqNo, XName}, MSNs) ->
- ?INCR_STATS([{exchange_stats, XName, 1}],
+ ?INCR_STATS(exchange_stats, XName, 1,
confirm, State),
[MsgSeqNo | MSNs]
end, [], lists:append(C)),
@@ -1997,17 +2012,14 @@ i(Item, _) ->
name(#ch{conn_name = ConnName, channel = Channel}) ->
list_to_binary(rabbit_misc:format("~s (~p)", [ConnName, Channel])).
-incr_stats(Incs, Measure) ->
- [begin
- rabbit_core_metrics:channel_stats(Type, Measure, {self(), Key}, Inc),
- %% Keys in the process dictionary are used to clean up the core metrics
- put({Type, Key}, none)
- end || {Type, Key, Inc} <- Incs].
-
emit_stats(State) -> emit_stats(State, []).
emit_stats(State, Extra) ->
[{reductions, Red} | Coarse0] = infos(?STATISTICS_KEYS, State),
+ %% First metric must be `idle_since` (if available), as expected by
+ %% `rabbit_mgmt_format:format_channel_stats`. This is a performance
+ %% optimisation that avoids traversing the whole list when only
+ %% one element has to be formatted.
rabbit_core_metrics:channel_stats(self(), Extra ++ Coarse0),
rabbit_core_metrics:channel_stats(reductions, self(), Red).