summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--include/rabbit.hrl19
-rw-r--r--src/rabbit_amqqueue_process.erl1
-rw-r--r--src/rabbit_channel.erl93
-rw-r--r--src/rabbit_reader.erl45
4 files changed, 138 insertions, 20 deletions
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index 1998de35b4..f400495ce3 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -70,10 +70,28 @@
-record(delivery, {mandatory, immediate, txn, sender, message}).
-record(amqp_error, {name, explanation, method = none}).
+
+-record(event_connection_stats, {connection_pid, state, channels,
+ recv_oct, recv_cnt,
+ send_oct, send_cnt, send_pend}).
+
+-record(event_channel_stats, {channel_pid, per_exchange_statistics,
+ per_queue_statistics}).
+
-record(event_queue_stats, {qpid, messages_ready, messages_unacknowledged,
consumers, memory, exclusive_consumer_pid,
exclusive_consumer_tag, backing_queue_status}).
+-record(event_connection_created, {connection_pid, address, port,
+ peer_address, peer_port, user, vhost,
+ timeout, frame_max, client_properties}).
+-record(event_connection_closed, {connection_pid}).
+-record(event_channel_created, {channel_pid, connection_pid, channel, user,
+ vhost}).
+-record(event_channel_closed, {channel_pid}).
+
+
+
%%----------------------------------------------------------------------------
-define(COPYRIGHT_MESSAGE, "Copyright (C) 2007-2010 LShift Ltd., Cohesive Financial Technologies LLC., and Rabbit Technologies Ltd.").
@@ -84,6 +102,7 @@
-define(HIBERNATE_AFTER_MIN, 1000).
-define(DESIRED_HIBERNATE, 10000).
+-define(STATISTICS_UPDATE_INTERVAL, 5000000). %% microseconds
-ifdef(debug).
-define(LOGDEBUG0(F), rabbit_log:debug(F)).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index e31f821015..9af77e7843 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -38,7 +38,6 @@
-define(UNSENT_MESSAGE_LIMIT, 100).
-define(SYNC_INTERVAL, 5). %% milliseconds
-define(RAM_DURATION_UPDATE_INTERVAL, 5000).
--define(STATISTICS_UPDATE_INTERVAL, 5000000). %% microseconds
-export([start_link/1, info_keys/0]).
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index c4db3ace73..44699af6d5 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -48,7 +48,8 @@
transaction_id, tx_participants, next_tag,
uncommitted_ack_q, unacked_message_q,
username, virtual_host, most_recently_declared_queue,
- consumer_mapping, blocking, queue_collector_pid, flow}).
+ consumer_mapping, blocking, queue_collector_pid, flow,
+ exchange_statistics, queue_statistics, last_statistics_update}).
-record(flow, {server, client, pending}).
@@ -157,6 +158,11 @@ init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid]) ->
process_flag(trap_exit, true),
link(WriterPid),
ok = pg_local:join(rabbit_channels, self()),
+ rabbit_event:notify(#event_channel_created{channel_pid = self(),
+ connection_pid = ReaderPid,
+ channel = Channel,
+ user = Username,
+ vhost = VHost}),
{ok, #ch{state = starting,
channel = Channel,
reader_pid = ReaderPid,
@@ -174,7 +180,10 @@ init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid]) ->
blocking = dict:new(),
queue_collector_pid = CollectorPid,
flow = #flow{server = true, client = true,
- pending = none}},
+ pending = none},
+ exchange_statistics = dict:new(),
+ queue_statistics = dict:new(),
+ last_statistics_update = {0,0,0}},
hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
@@ -225,7 +234,13 @@ handle_cast({deliver, ConsumerTag, AckRequired, Msg},
next_tag = DeliveryTag}) ->
State1 = lock_message(AckRequired, {DeliveryTag, ConsumerTag, Msg}, State),
ok = internal_deliver(WriterPid, true, ConsumerTag, DeliveryTag, Msg),
- noreply(State1#ch{next_tag = DeliveryTag + 1});
+ {_QName, QPid, _MsgId, _Redelivered, _Msg} = Msg,
+ State2 = incr_queue_stats([{QPid, 1}],
+ case AckRequired of
+ true -> deliver;
+ false -> deliver_no_ack
+ end, State1),
+ noreply(State2#ch{next_tag = DeliveryTag + 1});
handle_cast({conserve_memory, true}, State = #ch{state = starting}) ->
noreply(State);
@@ -276,9 +291,13 @@ code_change(_OldVsn, State, _Extra) ->
%%---------------------------------------------------------------------------
-reply(Reply, NewState) -> {reply, Reply, NewState, hibernate}.
+reply(Reply, NewState) ->
+ NewState1 = maybe_emit_stats(NewState),
+ {reply, Reply, NewState1, hibernate}.
-noreply(NewState) -> {noreply, NewState, hibernate}.
+noreply(NewState) ->
+ NewState1 = maybe_emit_stats(NewState),
+ {noreply, NewState1, hibernate}.
return_ok(State, true, _Msg) -> {noreply, State};
return_ok(State, false, Msg) -> {reply, Msg, State}.
@@ -437,9 +456,10 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
unroutable -> ok = basic_return(Message, WriterPid, no_route);
not_delivered -> ok = basic_return(Message, WriterPid, no_consumers)
end,
+ State1 = incr_exchange_stats(ExchangeName, State),
{noreply, case TxnKey of
- none -> State;
- _ -> add_tx_participants(DeliveredQPids, State)
+ none -> State1;
+ _ -> add_tx_participants(DeliveredQPids, State1)
end};
handle_method(#'basic.ack'{delivery_tag = DeliveryTag,
@@ -447,16 +467,18 @@ handle_method(#'basic.ack'{delivery_tag = DeliveryTag,
_, State = #ch{transaction_id = TxnKey,
unacked_message_q = UAMQ}) ->
{Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, Multiple),
- Participants = ack(TxnKey, Acked),
+ QsCounts = ack(TxnKey, Acked),
+ Participants = [QPid || {QPid, _} <- QsCounts],
+ State1 = incr_queue_stats(QsCounts, ack, State),
{noreply, case TxnKey of
none -> ok = notify_limiter(State#ch.limiter_pid, Acked),
- State#ch{unacked_message_q = Remaining};
+ State1#ch{unacked_message_q = Remaining};
_ -> NewUAQ = queue:join(State#ch.uncommitted_ack_q,
Acked),
add_tx_participants(
Participants,
- State#ch{unacked_message_q = Remaining,
- uncommitted_ack_q = NewUAQ})
+ State1#ch{unacked_message_q = Remaining,
+ uncommitted_ack_q = NewUAQ})
end};
handle_method(#'basic.get'{queue = QueueNameBin,
@@ -470,11 +492,16 @@ handle_method(#'basic.get'{queue = QueueNameBin,
QueueName, ReaderPid,
fun (Q) -> rabbit_amqqueue:basic_get(Q, self(), NoAck) end) of
{ok, MessageCount,
- Msg = {_QName, _QPid, _MsgId, Redelivered,
+ Msg = {_QName, QPid, _MsgId, Redelivered,
#basic_message{exchange_name = ExchangeName,
routing_key = RoutingKey,
content = Content}}} ->
State1 = lock_message(not(NoAck), {DeliveryTag, none, Msg}, State),
+ State2 = incr_queue_stats([{QPid, 1}],
+ case NoAck of
+ true -> get_no_ack;
+ false -> get
+ end, State1),
ok = rabbit_writer:send_command(
WriterPid,
#'basic.get_ok'{delivery_tag = DeliveryTag,
@@ -483,7 +510,7 @@ handle_method(#'basic.get'{queue = QueueNameBin,
routing_key = RoutingKey,
message_count = MessageCount},
Content),
- {noreply, State1#ch{next_tag = DeliveryTag + 1}};
+ {noreply, State2#ch{next_tag = DeliveryTag + 1}};
empty ->
{reply, #'basic.get_empty'{}, State}
end;
@@ -978,7 +1005,7 @@ ack(TxnKey, UAQ) ->
fold_per_queue(
fun (QPid, MsgIds, L) ->
ok = rabbit_amqqueue:ack(QPid, TxnKey, MsgIds, self()),
- [QPid | L]
+ [{QPid, length(MsgIds)} | L]
end, [], UAQ).
make_tx_id() -> rabbit_guid:guid().
@@ -1105,6 +1132,7 @@ internal_deliver(WriterPid, Notify, ConsumerTag, DeliveryTag,
terminate(#ch{writer_pid = WriterPid, limiter_pid = LimiterPid}) ->
pg_local:leave(rabbit_channels, self()),
+ rabbit_event:notify(#event_channel_closed{channel_pid = self()}),
rabbit_writer:shutdown(WriterPid),
rabbit_limiter:shutdown(LimiterPid).
@@ -1127,3 +1155,40 @@ i(prefetch_count, #ch{limiter_pid = LimiterPid}) ->
rabbit_limiter:get_limit(LimiterPid);
i(Item, _) ->
throw({bad_argument, Item}).
+
+incr_exchange_stats(ExchangeName, State = #ch{exchange_statistics = Stats}) ->
+ Stats1 = dict:update(ExchangeName, fun(Old) -> Old + 1 end, 0, Stats),
+ State#ch{exchange_statistics = Stats1}.
+
+incr_queue_stats(Counts, Key, State = #ch{queue_statistics = Stats}) ->
+ Stats1 = lists:foldl(
+ fun ({QPid, Incr}, Stats0) ->
+ dict:update(QPid,
+ fun(D) ->
+ Count = case orddict:find(Key, D) of
+ error -> 0;
+ {ok, C} -> C
+ end,
+ orddict:store(Key, Count + Incr, D)
+ end,
+ [],
+ Stats0)
+ end, Stats, Counts),
+ State#ch{queue_statistics = Stats1}.
+
+maybe_emit_stats(State = #ch{exchange_statistics = ExchangeStatistics,
+ queue_statistics = QueueStatistics,
+ last_statistics_update = LastUpdate}) ->
+ Now = os:timestamp(),
+ case timer:now_diff(Now, LastUpdate) > ?STATISTICS_UPDATE_INTERVAL of
+ true ->
+ rabbit_event:notify(
+ #event_channel_stats{channel_pid = self(),
+ per_exchange_statistics =
+ dict:to_list(ExchangeStatistics),
+ per_queue_statistics =
+ dict:to_list(QueueStatistics)}),
+ State#ch{last_statistics_update = Now};
+ _ ->
+ State
+ end.
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index b5514c822a..2dc6d9338e 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -58,7 +58,7 @@
%---------------------------------------------------------------------------
-record(v1, {sock, connection, callback, recv_ref, connection_state,
- queue_collector}).
+ queue_collector, last_statistics_update}).
-define(INFO_KEYS,
[pid, address, port, peer_address, peer_port,
@@ -253,7 +253,8 @@ start_connection(Parent, Deb, Sock, SockTransform) ->
callback = uninitialized_callback,
recv_ref = none,
connection_state = pre_init,
- queue_collector = Collector},
+ queue_collector = Collector,
+ last_statistics_update = {0,0,0}},
handshake, 8))
catch
Ex -> (if Ex == connection_closed_abruptly ->
@@ -273,12 +274,14 @@ start_connection(Parent, Deb, Sock, SockTransform) ->
%% gen_tcp:close(ClientSock),
teardown_profiling(ProfilingValue),
rabbit_queue_collector:shutdown(Collector),
- rabbit_misc:unlink_and_capture_exit(Collector)
+ rabbit_misc:unlink_and_capture_exit(Collector),
+ rabbit_event:notify(#event_connection_closed{connection_pid = self()})
end,
done.
-mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) ->
+mainloop(Parent, Deb, State_ = #v1{sock= Sock, recv_ref = Ref}) ->
%%?LOGDEBUG("Reader mainloop: ~p bytes available, need ~p~n", [HaveBytes, WaitUntilNBytes]),
+ State = maybe_emit_stats(State_),
receive
{inet_async, Sock, Ref, {ok, Data}} ->
{State1, Callback1, Length1} =
@@ -649,7 +652,8 @@ handle_method0(#'connection.open'{virtual_host = VHostPath,
insist = Insist},
State = #v1{connection_state = opening,
connection = Connection = #connection{
- user = User},
+ user = User,
+ vhost = VHost},
sock = Sock}) ->
ok = rabbit_access_control:check_vhost_access(User, VHostPath),
NewConnection = Connection#connection{vhost = VHostPath},
@@ -659,6 +663,19 @@ handle_method0(#'connection.open'{virtual_host = VHostPath,
ok = send_on_channel0(
Sock,
#'connection.open_ok'{known_hosts = KnownHosts}),
+ rabbit_event:notify(
+ #event_connection_created{connection_pid = self(),
+ address = i(address, State),
+ port = i(port, State),
+ peer_address = i(peer_address, State),
+ peer_port = i(peer_port, State),
+ user = User,
+ vhost = VHost,
+ timeout = i(timeout, State),
+ frame_max = i(frame_max, State),
+ client_properties =
+ i(client_properties, State)
+ }),
State#v1{connection_state = running,
connection = NewConnection};
true ->
@@ -847,3 +864,21 @@ amqp_exception_explanation(Text, Expl) ->
if size(CompleteTextBin) > 255 -> <<CompleteTextBin:252/binary, "...">>;
true -> CompleteTextBin
end.
+
+maybe_emit_stats(State = #v1{last_statistics_update = LastUpdate}) ->
+ Now = os:timestamp(),
+ case timer:now_diff(Now, LastUpdate) > ?STATISTICS_UPDATE_INTERVAL of
+ true ->
+ rabbit_event:notify(
+ #event_connection_stats{connection_pid = self(),
+ state = i(state, State),
+ channels = i(channels, State),
+ recv_oct = i(recv_oct, State),
+ recv_cnt = i(recv_cnt, State),
+ send_oct = i(send_oct, State),
+ send_cnt = i(send_cnt, State),
+ send_pend = i(send_pend, State)}),
+ State#v1{last_statistics_update = Now};
+ _ ->
+ State
+ end.