diff options
| author | Matthias Radestock <matthias@rabbitmq.com> | 2012-11-01 14:45:10 +0000 |
|---|---|---|
| committer | Matthias Radestock <matthias@rabbitmq.com> | 2012-11-01 14:45:10 +0000 |
| commit | bcb4dd60658b3095511cbf2532c5ac102926c672 (patch) | |
| tree | fc2aff5c11fb79a372cee05dec9574080d2e6413 /src | |
| parent | 5d7a72f96fee293a263a1ae02d789aa145c1b044 (diff) | |
| download | rabbitmq-server-git-bcb4dd60658b3095511cbf2532c5ac102926c672.tar.gz | |
ensure connection stats emission for write-only connections
We set up a stats timer in the writers but rather emitting any stats
directly from there we just get them to 'ping' the reader, which will
then emit stats based on its own timer.
The timer in the writer is created when a socket operation has been
confirmed.
a little bit of drive-by refactoring to make implementation easier:
- move state creation into one place
- move reader into state
TODO: suppress all this when in the Erlang client
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_reader.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_writer.erl | 64 |
2 files changed, 36 insertions, 30 deletions
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 2c1eeb9116..829e9e52fe 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -342,6 +342,8 @@ handle_other({'$gen_cast', force_event_refresh}, Deb, State) handle_other({'$gen_cast', force_event_refresh}, Deb, State) -> %% Ignore, we will emit a created event once we start running. mainloop(Deb, State); +handle_other(ensure_stats, Deb, State) -> + mainloop(Deb, ensure_stats_timer(State)); handle_other(emit_stats, Deb, State) -> mainloop(Deb, emit_stats(State)); handle_other({system, From, Request}, Deb, State = #v1{parent = Parent}) -> diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl index f3a8cacf15..c23f46f930 100644 --- a/src/rabbit_writer.erl +++ b/src/rabbit_writer.erl @@ -18,13 +18,17 @@ -include("rabbit.hrl"). -include("rabbit_framing.hrl"). --export([start/5, start_link/5, mainloop/2, mainloop1/2]). +-export([start/5, start_link/5]). -export([send_command/2, send_command/3, send_command_sync/2, send_command_sync/3, send_command_and_notify/4, send_command_and_notify/5]). -export([internal_send_command/4, internal_send_command/6]). --record(wstate, {sock, channel, frame_max, protocol, pending}). +%% internal +-export([mainloop/1, mainloop1/1]). + +-record(wstate, {sock, channel, frame_max, protocol, reader, + stats_timer, pending}). -define(HIBERNATE_AFTER, 5000). @@ -67,50 +71,47 @@ non_neg_integer(), rabbit_types:protocol()) -> 'ok'). --spec(mainloop/2 :: (_,_) -> 'done'). --spec(mainloop1/2 :: (_,_) -> any()). - -endif. %%--------------------------------------------------------------------------- start(Sock, Channel, FrameMax, Protocol, ReaderPid) -> - {ok, - proc_lib:spawn(?MODULE, mainloop, [ReaderPid, - #wstate{sock = Sock, - channel = Channel, - frame_max = FrameMax, - protocol = Protocol, - pending = []}])}. + State = initial_state(Sock, Channel, FrameMax, Protocol, ReaderPid), + {ok, proc_lib:spawn(?MODULE, mainloop, [State])}. start_link(Sock, Channel, FrameMax, Protocol, ReaderPid) -> - {ok, - proc_lib:spawn_link(?MODULE, mainloop, [ReaderPid, - #wstate{sock = Sock, - channel = Channel, - frame_max = FrameMax, - protocol = Protocol, - pending = []}])}. - -mainloop(ReaderPid, State) -> + State = initial_state(Sock, Channel, FrameMax, Protocol, ReaderPid), + {ok, proc_lib:spawn_link(?MODULE, mainloop, [State])}. + +initial_state(Sock, Channel, FrameMax, Protocol, ReaderPid) -> + rabbit_event:init_stats_timer(#wstate{sock = Sock, + channel = Channel, + frame_max = FrameMax, + protocol = Protocol, + reader = ReaderPid, + pending = []}, + #wstate.stats_timer). + +mainloop(State) -> try - mainloop1(ReaderPid, State) + mainloop1(State) catch - exit:Error -> ReaderPid ! {channel_exit, #wstate.channel, Error} + exit:Error -> #wstate{reader = ReaderPid, channel = Channel} = State, + ReaderPid ! {channel_exit, Channel, Error} end, done. -mainloop1(ReaderPid, State = #wstate{pending = []}) -> +mainloop1(State = #wstate{pending = []}) -> receive - Message -> ?MODULE:mainloop1(ReaderPid, handle_message(Message, State)) + Message -> ?MODULE:mainloop1(handle_message(Message, State)) after ?HIBERNATE_AFTER -> - erlang:hibernate(?MODULE, mainloop, [ReaderPid, State]) + erlang:hibernate(?MODULE, mainloop, [State]) end; -mainloop1(ReaderPid, State) -> +mainloop1(State) -> receive - Message -> ?MODULE:mainloop1(ReaderPid, handle_message(Message, State)) + Message -> ?MODULE:mainloop1(handle_message(Message, State)) after 0 -> - ?MODULE:mainloop1(ReaderPid, flush(State)) + ?MODULE:mainloop1(flush(State)) end. handle_message({send_command, MethodRecord}, State) -> @@ -139,9 +140,12 @@ handle_message({'DOWN', _MRef, process, QPid, _Reason}, State) -> rabbit_amqqueue:notify_sent_queue_down(QPid), State; handle_message({inet_reply, _, ok}, State) -> - State; + rabbit_event:ensure_stats_timer(State, #wstate.stats_timer, emit_stats); handle_message({inet_reply, _, Status}, _State) -> exit({writer, send_failed, Status}); +handle_message(emit_stats, State = #wstate{reader = ReaderPid}) -> + ReaderPid ! ensure_stats, + rabbit_event:reset_stats_timer(State, #wstate.stats_timer); handle_message(Message, _State) -> exit({writer, message_not_understood, Message}). |
