diff options
| -rw-r--r-- | src/rabbit_channel.erl | 28 |
1 files changed, 15 insertions, 13 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index d99a1c038a..b05d6c561c 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -461,7 +461,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, unroutable -> ok = basic_return(Message, WriterPid, no_route); not_delivered -> ok = basic_return(Message, WriterPid, no_consumers) end, - State1 = incr_exchange_stats(ExchangeName, State), + State1 = incr_exchange_stats([{ExchangeName, 1}], publish, State), {noreply, case TxnKey of none -> State1; _ -> add_tx_participants(DeliveredQPids, State1) @@ -1161,29 +1161,31 @@ i(prefetch_count, #ch{limiter_pid = LimiterPid}) -> i(Item, _) -> throw({bad_argument, Item}). -incr_exchange_stats(ExchangeName, State = #ch{exchange_statistics = Stats}) -> - Stats1 = dict:update(ExchangeName, fun(Old) -> Old + 1 end, 0, Stats), - State#ch{exchange_statistics = Stats1}. +incr_exchange_stats(Counts, Item, State = #ch{exchange_statistics = Stats}) -> + State#ch{exchange_statistics = incr_stats(Counts, Item, Stats)}. -incr_queue_stats(Counts, Key, State = #ch{queue_statistics = Stats}) -> +incr_queue_stats(Counts, Item, State = #ch{queue_statistics = Stats}) -> + State#ch{queue_statistics = incr_stats(Counts, Item, Stats)}. + +incr_stats(Counts, Item, Stats) -> Stats1 = lists:foldl( - fun ({QPid, Incr}, Stats0) -> - case dict:is_key(QPid, Stats0) of - false -> erlang:monitor(process, QPid); - _ -> ok + fun ({Key, Incr}, Stats0) -> + case is_pid(Key) andalso not dict:is_key(Key, Stats0) of + true -> erlang:monitor(process, Key); + _ -> ok end, - dict:update(QPid, + dict:update(Key, fun(D) -> - Count = case orddict:find(Key, D) of + Count = case orddict:find(Item, D) of error -> 0; {ok, C} -> C end, - orddict:store(Key, Count + Incr, D) + orddict:store(Item, Count + Incr, D) end, [], Stats0) end, Stats, Counts), - State#ch{queue_statistics = Stats1}. + Stats1. maybe_emit_stats(State = #ch{exchange_statistics = ExchangeStatistics, queue_statistics = QueueStatistics, |
