diff options
| -rw-r--r-- | include/rabbit.hrl | 19 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 1 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 93 | ||||
| -rw-r--r-- | src/rabbit_reader.erl | 45 |
4 files changed, 138 insertions, 20 deletions
diff --git a/include/rabbit.hrl b/include/rabbit.hrl index 1998de35b4..f400495ce3 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -70,10 +70,28 @@ -record(delivery, {mandatory, immediate, txn, sender, message}). -record(amqp_error, {name, explanation, method = none}). + +-record(event_connection_stats, {connection_pid, state, channels, + recv_oct, recv_cnt, + send_oct, send_cnt, send_pend}). + +-record(event_channel_stats, {channel_pid, per_exchange_statistics, + per_queue_statistics}). + -record(event_queue_stats, {qpid, messages_ready, messages_unacknowledged, consumers, memory, exclusive_consumer_pid, exclusive_consumer_tag, backing_queue_status}). +-record(event_connection_created, {connection_pid, address, port, + peer_address, peer_port, user, vhost, + timeout, frame_max, client_properties}). +-record(event_connection_closed, {connection_pid}). +-record(event_channel_created, {channel_pid, connection_pid, channel, user, + vhost}). +-record(event_channel_closed, {channel_pid}). + + + %%---------------------------------------------------------------------------- -define(COPYRIGHT_MESSAGE, "Copyright (C) 2007-2010 LShift Ltd., Cohesive Financial Technologies LLC., and Rabbit Technologies Ltd."). @@ -84,6 +102,7 @@ -define(HIBERNATE_AFTER_MIN, 1000). -define(DESIRED_HIBERNATE, 10000). +-define(STATISTICS_UPDATE_INTERVAL, 5000000). %% microseconds -ifdef(debug). -define(LOGDEBUG0(F), rabbit_log:debug(F)). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index e31f821015..9af77e7843 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -38,7 +38,6 @@ -define(UNSENT_MESSAGE_LIMIT, 100). -define(SYNC_INTERVAL, 5). %% milliseconds -define(RAM_DURATION_UPDATE_INTERVAL, 5000). --define(STATISTICS_UPDATE_INTERVAL, 5000000). %% microseconds -export([start_link/1, info_keys/0]). diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index c4db3ace73..44699af6d5 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -48,7 +48,8 @@ transaction_id, tx_participants, next_tag, uncommitted_ack_q, unacked_message_q, username, virtual_host, most_recently_declared_queue, - consumer_mapping, blocking, queue_collector_pid, flow}). + consumer_mapping, blocking, queue_collector_pid, flow, + exchange_statistics, queue_statistics, last_statistics_update}). -record(flow, {server, client, pending}). @@ -157,6 +158,11 @@ init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid]) -> process_flag(trap_exit, true), link(WriterPid), ok = pg_local:join(rabbit_channels, self()), + rabbit_event:notify(#event_channel_created{channel_pid = self(), + connection_pid = ReaderPid, + channel = Channel, + user = Username, + vhost = VHost}), {ok, #ch{state = starting, channel = Channel, reader_pid = ReaderPid, @@ -174,7 +180,10 @@ init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid]) -> blocking = dict:new(), queue_collector_pid = CollectorPid, flow = #flow{server = true, client = true, - pending = none}}, + pending = none}, + exchange_statistics = dict:new(), + queue_statistics = dict:new(), + last_statistics_update = {0,0,0}}, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. @@ -225,7 +234,13 @@ handle_cast({deliver, ConsumerTag, AckRequired, Msg}, next_tag = DeliveryTag}) -> State1 = lock_message(AckRequired, {DeliveryTag, ConsumerTag, Msg}, State), ok = internal_deliver(WriterPid, true, ConsumerTag, DeliveryTag, Msg), - noreply(State1#ch{next_tag = DeliveryTag + 1}); + {_QName, QPid, _MsgId, _Redelivered, _Msg} = Msg, + State2 = incr_queue_stats([{QPid, 1}], + case AckRequired of + true -> deliver; + false -> deliver_no_ack + end, State1), + noreply(State2#ch{next_tag = DeliveryTag + 1}); handle_cast({conserve_memory, true}, State = #ch{state = starting}) -> noreply(State); @@ -276,9 +291,13 @@ code_change(_OldVsn, State, _Extra) -> %%--------------------------------------------------------------------------- -reply(Reply, NewState) -> {reply, Reply, NewState, hibernate}. +reply(Reply, NewState) -> + NewState1 = maybe_emit_stats(NewState), + {reply, Reply, NewState1, hibernate}. -noreply(NewState) -> {noreply, NewState, hibernate}. +noreply(NewState) -> + NewState1 = maybe_emit_stats(NewState), + {noreply, NewState1, hibernate}. return_ok(State, true, _Msg) -> {noreply, State}; return_ok(State, false, Msg) -> {reply, Msg, State}. @@ -437,9 +456,10 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, unroutable -> ok = basic_return(Message, WriterPid, no_route); not_delivered -> ok = basic_return(Message, WriterPid, no_consumers) end, + State1 = incr_exchange_stats(ExchangeName, State), {noreply, case TxnKey of - none -> State; - _ -> add_tx_participants(DeliveredQPids, State) + none -> State1; + _ -> add_tx_participants(DeliveredQPids, State1) end}; handle_method(#'basic.ack'{delivery_tag = DeliveryTag, @@ -447,16 +467,18 @@ handle_method(#'basic.ack'{delivery_tag = DeliveryTag, _, State = #ch{transaction_id = TxnKey, unacked_message_q = UAMQ}) -> {Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, Multiple), - Participants = ack(TxnKey, Acked), + QsCounts = ack(TxnKey, Acked), + Participants = [QPid || {QPid, _} <- QsCounts], + State1 = incr_queue_stats(QsCounts, ack, State), {noreply, case TxnKey of none -> ok = notify_limiter(State#ch.limiter_pid, Acked), - State#ch{unacked_message_q = Remaining}; + State1#ch{unacked_message_q = Remaining}; _ -> NewUAQ = queue:join(State#ch.uncommitted_ack_q, Acked), add_tx_participants( Participants, - State#ch{unacked_message_q = Remaining, - uncommitted_ack_q = NewUAQ}) + State1#ch{unacked_message_q = Remaining, + uncommitted_ack_q = NewUAQ}) end}; handle_method(#'basic.get'{queue = QueueNameBin, @@ -470,11 +492,16 @@ handle_method(#'basic.get'{queue = QueueNameBin, QueueName, ReaderPid, fun (Q) -> rabbit_amqqueue:basic_get(Q, self(), NoAck) end) of {ok, MessageCount, - Msg = {_QName, _QPid, _MsgId, Redelivered, + Msg = {_QName, QPid, _MsgId, Redelivered, #basic_message{exchange_name = ExchangeName, routing_key = RoutingKey, content = Content}}} -> State1 = lock_message(not(NoAck), {DeliveryTag, none, Msg}, State), + State2 = incr_queue_stats([{QPid, 1}], + case NoAck of + true -> get_no_ack; + false -> get + end, State1), ok = rabbit_writer:send_command( WriterPid, #'basic.get_ok'{delivery_tag = DeliveryTag, @@ -483,7 +510,7 @@ handle_method(#'basic.get'{queue = QueueNameBin, routing_key = RoutingKey, message_count = MessageCount}, Content), - {noreply, State1#ch{next_tag = DeliveryTag + 1}}; + {noreply, State2#ch{next_tag = DeliveryTag + 1}}; empty -> {reply, #'basic.get_empty'{}, State} end; @@ -978,7 +1005,7 @@ ack(TxnKey, UAQ) -> fold_per_queue( fun (QPid, MsgIds, L) -> ok = rabbit_amqqueue:ack(QPid, TxnKey, MsgIds, self()), - [QPid | L] + [{QPid, length(MsgIds)} | L] end, [], UAQ). make_tx_id() -> rabbit_guid:guid(). @@ -1105,6 +1132,7 @@ internal_deliver(WriterPid, Notify, ConsumerTag, DeliveryTag, terminate(#ch{writer_pid = WriterPid, limiter_pid = LimiterPid}) -> pg_local:leave(rabbit_channels, self()), + rabbit_event:notify(#event_channel_closed{channel_pid = self()}), rabbit_writer:shutdown(WriterPid), rabbit_limiter:shutdown(LimiterPid). @@ -1127,3 +1155,40 @@ i(prefetch_count, #ch{limiter_pid = LimiterPid}) -> rabbit_limiter:get_limit(LimiterPid); i(Item, _) -> throw({bad_argument, Item}). + +incr_exchange_stats(ExchangeName, State = #ch{exchange_statistics = Stats}) -> + Stats1 = dict:update(ExchangeName, fun(Old) -> Old + 1 end, 0, Stats), + State#ch{exchange_statistics = Stats1}. + +incr_queue_stats(Counts, Key, State = #ch{queue_statistics = Stats}) -> + Stats1 = lists:foldl( + fun ({QPid, Incr}, Stats0) -> + dict:update(QPid, + fun(D) -> + Count = case orddict:find(Key, D) of + error -> 0; + {ok, C} -> C + end, + orddict:store(Key, Count + Incr, D) + end, + [], + Stats0) + end, Stats, Counts), + State#ch{queue_statistics = Stats1}. + +maybe_emit_stats(State = #ch{exchange_statistics = ExchangeStatistics, + queue_statistics = QueueStatistics, + last_statistics_update = LastUpdate}) -> + Now = os:timestamp(), + case timer:now_diff(Now, LastUpdate) > ?STATISTICS_UPDATE_INTERVAL of + true -> + rabbit_event:notify( + #event_channel_stats{channel_pid = self(), + per_exchange_statistics = + dict:to_list(ExchangeStatistics), + per_queue_statistics = + dict:to_list(QueueStatistics)}), + State#ch{last_statistics_update = Now}; + _ -> + State + end. diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index b5514c822a..2dc6d9338e 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -58,7 +58,7 @@ %--------------------------------------------------------------------------- -record(v1, {sock, connection, callback, recv_ref, connection_state, - queue_collector}). + queue_collector, last_statistics_update}). -define(INFO_KEYS, [pid, address, port, peer_address, peer_port, @@ -253,7 +253,8 @@ start_connection(Parent, Deb, Sock, SockTransform) -> callback = uninitialized_callback, recv_ref = none, connection_state = pre_init, - queue_collector = Collector}, + queue_collector = Collector, + last_statistics_update = {0,0,0}}, handshake, 8)) catch Ex -> (if Ex == connection_closed_abruptly -> @@ -273,12 +274,14 @@ start_connection(Parent, Deb, Sock, SockTransform) -> %% gen_tcp:close(ClientSock), teardown_profiling(ProfilingValue), rabbit_queue_collector:shutdown(Collector), - rabbit_misc:unlink_and_capture_exit(Collector) + rabbit_misc:unlink_and_capture_exit(Collector), + rabbit_event:notify(#event_connection_closed{connection_pid = self()}) end, done. -mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) -> +mainloop(Parent, Deb, State_ = #v1{sock= Sock, recv_ref = Ref}) -> %%?LOGDEBUG("Reader mainloop: ~p bytes available, need ~p~n", [HaveBytes, WaitUntilNBytes]), + State = maybe_emit_stats(State_), receive {inet_async, Sock, Ref, {ok, Data}} -> {State1, Callback1, Length1} = @@ -649,7 +652,8 @@ handle_method0(#'connection.open'{virtual_host = VHostPath, insist = Insist}, State = #v1{connection_state = opening, connection = Connection = #connection{ - user = User}, + user = User, + vhost = VHost}, sock = Sock}) -> ok = rabbit_access_control:check_vhost_access(User, VHostPath), NewConnection = Connection#connection{vhost = VHostPath}, @@ -659,6 +663,19 @@ handle_method0(#'connection.open'{virtual_host = VHostPath, ok = send_on_channel0( Sock, #'connection.open_ok'{known_hosts = KnownHosts}), + rabbit_event:notify( + #event_connection_created{connection_pid = self(), + address = i(address, State), + port = i(port, State), + peer_address = i(peer_address, State), + peer_port = i(peer_port, State), + user = User, + vhost = VHost, + timeout = i(timeout, State), + frame_max = i(frame_max, State), + client_properties = + i(client_properties, State) + }), State#v1{connection_state = running, connection = NewConnection}; true -> @@ -847,3 +864,21 @@ amqp_exception_explanation(Text, Expl) -> if size(CompleteTextBin) > 255 -> <<CompleteTextBin:252/binary, "...">>; true -> CompleteTextBin end. + +maybe_emit_stats(State = #v1{last_statistics_update = LastUpdate}) -> + Now = os:timestamp(), + case timer:now_diff(Now, LastUpdate) > ?STATISTICS_UPDATE_INTERVAL of + true -> + rabbit_event:notify( + #event_connection_stats{connection_pid = self(), + state = i(state, State), + channels = i(channels, State), + recv_oct = i(recv_oct, State), + recv_cnt = i(recv_cnt, State), + send_oct = i(send_oct, State), + send_cnt = i(send_cnt, State), + send_pend = i(send_pend, State)}), + State#v1{last_statistics_update = Now}; + _ -> + State + end. |
