diff options
| author | Matthias Radestock <matthias@rabbitmq.com> | 2011-04-07 13:01:17 +0100 |
|---|---|---|
| committer | Matthias Radestock <matthias@rabbitmq.com> | 2011-04-07 13:01:17 +0100 |
| commit | 92654d7c845f60dd339e03cca7ccef0c35688976 (patch) | |
| tree | 12f9ab57a99e15965cf6253267047478a4e1ef86 | |
| parent | 482d29e30d243ae235ca14b81fc7579ca02a6f85 (diff) | |
| download | rabbitmq-server-git-92654d7c845f60dd339e03cca7ccef0c35688976.tar.gz | |
get heartbeat monitor to pause when it should
On 'default' we kick off another prim_inet:async_recv *before*
handling the frame we've just received. This is done for performance
reasons - essentially we are reading ahead - and leads to the following
sequence of events:
1. receive memory alarm -> change state to 'blocking'
2. receive a 'publish' method frame
3. kick off another prim_inet:async_recv
4. handle frame, detecting that it is a 'publish' frame and thus
changing the state to 'blocked'
5. receive the frame header for another frame (e.g. the message
header, or could be something on another channel, or a heartbeat)
6. since the state is 'blocked' and we pause the heartbeat
monitor and *don't* kick off another prim_inet:async_recv
On this branch we don't read ahead since a) that would complicate the
logic a fair bit, and b) we could end up draining a fair chunk of data
from the socket, rather than just a frame header. As a result we need
to make sure the heartbeat monitor gets paused as soon as we
transition to the 'blocked' state.
| -rw-r--r-- | src/rabbit_reader.erl | 4 |
1 files changed, 2 insertions, 2 deletions
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 792102681a..e210dba182 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -521,8 +521,8 @@ handle_input({frame_payload, Type, Channel, PayloadSize}, PayloadAndMarker, State) -> case PayloadAndMarker of <<Payload:PayloadSize/binary, ?FRAME_END>> -> - handle_frame(Type, Channel, Payload, - switch_callback(State, frame_header, 7)); + switch_callback(handle_frame(Type, Channel, Payload, State), + frame_header, 7); _ -> throw({bad_payload, Type, Channel, PayloadSize, PayloadAndMarker}) end; |
