summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2010-08-09 14:28:00 +0100
committerMatthias Radestock <matthias@rabbitmq.com>2010-08-09 14:28:00 +0100
commitcd5f6ae651ac95fea95d81173fecc730e139e0e7 (patch)
treeb7763bb87412c6f89e3f9d3908e0afa7fba41a25 /src
parent2f30d659e683f8b9c5a474650478d6d37b2cc3f9 (diff)
parenta667798ec6299fd89fc3ebc6fac8b354b31f68b1 (diff)
downloadrabbitmq-server-git-cd5f6ae651ac95fea95d81173fecc730e139e0e7.tar.gz
merge bug23095 into default
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_reader.erl61
1 files changed, 30 insertions, 31 deletions
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index f947cd9053..fe771bd36b 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -37,7 +37,7 @@
-export([system_continue/3, system_terminate/4, system_code_change/4]).
--export([init/1, mainloop/3]).
+-export([init/1, mainloop/2]).
-export([conserve_memory/2, server_properties/0]).
@@ -59,7 +59,7 @@
%---------------------------------------------------------------------------
--record(v1, {sock, connection, callback, recv_length, recv_ref,
+-record(v1, {parent, sock, connection, callback, recv_length, recv_ref,
connection_state, queue_collector, heartbeater, stats_timer}).
-define(STATISTICS_KEYS, [pid, recv_oct, recv_cnt, send_oct, send_cnt,
@@ -186,7 +186,7 @@ init(Parent) ->
end.
system_continue(Parent, Deb, State) ->
- ?MODULE:mainloop(Parent, Deb, State).
+ ?MODULE:mainloop(Deb, State#v1{parent = Parent}).
system_terminate(Reason, _Parent, _Deb, _State) ->
exit(Reason).
@@ -273,22 +273,23 @@ start_connection(Parent, Deb, Sock, SockTransform) ->
ProfilingValue = setup_profiling(),
{ok, Collector} = rabbit_queue_collector:start_link(),
try
- mainloop(Parent, Deb, switch_callback(
- #v1{sock = ClientSock,
- connection = #connection{
- user = none,
- timeout_sec = ?HANDSHAKE_TIMEOUT,
- frame_max = ?FRAME_MIN_SIZE,
- vhost = none,
+ mainloop(Deb, switch_callback(
+ #v1{parent = Parent,
+ sock = ClientSock,
+ connection = #connection{
+ user = none,
+ timeout_sec = ?HANDSHAKE_TIMEOUT,
+ frame_max = ?FRAME_MIN_SIZE,
+ vhost = none,
client_properties = none,
- protocol = none},
- callback = uninitialized_callback,
- recv_length = 0,
- recv_ref = none,
+ protocol = none},
+ callback = uninitialized_callback,
+ recv_length = 0,
+ recv_ref = none,
connection_state = pre_init,
- queue_collector = Collector,
- heartbeater = none,
- stats_timer =
+ queue_collector = Collector,
+ heartbeater = none,
+ stats_timer =
rabbit_event:init_stats_timer()},
handshake, 8))
catch
@@ -314,15 +315,14 @@ start_connection(Parent, Deb, Sock, SockTransform) ->
end,
done.
-mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) ->
+mainloop(Deb, State = #v1{parent = Parent, sock= Sock, recv_ref = Ref}) ->
%%?LOGDEBUG("Reader mainloop: ~p bytes available, need ~p~n", [HaveBytes, WaitUntilNBytes]),
receive
{inet_async, Sock, Ref, {ok, Data}} ->
{State1, Callback1, Length1} =
handle_input(State#v1.callback, Data,
State#v1{recv_ref = none}),
- mainloop(Parent, Deb,
- switch_callback(State1, Callback1, Length1));
+ mainloop(Deb, switch_callback(State1, Callback1, Length1));
{inet_async, Sock, Ref, {error, closed}} ->
if State#v1.connection_state =:= closed ->
State;
@@ -332,7 +332,7 @@ mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) ->
{inet_async, Sock, Ref, {error, Reason}} ->
throw({inet_error, Reason});
{conserve_memory, Conserve} ->
- mainloop(Parent, Deb, internal_conserve_memory(Conserve, State));
+ mainloop(Deb, internal_conserve_memory(Conserve, State));
{'EXIT', Parent, Reason} ->
terminate(io_lib:format("broker forced connection closure "
"with reason '~w'", [Reason]), State),
@@ -348,16 +348,16 @@ mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) ->
{channel_exit, _Chan, E = {writer, send_failed, _Error}} ->
throw(E);
{channel_exit, Channel, Reason} ->
- mainloop(Parent, Deb, handle_channel_exit(Channel, Reason, State));
+ mainloop(Deb, handle_channel_exit(Channel, Reason, State));
{'EXIT', Pid, Reason} ->
- mainloop(Parent, Deb, handle_dependent_exit(Pid, Reason, State));
+ mainloop(Deb, handle_dependent_exit(Pid, Reason, State));
terminate_connection ->
State;
handshake_timeout ->
if ?IS_RUNNING(State) orelse
State#v1.connection_state =:= closing orelse
State#v1.connection_state =:= closed ->
- mainloop(Parent, Deb, State);
+ mainloop(Deb, State);
true ->
throw({handshake_timeout, State#v1.callback})
end;
@@ -368,22 +368,21 @@ mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) ->
gen_server:reply(From, ok),
case ForceTermination of
force -> ok;
- normal -> mainloop(Parent, Deb, NewState)
+ normal -> mainloop(Deb, NewState)
end;
{'$gen_call', From, info} ->
gen_server:reply(From, infos(?INFO_KEYS, State)),
- mainloop(Parent, Deb, State);
+ mainloop(Deb, State);
{'$gen_call', From, {info, Items}} ->
gen_server:reply(From, try {ok, infos(Items, State)}
catch Error -> {error, Error}
end),
- mainloop(Parent, Deb, State);
+ mainloop(Deb, State);
{'$gen_cast', emit_stats} ->
internal_emit_stats(State),
- mainloop(Parent, Deb,
- State#v1{stats_timer =
- rabbit_event:reset_stats_timer_after(
- State#v1.stats_timer)});
+ mainloop(Deb, State#v1{stats_timer =
+ rabbit_event:reset_stats_timer_after(
+ State#v1.stats_timer)});
{system, From, Request} ->
sys:handle_system_msg(Request, From,
Parent, ?MODULE, Deb, State);