summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2014-01-07 11:29:33 +0000
committerSimon MacMullen <simon@rabbitmq.com>2014-01-07 11:29:33 +0000
commitcd26dc92cdeb002b71f6c9e0d43aed449004be9a (patch)
tree497ccef8d11a38b8dcc7694bf830697c05723b47 /src
parent801e917644cd577db8b5ab5e612a5483e223795e (diff)
parent6d47b7016c44b9c235ab110af19f5506d3c8fc10 (diff)
downloadrabbitmq-server-git-cd26dc92cdeb002b71f6c9e0d43aed449004be9a.tar.gz
Merge in default
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue_process.erl2
-rw-r--r--src/rabbit_binary_generator.erl63
-rw-r--r--src/rabbit_binary_parser.erl28
-rw-r--r--src/rabbit_reader.erl144
-rw-r--r--src/rabbit_writer.erl2
5 files changed, 127 insertions, 112 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 5c5b79fe61..281aecb93c 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -599,6 +599,8 @@ handle_ch_down(DownPid, State = #q{consumers = Consumers,
end,
State2 = State1#q{consumers = Consumers1,
exclusive_consumer = Holder1},
+ [notify_decorators(basic_cancel, [{consumer_tag, CTag}], State2) ||
+ CTag <- ChCTags],
case should_auto_delete(State2) of
true -> {stop, State2};
false -> {ok, requeue_and_run(ChAckTags,
diff --git a/src/rabbit_binary_generator.erl b/src/rabbit_binary_generator.erl
index ae5bbf51d0..8eaac10d4b 100644
--- a/src/rabbit_binary_generator.erl
+++ b/src/rabbit_binary_generator.erl
@@ -119,52 +119,51 @@ create_frame(TypeInt, ChannelInt, Payload) ->
table_field_to_binary({FName, T, V}) ->
[short_string_to_binary(FName) | field_value_to_binary(T, V)].
-field_value_to_binary(longstr, V) -> ["S", long_string_to_binary(V)];
-field_value_to_binary(signedint, V) -> ["I", <<V:32/signed>>];
+field_value_to_binary(longstr, V) -> [$S | long_string_to_binary(V)];
+field_value_to_binary(signedint, V) -> [$I | <<V:32/signed>>];
field_value_to_binary(decimal, V) -> {Before, After} = V,
- ["D", Before, <<After:32>>];
-field_value_to_binary(timestamp, V) -> ["T", <<V:64>>];
-field_value_to_binary(table, V) -> ["F", table_to_binary(V)];
-field_value_to_binary(array, V) -> ["A", array_to_binary(V)];
-field_value_to_binary(byte, V) -> ["b", <<V:8/unsigned>>];
-field_value_to_binary(double, V) -> ["d", <<V:64/float>>];
-field_value_to_binary(float, V) -> ["f", <<V:32/float>>];
-field_value_to_binary(long, V) -> ["l", <<V:64/signed>>];
-field_value_to_binary(short, V) -> ["s", <<V:16/signed>>];
-field_value_to_binary(bool, V) -> ["t", if V -> 1; true -> 0 end];
-field_value_to_binary(binary, V) -> ["x", long_string_to_binary(V)];
-field_value_to_binary(void, _V) -> ["V"].
+ [$D | [Before, <<After:32>>]];
+field_value_to_binary(timestamp, V) -> [$T | <<V:64>>];
+field_value_to_binary(table, V) -> [$F | table_to_binary(V)];
+field_value_to_binary(array, V) -> [$A | array_to_binary(V)];
+field_value_to_binary(byte, V) -> [$b | <<V:8/unsigned>>];
+field_value_to_binary(double, V) -> [$d | <<V:64/float>>];
+field_value_to_binary(float, V) -> [$f | <<V:32/float>>];
+field_value_to_binary(long, V) -> [$l | <<V:64/signed>>];
+field_value_to_binary(short, V) -> [$s | <<V:16/signed>>];
+field_value_to_binary(bool, V) -> [$t | [if V -> 1; true -> 0 end]];
+field_value_to_binary(binary, V) -> [$x | long_string_to_binary(V)];
+field_value_to_binary(void, _V) -> [$V].
table_to_binary(Table) when is_list(Table) ->
- BinTable = generate_table(Table),
- [<<(size(BinTable)):32>>, BinTable].
+ BinTable = generate_table_iolist(Table),
+ [<<(iolist_size(BinTable)):32>> | BinTable].
array_to_binary(Array) when is_list(Array) ->
- BinArray = generate_array(Array),
- [<<(size(BinArray)):32>>, BinArray].
+ BinArray = generate_array_iolist(Array),
+ [<<(iolist_size(BinArray)):32>> | BinArray].
generate_table(Table) when is_list(Table) ->
- list_to_binary(lists:map(fun table_field_to_binary/1, Table)).
+ list_to_binary(generate_table_iolist(Table)).
-generate_array(Array) when is_list(Array) ->
- list_to_binary(lists:map(fun ({T, V}) -> field_value_to_binary(T, V) end,
- Array)).
+generate_table_iolist(Table) ->
+ lists:map(fun table_field_to_binary/1, Table).
+
+generate_array_iolist(Array) ->
+ lists:map(fun ({T, V}) -> field_value_to_binary(T, V) end, Array).
-short_string_to_binary(String) when is_binary(String) ->
- Len = size(String),
- if Len < 256 -> [<<Len:8>>, String];
- true -> exit(content_properties_shortstr_overflow)
- end;
short_string_to_binary(String) ->
- Len = length(String),
- if Len < 256 -> [<<Len:8>>, String];
+ Len = string_length(String),
+ if Len < 256 -> [<<Len:8>> | String];
true -> exit(content_properties_shortstr_overflow)
end.
-long_string_to_binary(String) when is_binary(String) ->
- [<<(size(String)):32>>, String];
long_string_to_binary(String) ->
- [<<(length(String)):32>>, String].
+ Len = string_length(String),
+ [<<Len:32>> | String].
+
+string_length(String) when is_binary(String) -> size(String);
+string_length(String) -> length(String).
check_empty_frame_size() ->
%% Intended to ensure that EMPTY_FRAME_SIZE is defined correctly.
diff --git a/src/rabbit_binary_parser.erl b/src/rabbit_binary_parser.erl
index 088ad0e52e..f65d8ea7fd 100644
--- a/src/rabbit_binary_parser.erl
+++ b/src/rabbit_binary_parser.erl
@@ -53,35 +53,35 @@ parse_array(<<ValueAndRest/binary>>) ->
{Type, Value, Rest} = parse_field_value(ValueAndRest),
[{Type, Value} | parse_array(Rest)].
-parse_field_value(<<"S", VLen:32/unsigned, V:VLen/binary, R/binary>>) ->
+parse_field_value(<<$S, VLen:32/unsigned, V:VLen/binary, R/binary>>) ->
{longstr, V, R};
-parse_field_value(<<"I", V:32/signed, R/binary>>) ->
+parse_field_value(<<$I, V:32/signed, R/binary>>) ->
{signedint, V, R};
-parse_field_value(<<"D", Before:8/unsigned, After:32/unsigned, R/binary>>) ->
+parse_field_value(<<$D, Before:8/unsigned, After:32/unsigned, R/binary>>) ->
{decimal, {Before, After}, R};
-parse_field_value(<<"T", V:64/unsigned, R/binary>>) ->
+parse_field_value(<<$T, V:64/unsigned, R/binary>>) ->
{timestamp, V, R};
-parse_field_value(<<"F", VLen:32/unsigned, Table:VLen/binary, R/binary>>) ->
+parse_field_value(<<$F, VLen:32/unsigned, Table:VLen/binary, R/binary>>) ->
{table, parse_table(Table), R};
-parse_field_value(<<"A", VLen:32/unsigned, Array:VLen/binary, R/binary>>) ->
+parse_field_value(<<$A, VLen:32/unsigned, Array:VLen/binary, R/binary>>) ->
{array, parse_array(Array), R};
-parse_field_value(<<"b", V:8/unsigned, R/binary>>) -> {byte, V, R};
-parse_field_value(<<"d", V:64/float, R/binary>>) -> {double, V, R};
-parse_field_value(<<"f", V:32/float, R/binary>>) -> {float, V, R};
-parse_field_value(<<"l", V:64/signed, R/binary>>) -> {long, V, R};
-parse_field_value(<<"s", V:16/signed, R/binary>>) -> {short, V, R};
-parse_field_value(<<"t", V:8/unsigned, R/binary>>) -> {bool, (V /= 0), R};
+parse_field_value(<<$b, V:8/unsigned, R/binary>>) -> {byte, V, R};
+parse_field_value(<<$d, V:64/float, R/binary>>) -> {double, V, R};
+parse_field_value(<<$f, V:32/float, R/binary>>) -> {float, V, R};
+parse_field_value(<<$l, V:64/signed, R/binary>>) -> {long, V, R};
+parse_field_value(<<$s, V:16/signed, R/binary>>) -> {short, V, R};
+parse_field_value(<<$t, V:8/unsigned, R/binary>>) -> {bool, (V /= 0), R};
-parse_field_value(<<"x", VLen:32/unsigned, V:VLen/binary, R/binary>>) ->
+parse_field_value(<<$x, VLen:32/unsigned, V:VLen/binary, R/binary>>) ->
{binary, V, R};
-parse_field_value(<<"V", R/binary>>) ->
+parse_field_value(<<$V, R/binary>>) ->
{void, undefined, R}.
ensure_content_decoded(Content = #content{properties = Props})
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 4479fc0c3d..d9879f1b57 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -23,7 +23,7 @@
-export([system_continue/3, system_terminate/4, system_code_change/4]).
--export([init/2, mainloop/2, recvloop/2]).
+-export([init/2, mainloop/4, recvloop/4]).
-export([conserve_resources/3, server_properties/1]).
@@ -38,8 +38,7 @@
-record(v1, {parent, sock, connection, callback, recv_len, pending_recv,
connection_state, helper_sup, queue_collector, heartbeater,
- stats_timer, channel_sup_sup_pid, buf, buf_len, channel_count,
- throttle}).
+ stats_timer, channel_sup_sup_pid, channel_count, throttle}).
-record(connection, {name, host, peer_host, port, peer_port,
protocol, user, timeout_sec, frame_max, channel_max, vhost,
@@ -92,9 +91,10 @@
rabbit_types:ok_or_error2(
rabbit_net:socket(), any()))) -> no_return()).
--spec(mainloop/2 :: (_,#v1{}) -> any()).
+-spec(mainloop/4 :: (_,[binary()], non_neg_integer(), #v1{}) -> any()).
-spec(system_code_change/4 :: (_,_,_,_) -> {'ok',_}).
--spec(system_continue/3 :: (_,_,#v1{}) -> any()).
+-spec(system_continue/3 :: (_,_,{[binary()], non_neg_integer(), #v1{}}) ->
+ any()).
-spec(system_terminate/4 :: (_,_,_,_) -> none()).
-endif.
@@ -114,8 +114,8 @@ init(Parent, HelperSup) ->
start_connection(Parent, HelperSup, Deb, Sock, SockTransform)
end.
-system_continue(Parent, Deb, State) ->
- mainloop(Deb, State#v1{parent = Parent}).
+system_continue(Parent, Deb, {Buf, BufLen, State}) ->
+ mainloop(Deb, Buf, BufLen, State#v1{parent = Parent}).
system_terminate(Reason, _Parent, _Deb, _State) ->
exit(Reason).
@@ -239,8 +239,6 @@ start_connection(Parent, HelperSup, Deb, Sock, SockTransform) ->
helper_sup = HelperSup,
heartbeater = none,
channel_sup_sup_pid = none,
- buf = [],
- buf_len = 0,
channel_count = 0,
throttle = #throttle{
alarmed_by = [],
@@ -249,9 +247,9 @@ start_connection(Parent, HelperSup, Deb, Sock, SockTransform) ->
blocked_sent = false}},
try
run({?MODULE, recvloop,
- [Deb, switch_callback(rabbit_event:init_stats_timer(
- State, #v1.stats_timer),
- handshake, 8)]}),
+ [Deb, [], 0, switch_callback(rabbit_event:init_stats_timer(
+ State, #v1.stats_timer),
+ handshake, 8)]}),
log(info, "closing AMQP connection ~p (~s)~n", [self(), Name])
catch
Ex -> log(case Ex of
@@ -278,29 +276,38 @@ run({M, F, A}) ->
catch {become, MFA} -> run(MFA)
end.
-recvloop(Deb, State = #v1{pending_recv = true}) ->
- mainloop(Deb, State);
-recvloop(Deb, State = #v1{connection_state = blocked}) ->
- mainloop(Deb, State);
-recvloop(Deb, State = #v1{sock = Sock, recv_len = RecvLen, buf_len = BufLen})
+recvloop(Deb, Buf, BufLen, State = #v1{pending_recv = true}) ->
+ mainloop(Deb, Buf, BufLen, State);
+recvloop(Deb, Buf, BufLen, State = #v1{connection_state = blocked}) ->
+ mainloop(Deb, Buf, BufLen, State);
+recvloop(Deb, Buf, BufLen, State = #v1{connection_state = {become, F}}) ->
+ throw({become, F(Deb, Buf, BufLen, State)});
+recvloop(Deb, Buf, BufLen, State = #v1{sock = Sock, recv_len = RecvLen})
when BufLen < RecvLen ->
ok = rabbit_net:setopts(Sock, [{active, once}]),
- mainloop(Deb, State#v1{pending_recv = true});
-recvloop(Deb, State = #v1{recv_len = RecvLen, buf = Buf, buf_len = BufLen}) ->
- {Data, Rest} = split_binary(case Buf of
- [B] -> B;
- _ -> list_to_binary(lists:reverse(Buf))
- end, RecvLen),
- recvloop(Deb, handle_input(State#v1.callback, Data,
- State#v1{buf = [Rest],
- buf_len = BufLen - RecvLen})).
-
-mainloop(Deb, State = #v1{sock = Sock, buf = Buf, buf_len = BufLen}) ->
+ mainloop(Deb, Buf, BufLen, State#v1{pending_recv = true});
+recvloop(Deb, [B], _BufLen, State) ->
+ {Rest, State1} = handle_input(State#v1.callback, B, State),
+ recvloop(Deb, [Rest], size(Rest), State1);
+recvloop(Deb, Buf, BufLen, State = #v1{recv_len = RecvLen}) ->
+ {DataLRev, RestLRev} = binlist_split(BufLen - RecvLen, Buf, []),
+ Data = list_to_binary(lists:reverse(DataLRev)),
+ {<<>>, State1} = handle_input(State#v1.callback, Data, State),
+ recvloop(Deb, lists:reverse(RestLRev), BufLen - RecvLen, State1).
+
+binlist_split(0, L, Acc) ->
+ {L, Acc};
+binlist_split(Len, L, [Acc0|Acc]) when Len < 0 ->
+ {H, T} = split_binary(Acc0, -Len),
+ {[H|L], [T|Acc]};
+binlist_split(Len, [H|T], Acc) ->
+ binlist_split(Len - size(H), T, [H|Acc]).
+
+mainloop(Deb, Buf, BufLen, State = #v1{sock = Sock}) ->
case rabbit_net:recv(Sock) of
{data, Data} ->
- recvloop(Deb, State#v1{buf = [Data | Buf],
- buf_len = BufLen + size(Data),
- pending_recv = false});
+ recvloop(Deb, [Data | Buf], BufLen + size(Data),
+ State#v1{pending_recv = false});
closed when State#v1.connection_state =:= closed ->
ok;
closed ->
@@ -311,11 +318,11 @@ mainloop(Deb, State = #v1{sock = Sock, buf = Buf, buf_len = BufLen}) ->
throw({inet_error, Reason});
{other, {system, From, Request}} ->
sys:handle_system_msg(Request, From, State#v1.parent,
- ?MODULE, Deb, State);
+ ?MODULE, Deb, {Buf, BufLen, State});
{other, Other} ->
case handle_other(Other, State) of
stop -> ok;
- NewState -> recvloop(Deb, NewState)
+ NewState -> recvloop(Deb, Buf, BufLen, NewState)
end
end.
@@ -715,26 +722,38 @@ post_process_frame(_Frame, _ChPid, State) ->
%% a few get it wrong - off-by 1 or 8 (empty frame size) are typical.
-define(FRAME_SIZE_FUDGE, ?EMPTY_FRAME_SIZE).
-handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32>>,
+handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32, _/binary>>,
State = #v1{connection = #connection{frame_max = FrameMax}})
when FrameMax /= 0 andalso
PayloadSize > FrameMax - ?EMPTY_FRAME_SIZE + ?FRAME_SIZE_FUDGE ->
fatal_frame_error(
{frame_too_large, PayloadSize, FrameMax - ?EMPTY_FRAME_SIZE},
Type, Channel, <<>>, State);
-handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32>>, State) ->
- ensure_stats_timer(
- switch_callback(State, {frame_payload, Type, Channel, PayloadSize},
- PayloadSize + 1));
-
+handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32,
+ Payload:PayloadSize/binary, ?FRAME_END,
+ Rest/binary>>,
+ State) ->
+ {Rest, ensure_stats_timer(handle_frame(Type, Channel, Payload, State))};
+handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32, Rest/binary>>,
+ State) ->
+ {Rest, ensure_stats_timer(
+ switch_callback(State,
+ {frame_payload, Type, Channel, PayloadSize},
+ PayloadSize + 1))};
handle_input({frame_payload, Type, Channel, PayloadSize}, Data, State) ->
- <<Payload:PayloadSize/binary, EndMarker>> = Data,
+ <<Payload:PayloadSize/binary, EndMarker, Rest/binary>> = Data,
case EndMarker of
?FRAME_END -> State1 = handle_frame(Type, Channel, Payload, State),
- switch_callback(State1, frame_header, 7);
+ {Rest, switch_callback(State1, frame_header, 7)};
_ -> fatal_frame_error({invalid_frame_end_marker, EndMarker},
Type, Channel, Payload, State)
end;
+handle_input(handshake, <<"AMQP", A, B, C, D, Rest/binary>>, State) ->
+ {Rest, handshake({A, B, C, D}, State)};
+handle_input(handshake, <<Other:8/binary, _/binary>>, #v1{sock = Sock}) ->
+ refuse_connection(Sock, {bad_header, Other});
+handle_input(Callback, Data, _State) ->
+ throw({bad_input, Callback, Data}).
%% The two rules pertaining to version negotiation:
%%
@@ -744,37 +763,31 @@ handle_input({frame_payload, Type, Channel, PayloadSize}, Data, State) ->
%%
%% * The server MUST provide a protocol version that is lower than or
%% equal to that requested by the client in the protocol header.
-handle_input(handshake, <<"AMQP", 0, 0, 9, 1>>, State) ->
+handshake({0, 0, 9, 1}, State) ->
start_connection({0, 9, 1}, rabbit_framing_amqp_0_9_1, State);
%% This is the protocol header for 0-9, which we can safely treat as
%% though it were 0-9-1.
-handle_input(handshake, <<"AMQP", 1, 1, 0, 9>>, State) ->
+handshake({1, 1, 0, 9}, State) ->
start_connection({0, 9, 0}, rabbit_framing_amqp_0_9_1, State);
%% This is what most clients send for 0-8. The 0-8 spec, confusingly,
%% defines the version as 8-0.
-handle_input(handshake, <<"AMQP", 1, 1, 8, 0>>, State) ->
+handshake({1, 1, 8, 0}, State) ->
start_connection({8, 0, 0}, rabbit_framing_amqp_0_8, State);
%% The 0-8 spec as on the AMQP web site actually has this as the
%% protocol header; some libraries e.g., py-amqplib, send it when they
%% want 0-8.
-handle_input(handshake, <<"AMQP", 1, 1, 9, 1>>, State) ->
+handshake({1, 1, 9, 1}, State) ->
start_connection({8, 0, 0}, rabbit_framing_amqp_0_8, State);
-%% ... and finally, the 1.0 spec is crystal clear! Note that the
-handle_input(handshake, <<"AMQP", Id, 1, 0, 0>>, State) ->
+%% ... and finally, the 1.0 spec is crystal clear!
+handshake({Id, 1, 0, 0}, State) ->
become_1_0(Id, State);
-handle_input(handshake, <<"AMQP", A, B, C, D>>, #v1{sock = Sock}) ->
- refuse_connection(Sock, {bad_version, {A, B, C, D}});
-
-handle_input(handshake, Other, #v1{sock = Sock}) ->
- refuse_connection(Sock, {bad_header, Other});
-
-handle_input(Callback, Data, _State) ->
- throw({bad_input, Callback, Data}).
+handshake(Vsn, #v1{sock = Sock}) ->
+ refuse_connection(Sock, {bad_version, Vsn}).
%% Offer a protocol version to the client. Connection.start only
%% includes a major and minor version number, Luckily 0-9 and 0-9-1
@@ -818,7 +831,7 @@ handle_method0(MethodName, FieldsBin,
try
handle_method0(Protocol:decode_method_fields(MethodName, FieldsBin),
State)
- catch throw:{writer_inet_error, closed} ->
+ catch throw:{inet_error, closed} ->
maybe_emit_stats(State),
throw(connection_closed_abruptly);
exit:#amqp_error{method = none} = Reason ->
@@ -1126,15 +1139,16 @@ become_1_0(Id, State = #v1{sock = Sock}) ->
Sock, {unsupported_amqp1_0_protocol_id, Id},
{3, 1, 0, 0})
end,
- throw({become, {rabbit_amqp1_0_reader, init,
- [Mode, pack_for_1_0(State)]}})
+ F = fun (_Deb, Buf, BufLen, S) ->
+ {rabbit_amqp1_0_reader, init,
+ [Mode, pack_for_1_0(Buf, BufLen, S)]}
+ end,
+ State = #v1{connection_state = {become, F}}
end.
-pack_for_1_0(#v1{parent = Parent,
- sock = Sock,
- recv_len = RecvLen,
- pending_recv = PendingRecv,
- helper_sup = SupPid,
- buf = Buf,
- buf_len = BufLen}) ->
+pack_for_1_0(Buf, BufLen, #v1{parent = Parent,
+ sock = Sock,
+ recv_len = RecvLen,
+ pending_recv = PendingRecv,
+ helper_sup = SupPid}) ->
{Parent, Sock, RecvLen, PendingRecv, SupPid, Buf, BufLen}.
diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl
index 92d48e633f..34dd3d3b35 100644
--- a/src/rabbit_writer.erl
+++ b/src/rabbit_writer.erl
@@ -272,7 +272,7 @@ assemble_frames(Channel, MethodRecord, Content, FrameMax, Protocol) ->
[MethodFrame | ContentFrames].
tcp_send(Sock, Data) ->
- rabbit_misc:throw_on_error(writer_inet_error,
+ rabbit_misc:throw_on_error(inet_error,
fun () -> rabbit_net:send(Sock, Data) end).
internal_send_command(Sock, Channel, MethodRecord, Protocol) ->