diff options
| author | Simon MacMullen <simon@rabbitmq.com> | 2013-07-23 16:13:04 +0100 |
|---|---|---|
| committer | Simon MacMullen <simon@rabbitmq.com> | 2013-07-23 16:13:04 +0100 |
| commit | 56efe5b06c5b1eabd4a0c599a5e296db0d11ec12 (patch) | |
| tree | 3b283bb0bb516d79e9635244e619251a7ad07905 /src | |
| parent | 2335f5375b0bf39176dfff6bd992faf9e9b89871 (diff) | |
| download | rabbitmq-server-git-56efe5b06c5b1eabd4a0c599a5e296db0d11ec12.tar.gz | |
Wire in support for system messages.
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_writer.erl | 46 |
1 files changed, 36 insertions, 10 deletions
diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl index 2d15e6a2ef..3e966e16eb 100644 --- a/src/rabbit_writer.erl +++ b/src/rabbit_writer.erl @@ -19,6 +19,9 @@ -include("rabbit_framing.hrl"). -export([start/5, start_link/5, start/6, start_link/6]). + +-export([system_continue/3, system_terminate/4, system_code_change/4]). + -export([send_command/2, send_command/3, send_command_sync/2, send_command_sync/3, send_command_and_notify/4, send_command_and_notify/5, @@ -26,7 +29,7 @@ -export([internal_send_command/4, internal_send_command/6]). %% internal --export([mainloop/1, mainloop1/1]). +-export([mainloop/2, mainloop1/2]). -record(wstate, {sock, channel, frame_max, protocol, reader, stats_timer, pending}). @@ -53,6 +56,11 @@ (rabbit_net:socket(), rabbit_channel:channel_number(), non_neg_integer(), rabbit_types:protocol(), pid(), boolean()) -> rabbit_types:ok(pid())). + +-spec(system_code_change/4 :: (_,_,_,_) -> {'ok',_}). +-spec(system_continue/3 :: (_,_,#wstate{}) -> any()). +-spec(system_terminate/4 :: (_,_,_,_) -> none()). + -spec(send_command/2 :: (pid(), rabbit_framing:amqp_method_record()) -> 'ok'). -spec(send_command/3 :: @@ -94,12 +102,14 @@ start_link(Sock, Channel, FrameMax, Protocol, ReaderPid) -> start(Sock, Channel, FrameMax, Protocol, ReaderPid, ReaderWantsStats) -> State = initial_state(Sock, Channel, FrameMax, Protocol, ReaderPid, ReaderWantsStats), - {ok, proc_lib:spawn(?MODULE, mainloop, [State])}. + Deb = sys:debug_options([]), + {ok, proc_lib:spawn(?MODULE, mainloop, [Deb, State])}. start_link(Sock, Channel, FrameMax, Protocol, ReaderPid, ReaderWantsStats) -> State = initial_state(Sock, Channel, FrameMax, Protocol, ReaderPid, ReaderWantsStats), - {ok, proc_lib:spawn_link(?MODULE, mainloop, [State])}. + Deb = sys:debug_options([]), + {ok, proc_lib:spawn_link(?MODULE, mainloop, [Deb, State])}. initial_state(Sock, Channel, FrameMax, Protocol, ReaderPid, ReaderWantsStats) -> (case ReaderWantsStats of @@ -113,28 +123,44 @@ initial_state(Sock, Channel, FrameMax, Protocol, ReaderPid, ReaderWantsStats) -> pending = []}, #wstate.stats_timer). -mainloop(State) -> +system_continue(Parent, Deb, State) -> + ?MODULE:mainloop(Deb, State#wstate{reader = Parent}). + +system_terminate(Reason, _Parent, _Deb, _State) -> + exit(Reason). + +system_code_change(Misc, _Module, _OldVsn, _Extra) -> + {ok, Misc}. + +mainloop(Deb, State) -> try - mainloop1(State) + mainloop1(Deb, State) catch exit:Error -> #wstate{reader = ReaderPid, channel = Channel} = State, ReaderPid ! {channel_exit, Channel, Error} end, done. -mainloop1(State = #wstate{pending = []}) -> +mainloop1(Deb, State = #wstate{pending = []}) -> receive - Message -> ?MODULE:mainloop1(handle_message(Message, State)) + Message -> {Deb1, State1} = handle_message(Deb, Message, State), + ?MODULE:mainloop1(Deb1, State1) after ?HIBERNATE_AFTER -> erlang:hibernate(?MODULE, mainloop, [State]) end; -mainloop1(State) -> +mainloop1(Deb, State) -> receive - Message -> ?MODULE:mainloop1(handle_message(Message, State)) + Message -> {Deb1, State1} = handle_message(Deb, Message, State), + ?MODULE:mainloop1(Deb1, State1) after 0 -> - ?MODULE:mainloop1(internal_flush(State)) + ?MODULE:mainloop1(Deb, internal_flush(State)) end. +handle_message(Deb, {system, From, Req}, State = #wstate{reader = Parent}) -> + sys:handle_system_msg(Req, From, Parent, ?MODULE, Deb, State); +handle_message(Deb, Message, State) -> + {Deb, handle_message(Message, State)}. + handle_message({send_command, MethodRecord}, State) -> internal_send_command_async(MethodRecord, State); handle_message({send_command, MethodRecord, Content}, State) -> |
