summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2012-11-01 14:45:10 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2012-11-01 14:45:10 +0000
commitbcb4dd60658b3095511cbf2532c5ac102926c672 (patch)
treefc2aff5c11fb79a372cee05dec9574080d2e6413 /src
parent5d7a72f96fee293a263a1ae02d789aa145c1b044 (diff)
downloadrabbitmq-server-git-bcb4dd60658b3095511cbf2532c5ac102926c672.tar.gz
ensure connection stats emission for write-only connections
We set up a stats timer in the writers but rather emitting any stats directly from there we just get them to 'ping' the reader, which will then emit stats based on its own timer. The timer in the writer is created when a socket operation has been confirmed. a little bit of drive-by refactoring to make implementation easier: - move state creation into one place - move reader into state TODO: suppress all this when in the Erlang client
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_reader.erl2
-rw-r--r--src/rabbit_writer.erl64
2 files changed, 36 insertions, 30 deletions
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 2c1eeb9116..829e9e52fe 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -342,6 +342,8 @@ handle_other({'$gen_cast', force_event_refresh}, Deb, State)
handle_other({'$gen_cast', force_event_refresh}, Deb, State) ->
%% Ignore, we will emit a created event once we start running.
mainloop(Deb, State);
+handle_other(ensure_stats, Deb, State) ->
+ mainloop(Deb, ensure_stats_timer(State));
handle_other(emit_stats, Deb, State) ->
mainloop(Deb, emit_stats(State));
handle_other({system, From, Request}, Deb, State = #v1{parent = Parent}) ->
diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl
index f3a8cacf15..c23f46f930 100644
--- a/src/rabbit_writer.erl
+++ b/src/rabbit_writer.erl
@@ -18,13 +18,17 @@
-include("rabbit.hrl").
-include("rabbit_framing.hrl").
--export([start/5, start_link/5, mainloop/2, mainloop1/2]).
+-export([start/5, start_link/5]).
-export([send_command/2, send_command/3,
send_command_sync/2, send_command_sync/3,
send_command_and_notify/4, send_command_and_notify/5]).
-export([internal_send_command/4, internal_send_command/6]).
--record(wstate, {sock, channel, frame_max, protocol, pending}).
+%% internal
+-export([mainloop/1, mainloop1/1]).
+
+-record(wstate, {sock, channel, frame_max, protocol, reader,
+ stats_timer, pending}).
-define(HIBERNATE_AFTER, 5000).
@@ -67,50 +71,47 @@
non_neg_integer(), rabbit_types:protocol())
-> 'ok').
--spec(mainloop/2 :: (_,_) -> 'done').
--spec(mainloop1/2 :: (_,_) -> any()).
-
-endif.
%%---------------------------------------------------------------------------
start(Sock, Channel, FrameMax, Protocol, ReaderPid) ->
- {ok,
- proc_lib:spawn(?MODULE, mainloop, [ReaderPid,
- #wstate{sock = Sock,
- channel = Channel,
- frame_max = FrameMax,
- protocol = Protocol,
- pending = []}])}.
+ State = initial_state(Sock, Channel, FrameMax, Protocol, ReaderPid),
+ {ok, proc_lib:spawn(?MODULE, mainloop, [State])}.
start_link(Sock, Channel, FrameMax, Protocol, ReaderPid) ->
- {ok,
- proc_lib:spawn_link(?MODULE, mainloop, [ReaderPid,
- #wstate{sock = Sock,
- channel = Channel,
- frame_max = FrameMax,
- protocol = Protocol,
- pending = []}])}.
-
-mainloop(ReaderPid, State) ->
+ State = initial_state(Sock, Channel, FrameMax, Protocol, ReaderPid),
+ {ok, proc_lib:spawn_link(?MODULE, mainloop, [State])}.
+
+initial_state(Sock, Channel, FrameMax, Protocol, ReaderPid) ->
+ rabbit_event:init_stats_timer(#wstate{sock = Sock,
+ channel = Channel,
+ frame_max = FrameMax,
+ protocol = Protocol,
+ reader = ReaderPid,
+ pending = []},
+ #wstate.stats_timer).
+
+mainloop(State) ->
try
- mainloop1(ReaderPid, State)
+ mainloop1(State)
catch
- exit:Error -> ReaderPid ! {channel_exit, #wstate.channel, Error}
+ exit:Error -> #wstate{reader = ReaderPid, channel = Channel} = State,
+ ReaderPid ! {channel_exit, Channel, Error}
end,
done.
-mainloop1(ReaderPid, State = #wstate{pending = []}) ->
+mainloop1(State = #wstate{pending = []}) ->
receive
- Message -> ?MODULE:mainloop1(ReaderPid, handle_message(Message, State))
+ Message -> ?MODULE:mainloop1(handle_message(Message, State))
after ?HIBERNATE_AFTER ->
- erlang:hibernate(?MODULE, mainloop, [ReaderPid, State])
+ erlang:hibernate(?MODULE, mainloop, [State])
end;
-mainloop1(ReaderPid, State) ->
+mainloop1(State) ->
receive
- Message -> ?MODULE:mainloop1(ReaderPid, handle_message(Message, State))
+ Message -> ?MODULE:mainloop1(handle_message(Message, State))
after 0 ->
- ?MODULE:mainloop1(ReaderPid, flush(State))
+ ?MODULE:mainloop1(flush(State))
end.
handle_message({send_command, MethodRecord}, State) ->
@@ -139,9 +140,12 @@ handle_message({'DOWN', _MRef, process, QPid, _Reason}, State) ->
rabbit_amqqueue:notify_sent_queue_down(QPid),
State;
handle_message({inet_reply, _, ok}, State) ->
- State;
+ rabbit_event:ensure_stats_timer(State, #wstate.stats_timer, emit_stats);
handle_message({inet_reply, _, Status}, _State) ->
exit({writer, send_failed, Status});
+handle_message(emit_stats, State = #wstate{reader = ReaderPid}) ->
+ ReaderPid ! ensure_stats,
+ rabbit_event:reset_stats_timer(State, #wstate.stats_timer);
handle_message(Message, _State) ->
exit({writer, message_not_understood, Message}).