diff options
| author | Matthias Radestock <matthias@rabbitmq.com> | 2010-11-17 23:17:24 +0000 |
|---|---|---|
| committer | Matthias Radestock <matthias@rabbitmq.com> | 2010-11-17 23:17:24 +0000 |
| commit | f624fc7704a07b97a67a096740505c69f5c74846 (patch) | |
| tree | 5270d95beb6e370920d5dcee6222857ca99c9fa3 /src | |
| parent | 9bfc13adb1377bb38156d06e62f32c0688923ed6 (diff) | |
| download | rabbitmq-server-git-f624fc7704a07b97a67a096740505c69f5c74846.tar.gz | |
minor optimisation: ask for next piece of data before processing
This reduces the likelihood of the reader process stalling,
i.e. ending up in 'receive' when there is nothing in the mailbox.
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_reader.erl | 24 |
1 files changed, 12 insertions, 12 deletions
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index e68fbca87c..d93a23fe58 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -320,10 +320,8 @@ start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb, mainloop(Deb, State = #v1{parent = Parent, sock= Sock, recv_ref = Ref}) -> receive {inet_async, Sock, Ref, {ok, Data}} -> - {State1, Callback1, Length1} = - handle_input(State#v1.callback, Data, - State#v1{recv_ref = none}), - mainloop(Deb, switch_callback(State1, Callback1, Length1)); + mainloop(Deb, handle_input(State#v1.callback, Data, + State#v1{recv_ref = none})); {inet_async, Sock, Ref, {error, closed}} -> if State#v1.connection_state =:= closed -> State; @@ -626,14 +624,16 @@ analyze_frame(_Type, _Body, _Protocol) -> error. handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32>>, State) -> - {ensure_stats_timer(State), {frame_payload, Type, Channel, PayloadSize}, - PayloadSize + 1}; + ensure_stats_timer( + switch_callback(State, {frame_payload, Type, Channel, PayloadSize}, + PayloadSize + 1}; handle_input({frame_payload, Type, Channel, PayloadSize}, PayloadAndMarker, State) -> case PayloadAndMarker of <<Payload:PayloadSize/binary, ?FRAME_END>> -> - {handle_frame(Type, Channel, Payload, State), frame_header, 7}; + handle_frame(Type, Channel, Payload, + switch_callback(State, frame_header, 7)); _ -> throw({bad_payload, Type, Channel, PayloadSize, PayloadAndMarker}) end; @@ -686,11 +686,11 @@ start_connection({ProtocolMajor, ProtocolMinor, _ProtocolRevision}, mechanisms = <<"PLAIN AMQPLAIN">>, locales = <<"en_US">> }, ok = send_on_channel0(Sock, Start, Protocol), - {State#v1{connection = Connection#connection{ - timeout_sec = ?NORMAL_TIMEOUT, - protocol = Protocol}, - connection_state = starting}, - frame_header, 7}. + switch_callback(State#v1{connection = Connection#connection{ + timeout_sec = ?NORMAL_TIMEOUT, + protocol = Protocol}, + connection_state = starting}, + frame_header, 7). refuse_connection(Sock, Exception) -> ok = inet_op(fun () -> rabbit_net:send(Sock, <<"AMQP",0,0,9,1>>) end), |
