diff options
| author | Matthias Radestock <matthias@rabbitmq.com> | 2010-11-17 23:17:24 +0000 |
|---|---|---|
| committer | Matthias Radestock <matthias@rabbitmq.com> | 2010-11-17 23:17:24 +0000 |
| commit | f624fc7704a07b97a67a096740505c69f5c74846 (patch) | |
| tree | 5270d95beb6e370920d5dcee6222857ca99c9fa3 | |
| parent | 9bfc13adb1377bb38156d06e62f32c0688923ed6 (diff) | |
| download | rabbitmq-server-git-f624fc7704a07b97a67a096740505c69f5c74846.tar.gz | |
minor optimisation: ask for next piece of data before processing
This reduces the likelihood of the reader process stalling,
i.e. ending up in 'receive' when there is nothing in the mailbox.
| -rw-r--r-- | src/rabbit_reader.erl | 24 |
1 files changed, 12 insertions, 12 deletions
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index e68fbca87c..d93a23fe58 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -320,10 +320,8 @@ start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb, mainloop(Deb, State = #v1{parent = Parent, sock= Sock, recv_ref = Ref}) -> receive {inet_async, Sock, Ref, {ok, Data}} -> - {State1, Callback1, Length1} = - handle_input(State#v1.callback, Data, - State#v1{recv_ref = none}), - mainloop(Deb, switch_callback(State1, Callback1, Length1)); + mainloop(Deb, handle_input(State#v1.callback, Data, + State#v1{recv_ref = none})); {inet_async, Sock, Ref, {error, closed}} -> if State#v1.connection_state =:= closed -> State; @@ -626,14 +624,16 @@ analyze_frame(_Type, _Body, _Protocol) -> error. handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32>>, State) -> - {ensure_stats_timer(State), {frame_payload, Type, Channel, PayloadSize}, - PayloadSize + 1}; + ensure_stats_timer( + switch_callback(State, {frame_payload, Type, Channel, PayloadSize}, + PayloadSize + 1}; handle_input({frame_payload, Type, Channel, PayloadSize}, PayloadAndMarker, State) -> case PayloadAndMarker of <<Payload:PayloadSize/binary, ?FRAME_END>> -> - {handle_frame(Type, Channel, Payload, State), frame_header, 7}; + handle_frame(Type, Channel, Payload, + switch_callback(State, frame_header, 7)); _ -> throw({bad_payload, Type, Channel, PayloadSize, PayloadAndMarker}) end; @@ -686,11 +686,11 @@ start_connection({ProtocolMajor, ProtocolMinor, _ProtocolRevision}, mechanisms = <<"PLAIN AMQPLAIN">>, locales = <<"en_US">> }, ok = send_on_channel0(Sock, Start, Protocol), - {State#v1{connection = Connection#connection{ - timeout_sec = ?NORMAL_TIMEOUT, - protocol = Protocol}, - connection_state = starting}, - frame_header, 7}. + switch_callback(State#v1{connection = Connection#connection{ + timeout_sec = ?NORMAL_TIMEOUT, + protocol = Protocol}, + connection_state = starting}, + frame_header, 7). refuse_connection(Sock, Exception) -> ok = inet_op(fun () -> rabbit_net:send(Sock, <<"AMQP",0,0,9,1>>) end), |
