diff options
| author | Matthias Radestock <matthias@rabbitmq.com> | 2010-08-09 14:28:00 +0100 |
|---|---|---|
| committer | Matthias Radestock <matthias@rabbitmq.com> | 2010-08-09 14:28:00 +0100 |
| commit | cd5f6ae651ac95fea95d81173fecc730e139e0e7 (patch) | |
| tree | b7763bb87412c6f89e3f9d3908e0afa7fba41a25 /src | |
| parent | 2f30d659e683f8b9c5a474650478d6d37b2cc3f9 (diff) | |
| parent | a667798ec6299fd89fc3ebc6fac8b354b31f68b1 (diff) | |
| download | rabbitmq-server-git-cd5f6ae651ac95fea95d81173fecc730e139e0e7.tar.gz | |
merge bug23095 into default
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_reader.erl | 61 |
1 files changed, 30 insertions, 31 deletions
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index f947cd9053..fe771bd36b 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -37,7 +37,7 @@ -export([system_continue/3, system_terminate/4, system_code_change/4]). --export([init/1, mainloop/3]). +-export([init/1, mainloop/2]). -export([conserve_memory/2, server_properties/0]). @@ -59,7 +59,7 @@ %--------------------------------------------------------------------------- --record(v1, {sock, connection, callback, recv_length, recv_ref, +-record(v1, {parent, sock, connection, callback, recv_length, recv_ref, connection_state, queue_collector, heartbeater, stats_timer}). -define(STATISTICS_KEYS, [pid, recv_oct, recv_cnt, send_oct, send_cnt, @@ -186,7 +186,7 @@ init(Parent) -> end. system_continue(Parent, Deb, State) -> - ?MODULE:mainloop(Parent, Deb, State). + ?MODULE:mainloop(Deb, State#v1{parent = Parent}). system_terminate(Reason, _Parent, _Deb, _State) -> exit(Reason). @@ -273,22 +273,23 @@ start_connection(Parent, Deb, Sock, SockTransform) -> ProfilingValue = setup_profiling(), {ok, Collector} = rabbit_queue_collector:start_link(), try - mainloop(Parent, Deb, switch_callback( - #v1{sock = ClientSock, - connection = #connection{ - user = none, - timeout_sec = ?HANDSHAKE_TIMEOUT, - frame_max = ?FRAME_MIN_SIZE, - vhost = none, + mainloop(Deb, switch_callback( + #v1{parent = Parent, + sock = ClientSock, + connection = #connection{ + user = none, + timeout_sec = ?HANDSHAKE_TIMEOUT, + frame_max = ?FRAME_MIN_SIZE, + vhost = none, client_properties = none, - protocol = none}, - callback = uninitialized_callback, - recv_length = 0, - recv_ref = none, + protocol = none}, + callback = uninitialized_callback, + recv_length = 0, + recv_ref = none, connection_state = pre_init, - queue_collector = Collector, - heartbeater = none, - stats_timer = + queue_collector = Collector, + heartbeater = none, + stats_timer = rabbit_event:init_stats_timer()}, handshake, 8)) catch @@ -314,15 +315,14 @@ start_connection(Parent, Deb, Sock, SockTransform) -> end, done. -mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) -> +mainloop(Deb, State = #v1{parent = Parent, sock= Sock, recv_ref = Ref}) -> %%?LOGDEBUG("Reader mainloop: ~p bytes available, need ~p~n", [HaveBytes, WaitUntilNBytes]), receive {inet_async, Sock, Ref, {ok, Data}} -> {State1, Callback1, Length1} = handle_input(State#v1.callback, Data, State#v1{recv_ref = none}), - mainloop(Parent, Deb, - switch_callback(State1, Callback1, Length1)); + mainloop(Deb, switch_callback(State1, Callback1, Length1)); {inet_async, Sock, Ref, {error, closed}} -> if State#v1.connection_state =:= closed -> State; @@ -332,7 +332,7 @@ mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) -> {inet_async, Sock, Ref, {error, Reason}} -> throw({inet_error, Reason}); {conserve_memory, Conserve} -> - mainloop(Parent, Deb, internal_conserve_memory(Conserve, State)); + mainloop(Deb, internal_conserve_memory(Conserve, State)); {'EXIT', Parent, Reason} -> terminate(io_lib:format("broker forced connection closure " "with reason '~w'", [Reason]), State), @@ -348,16 +348,16 @@ mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) -> {channel_exit, _Chan, E = {writer, send_failed, _Error}} -> throw(E); {channel_exit, Channel, Reason} -> - mainloop(Parent, Deb, handle_channel_exit(Channel, Reason, State)); + mainloop(Deb, handle_channel_exit(Channel, Reason, State)); {'EXIT', Pid, Reason} -> - mainloop(Parent, Deb, handle_dependent_exit(Pid, Reason, State)); + mainloop(Deb, handle_dependent_exit(Pid, Reason, State)); terminate_connection -> State; handshake_timeout -> if ?IS_RUNNING(State) orelse State#v1.connection_state =:= closing orelse State#v1.connection_state =:= closed -> - mainloop(Parent, Deb, State); + mainloop(Deb, State); true -> throw({handshake_timeout, State#v1.callback}) end; @@ -368,22 +368,21 @@ mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) -> gen_server:reply(From, ok), case ForceTermination of force -> ok; - normal -> mainloop(Parent, Deb, NewState) + normal -> mainloop(Deb, NewState) end; {'$gen_call', From, info} -> gen_server:reply(From, infos(?INFO_KEYS, State)), - mainloop(Parent, Deb, State); + mainloop(Deb, State); {'$gen_call', From, {info, Items}} -> gen_server:reply(From, try {ok, infos(Items, State)} catch Error -> {error, Error} end), - mainloop(Parent, Deb, State); + mainloop(Deb, State); {'$gen_cast', emit_stats} -> internal_emit_stats(State), - mainloop(Parent, Deb, - State#v1{stats_timer = - rabbit_event:reset_stats_timer_after( - State#v1.stats_timer)}); + mainloop(Deb, State#v1{stats_timer = + rabbit_event:reset_stats_timer_after( + State#v1.stats_timer)}); {system, From, Request} -> sys:handle_system_msg(Request, From, Parent, ?MODULE, Deb, State); |
