diff options
| author | Simon MacMullen <simon@rabbitmq.com> | 2014-01-07 11:29:33 +0000 |
|---|---|---|
| committer | Simon MacMullen <simon@rabbitmq.com> | 2014-01-07 11:29:33 +0000 |
| commit | cd26dc92cdeb002b71f6c9e0d43aed449004be9a (patch) | |
| tree | 497ccef8d11a38b8dcc7694bf830697c05723b47 | |
| parent | 801e917644cd577db8b5ab5e612a5483e223795e (diff) | |
| parent | 6d47b7016c44b9c235ab110af19f5506d3c8fc10 (diff) | |
| download | rabbitmq-server-git-cd26dc92cdeb002b71f6c9e0d43aed449004be9a.tar.gz | |
Merge in default
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_binary_generator.erl | 63 | ||||
| -rw-r--r-- | src/rabbit_binary_parser.erl | 28 | ||||
| -rw-r--r-- | src/rabbit_reader.erl | 144 | ||||
| -rw-r--r-- | src/rabbit_writer.erl | 2 |
5 files changed, 127 insertions, 112 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 5c5b79fe61..281aecb93c 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -599,6 +599,8 @@ handle_ch_down(DownPid, State = #q{consumers = Consumers, end, State2 = State1#q{consumers = Consumers1, exclusive_consumer = Holder1}, + [notify_decorators(basic_cancel, [{consumer_tag, CTag}], State2) || + CTag <- ChCTags], case should_auto_delete(State2) of true -> {stop, State2}; false -> {ok, requeue_and_run(ChAckTags, diff --git a/src/rabbit_binary_generator.erl b/src/rabbit_binary_generator.erl index ae5bbf51d0..8eaac10d4b 100644 --- a/src/rabbit_binary_generator.erl +++ b/src/rabbit_binary_generator.erl @@ -119,52 +119,51 @@ create_frame(TypeInt, ChannelInt, Payload) -> table_field_to_binary({FName, T, V}) -> [short_string_to_binary(FName) | field_value_to_binary(T, V)]. -field_value_to_binary(longstr, V) -> ["S", long_string_to_binary(V)]; -field_value_to_binary(signedint, V) -> ["I", <<V:32/signed>>]; +field_value_to_binary(longstr, V) -> [$S | long_string_to_binary(V)]; +field_value_to_binary(signedint, V) -> [$I | <<V:32/signed>>]; field_value_to_binary(decimal, V) -> {Before, After} = V, - ["D", Before, <<After:32>>]; -field_value_to_binary(timestamp, V) -> ["T", <<V:64>>]; -field_value_to_binary(table, V) -> ["F", table_to_binary(V)]; -field_value_to_binary(array, V) -> ["A", array_to_binary(V)]; -field_value_to_binary(byte, V) -> ["b", <<V:8/unsigned>>]; -field_value_to_binary(double, V) -> ["d", <<V:64/float>>]; -field_value_to_binary(float, V) -> ["f", <<V:32/float>>]; -field_value_to_binary(long, V) -> ["l", <<V:64/signed>>]; -field_value_to_binary(short, V) -> ["s", <<V:16/signed>>]; -field_value_to_binary(bool, V) -> ["t", if V -> 1; true -> 0 end]; -field_value_to_binary(binary, V) -> ["x", long_string_to_binary(V)]; -field_value_to_binary(void, _V) -> ["V"]. + [$D | [Before, <<After:32>>]]; +field_value_to_binary(timestamp, V) -> [$T | <<V:64>>]; +field_value_to_binary(table, V) -> [$F | table_to_binary(V)]; +field_value_to_binary(array, V) -> [$A | array_to_binary(V)]; +field_value_to_binary(byte, V) -> [$b | <<V:8/unsigned>>]; +field_value_to_binary(double, V) -> [$d | <<V:64/float>>]; +field_value_to_binary(float, V) -> [$f | <<V:32/float>>]; +field_value_to_binary(long, V) -> [$l | <<V:64/signed>>]; +field_value_to_binary(short, V) -> [$s | <<V:16/signed>>]; +field_value_to_binary(bool, V) -> [$t | [if V -> 1; true -> 0 end]]; +field_value_to_binary(binary, V) -> [$x | long_string_to_binary(V)]; +field_value_to_binary(void, _V) -> [$V]. table_to_binary(Table) when is_list(Table) -> - BinTable = generate_table(Table), - [<<(size(BinTable)):32>>, BinTable]. + BinTable = generate_table_iolist(Table), + [<<(iolist_size(BinTable)):32>> | BinTable]. array_to_binary(Array) when is_list(Array) -> - BinArray = generate_array(Array), - [<<(size(BinArray)):32>>, BinArray]. + BinArray = generate_array_iolist(Array), + [<<(iolist_size(BinArray)):32>> | BinArray]. generate_table(Table) when is_list(Table) -> - list_to_binary(lists:map(fun table_field_to_binary/1, Table)). + list_to_binary(generate_table_iolist(Table)). -generate_array(Array) when is_list(Array) -> - list_to_binary(lists:map(fun ({T, V}) -> field_value_to_binary(T, V) end, - Array)). +generate_table_iolist(Table) -> + lists:map(fun table_field_to_binary/1, Table). + +generate_array_iolist(Array) -> + lists:map(fun ({T, V}) -> field_value_to_binary(T, V) end, Array). -short_string_to_binary(String) when is_binary(String) -> - Len = size(String), - if Len < 256 -> [<<Len:8>>, String]; - true -> exit(content_properties_shortstr_overflow) - end; short_string_to_binary(String) -> - Len = length(String), - if Len < 256 -> [<<Len:8>>, String]; + Len = string_length(String), + if Len < 256 -> [<<Len:8>> | String]; true -> exit(content_properties_shortstr_overflow) end. -long_string_to_binary(String) when is_binary(String) -> - [<<(size(String)):32>>, String]; long_string_to_binary(String) -> - [<<(length(String)):32>>, String]. + Len = string_length(String), + [<<Len:32>> | String]. + +string_length(String) when is_binary(String) -> size(String); +string_length(String) -> length(String). check_empty_frame_size() -> %% Intended to ensure that EMPTY_FRAME_SIZE is defined correctly. diff --git a/src/rabbit_binary_parser.erl b/src/rabbit_binary_parser.erl index 088ad0e52e..f65d8ea7fd 100644 --- a/src/rabbit_binary_parser.erl +++ b/src/rabbit_binary_parser.erl @@ -53,35 +53,35 @@ parse_array(<<ValueAndRest/binary>>) -> {Type, Value, Rest} = parse_field_value(ValueAndRest), [{Type, Value} | parse_array(Rest)]. -parse_field_value(<<"S", VLen:32/unsigned, V:VLen/binary, R/binary>>) -> +parse_field_value(<<$S, VLen:32/unsigned, V:VLen/binary, R/binary>>) -> {longstr, V, R}; -parse_field_value(<<"I", V:32/signed, R/binary>>) -> +parse_field_value(<<$I, V:32/signed, R/binary>>) -> {signedint, V, R}; -parse_field_value(<<"D", Before:8/unsigned, After:32/unsigned, R/binary>>) -> +parse_field_value(<<$D, Before:8/unsigned, After:32/unsigned, R/binary>>) -> {decimal, {Before, After}, R}; -parse_field_value(<<"T", V:64/unsigned, R/binary>>) -> +parse_field_value(<<$T, V:64/unsigned, R/binary>>) -> {timestamp, V, R}; -parse_field_value(<<"F", VLen:32/unsigned, Table:VLen/binary, R/binary>>) -> +parse_field_value(<<$F, VLen:32/unsigned, Table:VLen/binary, R/binary>>) -> {table, parse_table(Table), R}; -parse_field_value(<<"A", VLen:32/unsigned, Array:VLen/binary, R/binary>>) -> +parse_field_value(<<$A, VLen:32/unsigned, Array:VLen/binary, R/binary>>) -> {array, parse_array(Array), R}; -parse_field_value(<<"b", V:8/unsigned, R/binary>>) -> {byte, V, R}; -parse_field_value(<<"d", V:64/float, R/binary>>) -> {double, V, R}; -parse_field_value(<<"f", V:32/float, R/binary>>) -> {float, V, R}; -parse_field_value(<<"l", V:64/signed, R/binary>>) -> {long, V, R}; -parse_field_value(<<"s", V:16/signed, R/binary>>) -> {short, V, R}; -parse_field_value(<<"t", V:8/unsigned, R/binary>>) -> {bool, (V /= 0), R}; +parse_field_value(<<$b, V:8/unsigned, R/binary>>) -> {byte, V, R}; +parse_field_value(<<$d, V:64/float, R/binary>>) -> {double, V, R}; +parse_field_value(<<$f, V:32/float, R/binary>>) -> {float, V, R}; +parse_field_value(<<$l, V:64/signed, R/binary>>) -> {long, V, R}; +parse_field_value(<<$s, V:16/signed, R/binary>>) -> {short, V, R}; +parse_field_value(<<$t, V:8/unsigned, R/binary>>) -> {bool, (V /= 0), R}; -parse_field_value(<<"x", VLen:32/unsigned, V:VLen/binary, R/binary>>) -> +parse_field_value(<<$x, VLen:32/unsigned, V:VLen/binary, R/binary>>) -> {binary, V, R}; -parse_field_value(<<"V", R/binary>>) -> +parse_field_value(<<$V, R/binary>>) -> {void, undefined, R}. ensure_content_decoded(Content = #content{properties = Props}) diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 4479fc0c3d..d9879f1b57 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -23,7 +23,7 @@ -export([system_continue/3, system_terminate/4, system_code_change/4]). --export([init/2, mainloop/2, recvloop/2]). +-export([init/2, mainloop/4, recvloop/4]). -export([conserve_resources/3, server_properties/1]). @@ -38,8 +38,7 @@ -record(v1, {parent, sock, connection, callback, recv_len, pending_recv, connection_state, helper_sup, queue_collector, heartbeater, - stats_timer, channel_sup_sup_pid, buf, buf_len, channel_count, - throttle}). + stats_timer, channel_sup_sup_pid, channel_count, throttle}). -record(connection, {name, host, peer_host, port, peer_port, protocol, user, timeout_sec, frame_max, channel_max, vhost, @@ -92,9 +91,10 @@ rabbit_types:ok_or_error2( rabbit_net:socket(), any()))) -> no_return()). --spec(mainloop/2 :: (_,#v1{}) -> any()). +-spec(mainloop/4 :: (_,[binary()], non_neg_integer(), #v1{}) -> any()). -spec(system_code_change/4 :: (_,_,_,_) -> {'ok',_}). --spec(system_continue/3 :: (_,_,#v1{}) -> any()). +-spec(system_continue/3 :: (_,_,{[binary()], non_neg_integer(), #v1{}}) -> + any()). -spec(system_terminate/4 :: (_,_,_,_) -> none()). -endif. @@ -114,8 +114,8 @@ init(Parent, HelperSup) -> start_connection(Parent, HelperSup, Deb, Sock, SockTransform) end. -system_continue(Parent, Deb, State) -> - mainloop(Deb, State#v1{parent = Parent}). +system_continue(Parent, Deb, {Buf, BufLen, State}) -> + mainloop(Deb, Buf, BufLen, State#v1{parent = Parent}). system_terminate(Reason, _Parent, _Deb, _State) -> exit(Reason). @@ -239,8 +239,6 @@ start_connection(Parent, HelperSup, Deb, Sock, SockTransform) -> helper_sup = HelperSup, heartbeater = none, channel_sup_sup_pid = none, - buf = [], - buf_len = 0, channel_count = 0, throttle = #throttle{ alarmed_by = [], @@ -249,9 +247,9 @@ start_connection(Parent, HelperSup, Deb, Sock, SockTransform) -> blocked_sent = false}}, try run({?MODULE, recvloop, - [Deb, switch_callback(rabbit_event:init_stats_timer( - State, #v1.stats_timer), - handshake, 8)]}), + [Deb, [], 0, switch_callback(rabbit_event:init_stats_timer( + State, #v1.stats_timer), + handshake, 8)]}), log(info, "closing AMQP connection ~p (~s)~n", [self(), Name]) catch Ex -> log(case Ex of @@ -278,29 +276,38 @@ run({M, F, A}) -> catch {become, MFA} -> run(MFA) end. -recvloop(Deb, State = #v1{pending_recv = true}) -> - mainloop(Deb, State); -recvloop(Deb, State = #v1{connection_state = blocked}) -> - mainloop(Deb, State); -recvloop(Deb, State = #v1{sock = Sock, recv_len = RecvLen, buf_len = BufLen}) +recvloop(Deb, Buf, BufLen, State = #v1{pending_recv = true}) -> + mainloop(Deb, Buf, BufLen, State); +recvloop(Deb, Buf, BufLen, State = #v1{connection_state = blocked}) -> + mainloop(Deb, Buf, BufLen, State); +recvloop(Deb, Buf, BufLen, State = #v1{connection_state = {become, F}}) -> + throw({become, F(Deb, Buf, BufLen, State)}); +recvloop(Deb, Buf, BufLen, State = #v1{sock = Sock, recv_len = RecvLen}) when BufLen < RecvLen -> ok = rabbit_net:setopts(Sock, [{active, once}]), - mainloop(Deb, State#v1{pending_recv = true}); -recvloop(Deb, State = #v1{recv_len = RecvLen, buf = Buf, buf_len = BufLen}) -> - {Data, Rest} = split_binary(case Buf of - [B] -> B; - _ -> list_to_binary(lists:reverse(Buf)) - end, RecvLen), - recvloop(Deb, handle_input(State#v1.callback, Data, - State#v1{buf = [Rest], - buf_len = BufLen - RecvLen})). - -mainloop(Deb, State = #v1{sock = Sock, buf = Buf, buf_len = BufLen}) -> + mainloop(Deb, Buf, BufLen, State#v1{pending_recv = true}); +recvloop(Deb, [B], _BufLen, State) -> + {Rest, State1} = handle_input(State#v1.callback, B, State), + recvloop(Deb, [Rest], size(Rest), State1); +recvloop(Deb, Buf, BufLen, State = #v1{recv_len = RecvLen}) -> + {DataLRev, RestLRev} = binlist_split(BufLen - RecvLen, Buf, []), + Data = list_to_binary(lists:reverse(DataLRev)), + {<<>>, State1} = handle_input(State#v1.callback, Data, State), + recvloop(Deb, lists:reverse(RestLRev), BufLen - RecvLen, State1). + +binlist_split(0, L, Acc) -> + {L, Acc}; +binlist_split(Len, L, [Acc0|Acc]) when Len < 0 -> + {H, T} = split_binary(Acc0, -Len), + {[H|L], [T|Acc]}; +binlist_split(Len, [H|T], Acc) -> + binlist_split(Len - size(H), T, [H|Acc]). + +mainloop(Deb, Buf, BufLen, State = #v1{sock = Sock}) -> case rabbit_net:recv(Sock) of {data, Data} -> - recvloop(Deb, State#v1{buf = [Data | Buf], - buf_len = BufLen + size(Data), - pending_recv = false}); + recvloop(Deb, [Data | Buf], BufLen + size(Data), + State#v1{pending_recv = false}); closed when State#v1.connection_state =:= closed -> ok; closed -> @@ -311,11 +318,11 @@ mainloop(Deb, State = #v1{sock = Sock, buf = Buf, buf_len = BufLen}) -> throw({inet_error, Reason}); {other, {system, From, Request}} -> sys:handle_system_msg(Request, From, State#v1.parent, - ?MODULE, Deb, State); + ?MODULE, Deb, {Buf, BufLen, State}); {other, Other} -> case handle_other(Other, State) of stop -> ok; - NewState -> recvloop(Deb, NewState) + NewState -> recvloop(Deb, Buf, BufLen, NewState) end end. @@ -715,26 +722,38 @@ post_process_frame(_Frame, _ChPid, State) -> %% a few get it wrong - off-by 1 or 8 (empty frame size) are typical. -define(FRAME_SIZE_FUDGE, ?EMPTY_FRAME_SIZE). -handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32>>, +handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32, _/binary>>, State = #v1{connection = #connection{frame_max = FrameMax}}) when FrameMax /= 0 andalso PayloadSize > FrameMax - ?EMPTY_FRAME_SIZE + ?FRAME_SIZE_FUDGE -> fatal_frame_error( {frame_too_large, PayloadSize, FrameMax - ?EMPTY_FRAME_SIZE}, Type, Channel, <<>>, State); -handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32>>, State) -> - ensure_stats_timer( - switch_callback(State, {frame_payload, Type, Channel, PayloadSize}, - PayloadSize + 1)); - +handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32, + Payload:PayloadSize/binary, ?FRAME_END, + Rest/binary>>, + State) -> + {Rest, ensure_stats_timer(handle_frame(Type, Channel, Payload, State))}; +handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32, Rest/binary>>, + State) -> + {Rest, ensure_stats_timer( + switch_callback(State, + {frame_payload, Type, Channel, PayloadSize}, + PayloadSize + 1))}; handle_input({frame_payload, Type, Channel, PayloadSize}, Data, State) -> - <<Payload:PayloadSize/binary, EndMarker>> = Data, + <<Payload:PayloadSize/binary, EndMarker, Rest/binary>> = Data, case EndMarker of ?FRAME_END -> State1 = handle_frame(Type, Channel, Payload, State), - switch_callback(State1, frame_header, 7); + {Rest, switch_callback(State1, frame_header, 7)}; _ -> fatal_frame_error({invalid_frame_end_marker, EndMarker}, Type, Channel, Payload, State) end; +handle_input(handshake, <<"AMQP", A, B, C, D, Rest/binary>>, State) -> + {Rest, handshake({A, B, C, D}, State)}; +handle_input(handshake, <<Other:8/binary, _/binary>>, #v1{sock = Sock}) -> + refuse_connection(Sock, {bad_header, Other}); +handle_input(Callback, Data, _State) -> + throw({bad_input, Callback, Data}). %% The two rules pertaining to version negotiation: %% @@ -744,37 +763,31 @@ handle_input({frame_payload, Type, Channel, PayloadSize}, Data, State) -> %% %% * The server MUST provide a protocol version that is lower than or %% equal to that requested by the client in the protocol header. -handle_input(handshake, <<"AMQP", 0, 0, 9, 1>>, State) -> +handshake({0, 0, 9, 1}, State) -> start_connection({0, 9, 1}, rabbit_framing_amqp_0_9_1, State); %% This is the protocol header for 0-9, which we can safely treat as %% though it were 0-9-1. -handle_input(handshake, <<"AMQP", 1, 1, 0, 9>>, State) -> +handshake({1, 1, 0, 9}, State) -> start_connection({0, 9, 0}, rabbit_framing_amqp_0_9_1, State); %% This is what most clients send for 0-8. The 0-8 spec, confusingly, %% defines the version as 8-0. -handle_input(handshake, <<"AMQP", 1, 1, 8, 0>>, State) -> +handshake({1, 1, 8, 0}, State) -> start_connection({8, 0, 0}, rabbit_framing_amqp_0_8, State); %% The 0-8 spec as on the AMQP web site actually has this as the %% protocol header; some libraries e.g., py-amqplib, send it when they %% want 0-8. -handle_input(handshake, <<"AMQP", 1, 1, 9, 1>>, State) -> +handshake({1, 1, 9, 1}, State) -> start_connection({8, 0, 0}, rabbit_framing_amqp_0_8, State); -%% ... and finally, the 1.0 spec is crystal clear! Note that the -handle_input(handshake, <<"AMQP", Id, 1, 0, 0>>, State) -> +%% ... and finally, the 1.0 spec is crystal clear! +handshake({Id, 1, 0, 0}, State) -> become_1_0(Id, State); -handle_input(handshake, <<"AMQP", A, B, C, D>>, #v1{sock = Sock}) -> - refuse_connection(Sock, {bad_version, {A, B, C, D}}); - -handle_input(handshake, Other, #v1{sock = Sock}) -> - refuse_connection(Sock, {bad_header, Other}); - -handle_input(Callback, Data, _State) -> - throw({bad_input, Callback, Data}). +handshake(Vsn, #v1{sock = Sock}) -> + refuse_connection(Sock, {bad_version, Vsn}). %% Offer a protocol version to the client. Connection.start only %% includes a major and minor version number, Luckily 0-9 and 0-9-1 @@ -818,7 +831,7 @@ handle_method0(MethodName, FieldsBin, try handle_method0(Protocol:decode_method_fields(MethodName, FieldsBin), State) - catch throw:{writer_inet_error, closed} -> + catch throw:{inet_error, closed} -> maybe_emit_stats(State), throw(connection_closed_abruptly); exit:#amqp_error{method = none} = Reason -> @@ -1126,15 +1139,16 @@ become_1_0(Id, State = #v1{sock = Sock}) -> Sock, {unsupported_amqp1_0_protocol_id, Id}, {3, 1, 0, 0}) end, - throw({become, {rabbit_amqp1_0_reader, init, - [Mode, pack_for_1_0(State)]}}) + F = fun (_Deb, Buf, BufLen, S) -> + {rabbit_amqp1_0_reader, init, + [Mode, pack_for_1_0(Buf, BufLen, S)]} + end, + State = #v1{connection_state = {become, F}} end. -pack_for_1_0(#v1{parent = Parent, - sock = Sock, - recv_len = RecvLen, - pending_recv = PendingRecv, - helper_sup = SupPid, - buf = Buf, - buf_len = BufLen}) -> +pack_for_1_0(Buf, BufLen, #v1{parent = Parent, + sock = Sock, + recv_len = RecvLen, + pending_recv = PendingRecv, + helper_sup = SupPid}) -> {Parent, Sock, RecvLen, PendingRecv, SupPid, Buf, BufLen}. diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl index 92d48e633f..34dd3d3b35 100644 --- a/src/rabbit_writer.erl +++ b/src/rabbit_writer.erl @@ -272,7 +272,7 @@ assemble_frames(Channel, MethodRecord, Content, FrameMax, Protocol) -> [MethodFrame | ContentFrames]. tcp_send(Sock, Data) -> - rabbit_misc:throw_on_error(writer_inet_error, + rabbit_misc:throw_on_error(inet_error, fun () -> rabbit_net:send(Sock, Data) end). internal_send_command(Sock, Channel, MethodRecord, Protocol) -> |
