diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_channel.erl | 53 |
1 files changed, 34 insertions, 19 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 7fe29d3e18..8597e477cb 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -485,7 +485,9 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, unroutable -> ok = basic_return(Message, WriterPid, no_route); not_delivered -> ok = basic_return(Message, WriterPid, no_consumers) end, - incr_stats([{{QPid, ExchangeName}, 1} || QPid <- DeliveredQPids], publish), + incr_stats([{ExchangeName, 1} | + [{{QPid, ExchangeName}, 1} || + QPid <- DeliveredQPids]], publish), {noreply, case TxnKey of none -> State; _ -> add_tx_participants(DeliveredQPids, State) @@ -1185,39 +1187,52 @@ i(prefetch_count, #ch{limiter_pid = LimiterPid}) -> i(Item, _) -> throw({bad_argument, Item}). -incr_stats(QXIncs, Item) -> - [incr_stats(QX, Inc, Item) || {QX, Inc} <- QXIncs]. +incr_stats(QXIncs, Measure) -> + [incr_stats(QX, Inc, Measure) || {QX, Inc} <- QXIncs]. -incr_stats(QX, Inc, Item) -> - QPid = case QX of - {Q, _X} -> Q; - Q -> Q - end, +incr_stats({QPid, _} = QX, Inc, Measure) -> + maybe_monitor(QPid), + update_measures(queue_exchange_stats, QX, Inc, Measure); + +incr_stats(QPid, Inc, Measure) when is_pid(QPid) -> + maybe_monitor(QPid), + update_measures(queue_stats, QPid, Inc, Measure); + +incr_stats(X, Inc, Measure) -> + update_measures(exchange_stats, X, Inc, Measure). + +maybe_monitor(QPid) -> case get({monitoring, QPid}) of undefined -> erlang:monitor(process, QPid), put({monitoring, QPid}, true); _ -> ok - end, - Measures = case get({queue_exchange_stats, QX}) of + end. + +update_measures(Type, QX, Inc, Measure) -> + Measures = case get({Type, QX}) of undefined -> []; D -> D end, - Cur = case orddict:find(Item, Measures) of + Cur = case orddict:find(Measure, Measures) of error -> 0; {ok, C} -> C end, - put({queue_exchange_stats, QX}, orddict:store(Item, Cur + Inc, Measures)). + put({Type, QX}, + orddict:store(Measure, Cur + Inc, Measures)). internal_emit_stats(State) -> rabbit_event:notify( channel_stats, - [{queue_exchange_stats, - [{QX, Stats} || {{queue_exchange_stats, QX}, Stats} <- get()]} | - [{Item, i(Item, State)} || Item <- ?STATISTICS_KEYS]]). + [{queue_stats, + [{QPid, Stats} || {{queue_stats, QPid}, Stats} <- get()]}, + {exchange_stats, + [{X, Stats} || {{exchange_stats, X}, Stats} <- get()]}, + {queue_exchange_stats, + [{QX, Stats} || {{queue_exchange_stats, QX}, Stats} <- get()]}] ++ + [{Item, i(Item, State)} || Item <- ?STATISTICS_KEYS]). erase_queue_stats(QPid) -> erase({monitoring, QPid}), - erase({queue_exchange_stats, QPid}). - %% TODO make this work - %% [erase({queue_exchange_stats, QX}) || - %% {{queue_exchange_stats, QX}, _} <- get(), {QPid, _} == QX]. + erase({queue_stats, QPid}), + [erase({queue_exchange_stats, QX}) || + {{queue_exchange_stats, QX = {QPid0, _}}, _} <- get(), QPid == QPid0]. |
