summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2011-04-07 13:01:17 +0100
committerMatthias Radestock <matthias@rabbitmq.com>2011-04-07 13:01:17 +0100
commit92654d7c845f60dd339e03cca7ccef0c35688976 (patch)
tree12f9ab57a99e15965cf6253267047478a4e1ef86
parent482d29e30d243ae235ca14b81fc7579ca02a6f85 (diff)
downloadrabbitmq-server-git-92654d7c845f60dd339e03cca7ccef0c35688976.tar.gz
get heartbeat monitor to pause when it should
On 'default' we kick off another prim_inet:async_recv *before* handling the frame we've just received. This is done for performance reasons - essentially we are reading ahead - and leads to the following sequence of events: 1. receive memory alarm -> change state to 'blocking' 2. receive a 'publish' method frame 3. kick off another prim_inet:async_recv 4. handle frame, detecting that it is a 'publish' frame and thus changing the state to 'blocked' 5. receive the frame header for another frame (e.g. the message header, or could be something on another channel, or a heartbeat) 6. since the state is 'blocked' and we pause the heartbeat monitor and *don't* kick off another prim_inet:async_recv On this branch we don't read ahead since a) that would complicate the logic a fair bit, and b) we could end up draining a fair chunk of data from the socket, rather than just a frame header. As a result we need to make sure the heartbeat monitor gets paused as soon as we transition to the 'blocked' state.
-rw-r--r--src/rabbit_reader.erl4
1 files changed, 2 insertions, 2 deletions
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 792102681a..e210dba182 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -521,8 +521,8 @@ handle_input({frame_payload, Type, Channel, PayloadSize},
PayloadAndMarker, State) ->
case PayloadAndMarker of
<<Payload:PayloadSize/binary, ?FRAME_END>> ->
- handle_frame(Type, Channel, Payload,
- switch_callback(State, frame_header, 7));
+ switch_callback(handle_frame(Type, Channel, Payload, State),
+ frame_header, 7);
_ ->
throw({bad_payload, Type, Channel, PayloadSize, PayloadAndMarker})
end;