summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_reader.erl2
-rw-r--r--src/rabbit_writer.erl64
2 files changed, 36 insertions, 30 deletions
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 2c1eeb9116..829e9e52fe 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -342,6 +342,8 @@ handle_other({'$gen_cast', force_event_refresh}, Deb, State)
handle_other({'$gen_cast', force_event_refresh}, Deb, State) ->
%% Ignore, we will emit a created event once we start running.
mainloop(Deb, State);
+handle_other(ensure_stats, Deb, State) ->
+ mainloop(Deb, ensure_stats_timer(State));
handle_other(emit_stats, Deb, State) ->
mainloop(Deb, emit_stats(State));
handle_other({system, From, Request}, Deb, State = #v1{parent = Parent}) ->
diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl
index f3a8cacf15..c23f46f930 100644
--- a/src/rabbit_writer.erl
+++ b/src/rabbit_writer.erl
@@ -18,13 +18,17 @@
-include("rabbit.hrl").
-include("rabbit_framing.hrl").
--export([start/5, start_link/5, mainloop/2, mainloop1/2]).
+-export([start/5, start_link/5]).
-export([send_command/2, send_command/3,
send_command_sync/2, send_command_sync/3,
send_command_and_notify/4, send_command_and_notify/5]).
-export([internal_send_command/4, internal_send_command/6]).
--record(wstate, {sock, channel, frame_max, protocol, pending}).
+%% internal
+-export([mainloop/1, mainloop1/1]).
+
+-record(wstate, {sock, channel, frame_max, protocol, reader,
+ stats_timer, pending}).
-define(HIBERNATE_AFTER, 5000).
@@ -67,50 +71,47 @@
non_neg_integer(), rabbit_types:protocol())
-> 'ok').
--spec(mainloop/2 :: (_,_) -> 'done').
--spec(mainloop1/2 :: (_,_) -> any()).
-
-endif.
%%---------------------------------------------------------------------------
start(Sock, Channel, FrameMax, Protocol, ReaderPid) ->
- {ok,
- proc_lib:spawn(?MODULE, mainloop, [ReaderPid,
- #wstate{sock = Sock,
- channel = Channel,
- frame_max = FrameMax,
- protocol = Protocol,
- pending = []}])}.
+ State = initial_state(Sock, Channel, FrameMax, Protocol, ReaderPid),
+ {ok, proc_lib:spawn(?MODULE, mainloop, [State])}.
start_link(Sock, Channel, FrameMax, Protocol, ReaderPid) ->
- {ok,
- proc_lib:spawn_link(?MODULE, mainloop, [ReaderPid,
- #wstate{sock = Sock,
- channel = Channel,
- frame_max = FrameMax,
- protocol = Protocol,
- pending = []}])}.
-
-mainloop(ReaderPid, State) ->
+ State = initial_state(Sock, Channel, FrameMax, Protocol, ReaderPid),
+ {ok, proc_lib:spawn_link(?MODULE, mainloop, [State])}.
+
+initial_state(Sock, Channel, FrameMax, Protocol, ReaderPid) ->
+ rabbit_event:init_stats_timer(#wstate{sock = Sock,
+ channel = Channel,
+ frame_max = FrameMax,
+ protocol = Protocol,
+ reader = ReaderPid,
+ pending = []},
+ #wstate.stats_timer).
+
+mainloop(State) ->
try
- mainloop1(ReaderPid, State)
+ mainloop1(State)
catch
- exit:Error -> ReaderPid ! {channel_exit, #wstate.channel, Error}
+ exit:Error -> #wstate{reader = ReaderPid, channel = Channel} = State,
+ ReaderPid ! {channel_exit, Channel, Error}
end,
done.
-mainloop1(ReaderPid, State = #wstate{pending = []}) ->
+mainloop1(State = #wstate{pending = []}) ->
receive
- Message -> ?MODULE:mainloop1(ReaderPid, handle_message(Message, State))
+ Message -> ?MODULE:mainloop1(handle_message(Message, State))
after ?HIBERNATE_AFTER ->
- erlang:hibernate(?MODULE, mainloop, [ReaderPid, State])
+ erlang:hibernate(?MODULE, mainloop, [State])
end;
-mainloop1(ReaderPid, State) ->
+mainloop1(State) ->
receive
- Message -> ?MODULE:mainloop1(ReaderPid, handle_message(Message, State))
+ Message -> ?MODULE:mainloop1(handle_message(Message, State))
after 0 ->
- ?MODULE:mainloop1(ReaderPid, flush(State))
+ ?MODULE:mainloop1(flush(State))
end.
handle_message({send_command, MethodRecord}, State) ->
@@ -139,9 +140,12 @@ handle_message({'DOWN', _MRef, process, QPid, _Reason}, State) ->
rabbit_amqqueue:notify_sent_queue_down(QPid),
State;
handle_message({inet_reply, _, ok}, State) ->
- State;
+ rabbit_event:ensure_stats_timer(State, #wstate.stats_timer, emit_stats);
handle_message({inet_reply, _, Status}, _State) ->
exit({writer, send_failed, Status});
+handle_message(emit_stats, State = #wstate{reader = ReaderPid}) ->
+ ReaderPid ! ensure_stats,
+ rabbit_event:reset_stats_timer(State, #wstate.stats_timer);
handle_message(Message, _State) ->
exit({writer, message_not_understood, Message}).