summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2013-07-23 16:13:04 +0100
committerSimon MacMullen <simon@rabbitmq.com>2013-07-23 16:13:04 +0100
commit56efe5b06c5b1eabd4a0c599a5e296db0d11ec12 (patch)
tree3b283bb0bb516d79e9635244e619251a7ad07905 /src
parent2335f5375b0bf39176dfff6bd992faf9e9b89871 (diff)
downloadrabbitmq-server-git-56efe5b06c5b1eabd4a0c599a5e296db0d11ec12.tar.gz
Wire in support for system messages.
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_writer.erl46
1 files changed, 36 insertions, 10 deletions
diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl
index 2d15e6a2ef..3e966e16eb 100644
--- a/src/rabbit_writer.erl
+++ b/src/rabbit_writer.erl
@@ -19,6 +19,9 @@
-include("rabbit_framing.hrl").
-export([start/5, start_link/5, start/6, start_link/6]).
+
+-export([system_continue/3, system_terminate/4, system_code_change/4]).
+
-export([send_command/2, send_command/3,
send_command_sync/2, send_command_sync/3,
send_command_and_notify/4, send_command_and_notify/5,
@@ -26,7 +29,7 @@
-export([internal_send_command/4, internal_send_command/6]).
%% internal
--export([mainloop/1, mainloop1/1]).
+-export([mainloop/2, mainloop1/2]).
-record(wstate, {sock, channel, frame_max, protocol, reader,
stats_timer, pending}).
@@ -53,6 +56,11 @@
(rabbit_net:socket(), rabbit_channel:channel_number(),
non_neg_integer(), rabbit_types:protocol(), pid(), boolean())
-> rabbit_types:ok(pid())).
+
+-spec(system_code_change/4 :: (_,_,_,_) -> {'ok',_}).
+-spec(system_continue/3 :: (_,_,#wstate{}) -> any()).
+-spec(system_terminate/4 :: (_,_,_,_) -> none()).
+
-spec(send_command/2 ::
(pid(), rabbit_framing:amqp_method_record()) -> 'ok').
-spec(send_command/3 ::
@@ -94,12 +102,14 @@ start_link(Sock, Channel, FrameMax, Protocol, ReaderPid) ->
start(Sock, Channel, FrameMax, Protocol, ReaderPid, ReaderWantsStats) ->
State = initial_state(Sock, Channel, FrameMax, Protocol, ReaderPid,
ReaderWantsStats),
- {ok, proc_lib:spawn(?MODULE, mainloop, [State])}.
+ Deb = sys:debug_options([]),
+ {ok, proc_lib:spawn(?MODULE, mainloop, [Deb, State])}.
start_link(Sock, Channel, FrameMax, Protocol, ReaderPid, ReaderWantsStats) ->
State = initial_state(Sock, Channel, FrameMax, Protocol, ReaderPid,
ReaderWantsStats),
- {ok, proc_lib:spawn_link(?MODULE, mainloop, [State])}.
+ Deb = sys:debug_options([]),
+ {ok, proc_lib:spawn_link(?MODULE, mainloop, [Deb, State])}.
initial_state(Sock, Channel, FrameMax, Protocol, ReaderPid, ReaderWantsStats) ->
(case ReaderWantsStats of
@@ -113,28 +123,44 @@ initial_state(Sock, Channel, FrameMax, Protocol, ReaderPid, ReaderWantsStats) ->
pending = []},
#wstate.stats_timer).
-mainloop(State) ->
+system_continue(Parent, Deb, State) ->
+ ?MODULE:mainloop(Deb, State#wstate{reader = Parent}).
+
+system_terminate(Reason, _Parent, _Deb, _State) ->
+ exit(Reason).
+
+system_code_change(Misc, _Module, _OldVsn, _Extra) ->
+ {ok, Misc}.
+
+mainloop(Deb, State) ->
try
- mainloop1(State)
+ mainloop1(Deb, State)
catch
exit:Error -> #wstate{reader = ReaderPid, channel = Channel} = State,
ReaderPid ! {channel_exit, Channel, Error}
end,
done.
-mainloop1(State = #wstate{pending = []}) ->
+mainloop1(Deb, State = #wstate{pending = []}) ->
receive
- Message -> ?MODULE:mainloop1(handle_message(Message, State))
+ Message -> {Deb1, State1} = handle_message(Deb, Message, State),
+ ?MODULE:mainloop1(Deb1, State1)
after ?HIBERNATE_AFTER ->
erlang:hibernate(?MODULE, mainloop, [State])
end;
-mainloop1(State) ->
+mainloop1(Deb, State) ->
receive
- Message -> ?MODULE:mainloop1(handle_message(Message, State))
+ Message -> {Deb1, State1} = handle_message(Deb, Message, State),
+ ?MODULE:mainloop1(Deb1, State1)
after 0 ->
- ?MODULE:mainloop1(internal_flush(State))
+ ?MODULE:mainloop1(Deb, internal_flush(State))
end.
+handle_message(Deb, {system, From, Req}, State = #wstate{reader = Parent}) ->
+ sys:handle_system_msg(Req, From, Parent, ?MODULE, Deb, State);
+handle_message(Deb, Message, State) ->
+ {Deb, handle_message(Message, State)}.
+
handle_message({send_command, MethodRecord}, State) ->
internal_send_command_async(MethodRecord, State);
handle_message({send_command, MethodRecord, Content}, State) ->