summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2010-07-21 16:36:54 +0100
committerSimon MacMullen <simon@rabbitmq.com>2010-07-21 16:36:54 +0100
commit5327f905660ddf4b45b437b0ddded46695175bb2 (patch)
tree3656e3a16482842d93c9b377545c73ad83ea6481 /src
parent6482e6d649ca43e6ec965f313e5ff8ec2ce169dd (diff)
downloadrabbitmq-server-git-5327f905660ddf4b45b437b0ddded46695175bb2.tar.gz
Store (channel, exchange) stats as well as (channel, exchange, queue) since we can't derive the former from the latter.
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_channel.erl53
1 files changed, 34 insertions, 19 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 7fe29d3e18..8597e477cb 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -485,7 +485,9 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
unroutable -> ok = basic_return(Message, WriterPid, no_route);
not_delivered -> ok = basic_return(Message, WriterPid, no_consumers)
end,
- incr_stats([{{QPid, ExchangeName}, 1} || QPid <- DeliveredQPids], publish),
+ incr_stats([{ExchangeName, 1} |
+ [{{QPid, ExchangeName}, 1} ||
+ QPid <- DeliveredQPids]], publish),
{noreply, case TxnKey of
none -> State;
_ -> add_tx_participants(DeliveredQPids, State)
@@ -1185,39 +1187,52 @@ i(prefetch_count, #ch{limiter_pid = LimiterPid}) ->
i(Item, _) ->
throw({bad_argument, Item}).
-incr_stats(QXIncs, Item) ->
- [incr_stats(QX, Inc, Item) || {QX, Inc} <- QXIncs].
+incr_stats(QXIncs, Measure) ->
+ [incr_stats(QX, Inc, Measure) || {QX, Inc} <- QXIncs].
-incr_stats(QX, Inc, Item) ->
- QPid = case QX of
- {Q, _X} -> Q;
- Q -> Q
- end,
+incr_stats({QPid, _} = QX, Inc, Measure) ->
+ maybe_monitor(QPid),
+ update_measures(queue_exchange_stats, QX, Inc, Measure);
+
+incr_stats(QPid, Inc, Measure) when is_pid(QPid) ->
+ maybe_monitor(QPid),
+ update_measures(queue_stats, QPid, Inc, Measure);
+
+incr_stats(X, Inc, Measure) ->
+ update_measures(exchange_stats, X, Inc, Measure).
+
+maybe_monitor(QPid) ->
case get({monitoring, QPid}) of
undefined -> erlang:monitor(process, QPid),
put({monitoring, QPid}, true);
_ -> ok
- end,
- Measures = case get({queue_exchange_stats, QX}) of
+ end.
+
+update_measures(Type, QX, Inc, Measure) ->
+ Measures = case get({Type, QX}) of
undefined -> [];
D -> D
end,
- Cur = case orddict:find(Item, Measures) of
+ Cur = case orddict:find(Measure, Measures) of
error -> 0;
{ok, C} -> C
end,
- put({queue_exchange_stats, QX}, orddict:store(Item, Cur + Inc, Measures)).
+ put({Type, QX},
+ orddict:store(Measure, Cur + Inc, Measures)).
internal_emit_stats(State) ->
rabbit_event:notify(
channel_stats,
- [{queue_exchange_stats,
- [{QX, Stats} || {{queue_exchange_stats, QX}, Stats} <- get()]} |
- [{Item, i(Item, State)} || Item <- ?STATISTICS_KEYS]]).
+ [{queue_stats,
+ [{QPid, Stats} || {{queue_stats, QPid}, Stats} <- get()]},
+ {exchange_stats,
+ [{X, Stats} || {{exchange_stats, X}, Stats} <- get()]},
+ {queue_exchange_stats,
+ [{QX, Stats} || {{queue_exchange_stats, QX}, Stats} <- get()]}] ++
+ [{Item, i(Item, State)} || Item <- ?STATISTICS_KEYS]).
erase_queue_stats(QPid) ->
erase({monitoring, QPid}),
- erase({queue_exchange_stats, QPid}).
- %% TODO make this work
- %% [erase({queue_exchange_stats, QX}) ||
- %% {{queue_exchange_stats, QX}, _} <- get(), {QPid, _} == QX].
+ erase({queue_stats, QPid}),
+ [erase({queue_exchange_stats, QX}) ||
+ {{queue_exchange_stats, QX = {QPid0, _}}, _} <- get(), QPid == QPid0].