summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2014-01-02 00:04:40 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2014-01-02 00:04:40 +0000
commit82edc6c3f97367a3a11103510800ed771800707f (patch)
treef918edc47fee2957ecc01fa7da691f63c2681d45
parent4a6ebfd2fda6dbdbcdcde5e797b98cb688c55a1b (diff)
downloadrabbitmq-server-git-82edc6c3f97367a3a11103510800ed771800707f.tar.gz
further optimise frame reading
Handle complete frames in one go, if possible, rather than header and payload separately. This essentially halves the amount of binary splitting in the framing code. Note that we only do this when the buffer contains just one binary. Tests have shown that attempting to introduce this optimisation when the buffer comprises multiple binaries hurts performance for large messages. That's presumably because we end up constructing larger intermediate binaries.
-rw-r--r--src/rabbit_reader.erl46
1 files changed, 25 insertions, 21 deletions
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 23150040d2..6860254371 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -284,17 +284,15 @@ recvloop(Deb, State = #v1{sock = Sock, recv_len = RecvLen, buf_len = BufLen})
when BufLen < RecvLen ->
ok = rabbit_net:setopts(Sock, [{active, once}]),
mainloop(Deb, State#v1{pending_recv = true});
-recvloop(Deb, State = #v1{recv_len = RecvLen, buf_len = BufLen, buf = [B]}) ->
- {Data, Rest} = split_binary(B, RecvLen),
- recvloop(Deb, handle_input(State#v1.callback, Data,
- State#v1{buf = [Rest],
- buf_len = BufLen - RecvLen}));
+recvloop(Deb, State = #v1{buf = [B]}) ->
+ {Rest, State1} = handle_input(State#v1.callback, B, State),
+ recvloop(Deb, State1#v1{buf = [Rest], buf_len = size(Rest)});
recvloop(Deb, State = #v1{recv_len = RecvLen, buf_len = BufLen, buf = Buf}) ->
{DataLRev, RestLRev} = binlist_split(RecvLen, BufLen, Buf, []),
Data = list_to_binary(lists:reverse(DataLRev)),
- recvloop(Deb, handle_input(State#v1.callback, Data,
- State#v1{buf = lists:reverse(RestLRev),
- buf_len = BufLen - RecvLen})).
+ {<<>>, State1} = handle_input(State#v1.callback, Data, State),
+ recvloop(Deb, State1#v1{buf = lists:reverse(RestLRev),
+ buf_len = BufLen - RecvLen}).
binlist_split(N, N, L, Acc) ->
{L, Acc};
@@ -714,32 +712,38 @@ post_process_frame(_Frame, _ChPid, State) ->
%% a few get it wrong - off-by 1 or 8 (empty frame size) are typical.
-define(FRAME_SIZE_FUDGE, ?EMPTY_FRAME_SIZE).
-handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32>>,
+handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32, _/binary>>,
State = #v1{connection = #connection{frame_max = FrameMax}})
when FrameMax /= 0 andalso
PayloadSize > FrameMax - ?EMPTY_FRAME_SIZE + ?FRAME_SIZE_FUDGE ->
fatal_frame_error(
{frame_too_large, PayloadSize, FrameMax - ?EMPTY_FRAME_SIZE},
Type, Channel, <<>>, State);
-handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32>>, State) ->
- ensure_stats_timer(
- switch_callback(State, {frame_payload, Type, Channel, PayloadSize},
- PayloadSize + 1));
-
+handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32,
+ Payload:PayloadSize/binary, ?FRAME_END,
+ Rest/binary>>,
+ State) ->
+ {Rest, ensure_stats_timer(
+ switch_callback(handle_frame(Type, Channel, Payload, State),
+ frame_header, 7))};
+handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32, Rest/binary>>,
+ State) ->
+ {Rest, ensure_stats_timer(
+ switch_callback(State,
+ {frame_payload, Type, Channel, PayloadSize},
+ PayloadSize + 1))};
handle_input({frame_payload, Type, Channel, PayloadSize}, Data, State) ->
- <<Payload:PayloadSize/binary, EndMarker>> = Data,
+ <<Payload:PayloadSize/binary, EndMarker, Rest/binary>> = Data,
case EndMarker of
?FRAME_END -> State1 = handle_frame(Type, Channel, Payload, State),
- switch_callback(State1, frame_header, 7);
+ {Rest, switch_callback(State1, frame_header, 7)};
_ -> fatal_frame_error({invalid_frame_end_marker, EndMarker},
Type, Channel, Payload, State)
end;
-
-handle_input(handshake, <<"AMQP", A, B, C, D>>, State) ->
- handshake({A, B, C, D}, State);
-handle_input(handshake, Other, #v1{sock = Sock}) ->
+handle_input(handshake, <<"AMQP", A, B, C, D, Rest/binary>>, State) ->
+ {Rest, handshake({A, B, C, D}, State)};
+handle_input(handshake, <<Other:8/binary, _/binary>>, #v1{sock = Sock}) ->
refuse_connection(Sock, {bad_header, Other});
-
handle_input(Callback, Data, _State) ->
throw({bad_input, Callback, Data}).