diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_binary_generator.erl | 17 | ||||
| -rw-r--r-- | src/rabbit_framing_channel.erl | 20 | ||||
| -rw-r--r-- | src/rabbit_reader.erl | 92 | ||||
| -rw-r--r-- | src/rabbit_writer.erl | 80 |
4 files changed, 117 insertions, 92 deletions
diff --git a/src/rabbit_binary_generator.erl b/src/rabbit_binary_generator.erl index cd8b44d15f..e29f9fcbc7 100644 --- a/src/rabbit_binary_generator.erl +++ b/src/rabbit_binary_generator.erl @@ -41,7 +41,7 @@ % See definition of check_empty_content_body_frame_size/0, an assertion called at startup. -define(EMPTY_CONTENT_BODY_FRAME_SIZE, 8). --export([build_simple_method_frame/2, +-export([build_simple_method_frame/3, build_simple_content_frames/3, build_heartbeat_frame/0]). -export([generate_table/1, encode_properties/2]). @@ -56,8 +56,8 @@ -type(frame() :: [binary()]). --spec(build_simple_method_frame/2 :: - (channel_number(), amqp_method_record()) -> frame()). +-spec(build_simple_method_frame/3 :: + (channel_number(), amqp_method_record(), protocol()) -> frame()). -spec(build_simple_content_frames/3 :: (channel_number(), content(), non_neg_integer()) -> [frame()]). -spec(build_heartbeat_frame/0 :: () -> frame()). @@ -71,17 +71,18 @@ %%---------------------------------------------------------------------------- -build_simple_method_frame(ChannelInt, MethodRecord) -> +build_simple_method_frame(ChannelInt, MethodRecord, Protocol) -> MethodFields = rabbit_framing:encode_method_fields(MethodRecord), - MethodName = adjust_close(rabbit_misc:method_record_type(MethodRecord)), + MethodName = adjust_close(rabbit_misc:method_record_type(MethodRecord), + Protocol), {ClassId, MethodId} = rabbit_framing:method_id(MethodName), create_frame(1, ChannelInt, [<<ClassId:16, MethodId:16>>, MethodFields]). -adjust_close('connection.close') -> +adjust_close('connection.close', protocol_08) -> 'connection.close08'; -adjust_close('connection.close_ok') -> +adjust_close('connection.close_ok', protocol_08) -> 'connection.close08_ok'; -adjust_close(MethodName) -> +adjust_close(MethodName, _Protocol) -> MethodName. build_simple_content_frames(ChannelInt, diff --git a/src/rabbit_framing_channel.erl b/src/rabbit_framing_channel.erl index 490435835d..90856cdf3f 100644 --- a/src/rabbit_framing_channel.erl +++ b/src/rabbit_framing_channel.erl @@ -32,20 +32,20 @@ -module(rabbit_framing_channel). -include("rabbit.hrl"). --export([start_link/2, process/2, shutdown/1]). +-export([start_link/3, process/2, shutdown/1]). %% internal --export([mainloop/1]). +-export([mainloop/2]). %%-------------------------------------------------------------------- -start_link(StartFun, StartArgs) -> +start_link(StartFun, StartArgs, Protocol) -> spawn_link( fun () -> %% we trap exits so that a normal termination of the %% channel or reader process terminates us too. process_flag(trap_exit, true), - mainloop(apply(StartFun, StartArgs)) + mainloop(apply(StartFun, StartArgs), Protocol) end). process(Pid, Frame) -> @@ -72,30 +72,30 @@ read_frame(ChannelPid) -> Msg -> exit({unexpected_message, Msg}) end. -mainloop(ChannelPid) -> +mainloop(ChannelPid, Protocol) -> {method, MethodName, FieldsBin} = read_frame(ChannelPid), - Method = decode_method_fields(MethodName, FieldsBin), + Method = decode_method_fields(MethodName, FieldsBin, Protocol), case rabbit_framing:method_has_content(MethodName) of true -> rabbit_channel:do(ChannelPid, Method, collect_content(ChannelPid, MethodName)); false -> rabbit_channel:do(ChannelPid, Method) end, - ?MODULE:mainloop(ChannelPid). + ?MODULE:mainloop(ChannelPid, Protocol). %% Handle 0-8 version of channel.tune-ok. In 0-9-1 it gained a longstr %% "deprecated_channel_id". -decode_method_fields('channel.tune_ok', FieldsBin) -> +decode_method_fields('channel.tune_ok', FieldsBin, protocol_08) -> Len = 0, rabbit_framing:decode_method_fields( 'channel.tune_ok', <<FieldsBin/binary, Len:32/unsigned>>); %% Handle 0-8 version of basic.consume. In 0-9-1 it gained a table %% "filter". -decode_method_fields('basic.consume', FieldsBin) -> +decode_method_fields('basic.consume', FieldsBin, protocol_08) -> T = rabbit_binary_generator:generate_table([]), TLen = size(T), rabbit_framing:decode_method_fields( 'basic.consume', <<FieldsBin/binary, TLen:32/unsigned, T:TLen/binary>>); -decode_method_fields(MethodName, FieldsBin) -> +decode_method_fields(MethodName, FieldsBin, _Protocol) -> rabbit_framing:decode_method_fields(MethodName, FieldsBin). collect_content(ChannelPid, MethodName) -> diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index f83f932db9..e2d4880b94 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -41,8 +41,6 @@ -export([server_properties/0]). --export([analyze_frame/2]). - -import(gen_tcp). -import(fprof). -import(inet). @@ -244,7 +242,8 @@ start_connection(Parent, Deb, Sock, SockTransform) -> timeout_sec = ?HANDSHAKE_TIMEOUT, frame_max = ?FRAME_MIN_SIZE, vhost = none, - client_properties = none}, + client_properties = none, + protocol = unknown}, callback = uninitialized_callback, recv_ref = none, connection_state = pre_init, @@ -432,7 +431,8 @@ wait_for_channel_termination(N, TimerRef) -> end. maybe_close(State = #v1{connection_state = closing, - queue_collector = Collector}) -> + queue_collector = Collector, + connection = #connection{protocol = Protocol}}) -> case all_channels() of [] -> %% Spec says "Exclusive queues may only be accessed by the current @@ -440,16 +440,19 @@ maybe_close(State = #v1{connection_state = closing, %% This does not strictly imply synchrony, but in practice it seems %% to be what people assume. rabbit_reader_queue_collector:delete_all(Collector), - ok = send_on_channel0(State#v1.sock, #'connection.close_ok'{}), + ok = send_on_channel0(State#v1.sock, #'connection.close_ok'{}, + Protocol), close_connection(State); _ -> State end; maybe_close(State) -> State. -handle_frame(Type, 0, Payload, State = #v1{connection_state = CS}) +handle_frame(Type, 0, Payload, + State = #v1{connection_state = CS, + connection = #connection{protocol = Protocol}}) when CS =:= closing; CS =:= closed -> - case analyze_frame(Type, Payload) of + case analyze_frame(Type, Payload, Protocol) of {method, MethodName, FieldsBin} -> handle_method0(MethodName, FieldsBin, State); _Other -> State @@ -457,16 +460,18 @@ handle_frame(Type, 0, Payload, State = #v1{connection_state = CS}) handle_frame(_Type, _Channel, _Payload, State = #v1{connection_state = CS}) when CS =:= closing; CS =:= closed -> State; -handle_frame(Type, 0, Payload, State) -> - case analyze_frame(Type, Payload) of +handle_frame(Type, 0, Payload, + State = #v1{connection = #connection{protocol = Protocol}}) -> + case analyze_frame(Type, Payload, Protocol) of error -> throw({unknown_frame, 0, Type, Payload}); heartbeat -> State; {method, MethodName, FieldsBin} -> handle_method0(MethodName, FieldsBin, State); Other -> throw({unexpected_frame_on_channel0, Other}) end; -handle_frame(Type, Channel, Payload, State) -> - case analyze_frame(Type, Payload) of +handle_frame(Type, Channel, Payload, + State = #v1{connection = #connection{protocol = Protocol}}) -> + case analyze_frame(Type, Payload, Protocol) of error -> throw({unknown_frame, Channel, Type, Payload}); heartbeat -> throw({unexpected_heartbeat_frame, Channel}); AnalyzedFrame -> @@ -501,24 +506,28 @@ handle_frame(Type, Channel, Payload, State) -> end end. -analyze_frame(?FRAME_METHOD, <<ClassId:16, MethodId:16, MethodFields/binary>>) -> - {method, adjust_close(rabbit_framing:lookup_method_name({ClassId, - MethodId})), +analyze_frame(?FRAME_METHOD, <<ClassId:16, MethodId:16, MethodFields/binary>>, + Protocol) -> + {method, adjust_close( + rabbit_framing:lookup_method_name({ClassId, MethodId}), + Protocol), MethodFields}; -analyze_frame(?FRAME_HEADER, <<ClassId:16, Weight:16, BodySize:64, Properties/binary>>) -> +analyze_frame(?FRAME_HEADER, + <<ClassId:16, Weight:16, BodySize:64, Properties/binary>>, + _Protocol) -> {content_header, ClassId, Weight, BodySize, Properties}; -analyze_frame(?FRAME_BODY, Body) -> +analyze_frame(?FRAME_BODY, Body, _Protocol) -> {content_body, Body}; -analyze_frame(?FRAME_HEARTBEAT, <<>>) -> +analyze_frame(?FRAME_HEARTBEAT, <<>>, _Protocol) -> heartbeat; -analyze_frame(_Type, _Body) -> +analyze_frame(_Type, _Body, _Protocol) -> error. -adjust_close('connection.close08') -> +adjust_close('connection.close08', protocol_08) -> 'connection.close'; -adjust_close('connection.close08_ok') -> +adjust_close('connection.close08_ok', protocol_08) -> 'connection.close_ok'; -adjust_close(MethodName) -> +adjust_close(MethodName, _Protocol) -> MethodName. handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32>>, State) -> @@ -540,6 +549,7 @@ handle_input(handshake, <<"AMQP",1,1,ProtocolMajor,ProtocolMinor>>, case check_version({ProtocolMajor, ProtocolMinor}, {?PROTOCOL_VERSION_MAJOR, ?PROTOCOL_VERSION_MINOR}) of true -> + Protocol = protocol_08, ok = send_on_channel0( Sock, #'connection.start'{ @@ -547,9 +557,11 @@ handle_input(handshake, <<"AMQP",1,1,ProtocolMajor,ProtocolMinor>>, version_minor = ?PROTOCOL_VERSION_MINOR, server_properties = server_properties(), mechanisms = <<"PLAIN AMQPLAIN">>, - locales = <<"en_US">> }), + locales = <<"en_US">> }, + Protocol), {State#v1{connection = Connection#connection{ - timeout_sec = ?NORMAL_TIMEOUT}, + timeout_sec = ?NORMAL_TIMEOUT, + protocol = Protocol}, connection_state = starting}, frame_header, 7}; false -> @@ -604,7 +616,8 @@ handle_method0(#'connection.start_ok'{mechanism = Mechanism, response = Response, client_properties = ClientProperties}, State = #v1{connection_state = starting, - connection = Connection, + connection = Connection = + #connection{protocol = Protocol}, sock = Sock}) -> User = rabbit_access_control:check_login(Mechanism, Response), ok = send_on_channel0( @@ -612,7 +625,8 @@ handle_method0(#'connection.start_ok'{mechanism = Mechanism, #'connection.tune'{channel_max = 0, %% set to zero once QPid fix their negotiation frame_max = 131072, - heartbeat = 0}), + heartbeat = 0}, + Protocol), State#v1{connection_state = tuning, connection = Connection#connection{ user = User, @@ -634,13 +648,15 @@ handle_method0(#'connection.tune_ok'{channel_max = _ChannelMax, handle_method0(#'connection.open'{virtual_host = VHostPath}, State = #v1{connection_state = opening, connection = Connection = #connection{ - user = User}, + user = User, + protocol = Protocol}, sock = Sock}) -> ok = rabbit_access_control:check_vhost_access(User, VHostPath), NewConnection = Connection#connection{vhost = VHostPath}, ok = send_on_channel0( Sock, - #'connection.open_ok'{deprecated_known_hosts = <<>>}), + #'connection.open_ok'{deprecated_known_hosts = <<>>}, + Protocol), State#v1{connection_state = running, connection = NewConnection}; handle_method0(#'connection.close'{}, @@ -658,8 +674,8 @@ handle_method0(_Method, #v1{connection_state = S}) -> rabbit_misc:protocol_error( channel_error, "unexpected method in connection state ~w", [S]). -send_on_channel0(Sock, Method) -> - ok = rabbit_writer:internal_send_command(Sock, 0, Method). +send_on_channel0(Sock, Method, Protocol) -> + ok = rabbit_writer:internal_send_command(Sock, 0, Method, Protocol). %%-------------------------------------------------------------------------- @@ -711,16 +727,19 @@ i(Item, #v1{}) -> %%-------------------------------------------------------------------------- -send_to_new_channel(Channel, AnalyzedFrame, - State = #v1{queue_collector = Collector}) -> +send_to_new_channel(Channel, AnalyzedFrame, State = + #v1{queue_collector = Collector, + connection = #connection{protocol = Protocol}}) -> #v1{sock = Sock, connection = #connection{ frame_max = FrameMax, user = #user{username = Username}, - vhost = VHost}} = State, - WriterPid = rabbit_writer:start(Sock, Channel, FrameMax), + vhost = VHost, + protocol = Protocol}} = State, + WriterPid = rabbit_writer:start(Sock, Channel, FrameMax, Protocol), ChPid = rabbit_framing_channel:start_link( fun rabbit_channel:start_link/6, - [Channel, self(), WriterPid, Username, VHost, Collector]), + [Channel, self(), WriterPid, Username, VHost, Collector], + Protocol), put({channel, Channel}, {chpid, ChPid}), put({chpid, ChPid}, {channel, Channel}), ok = rabbit_framing_channel:process(ChPid, AnalyzedFrame). @@ -736,7 +755,8 @@ handle_exception(State = #v1{connection_state = CS}, Channel, Reason) -> log_channel_error(CS, Channel, Reason), send_exception(State, Channel, Reason). -send_exception(State, Channel, Reason) -> +send_exception(State = #v1{connection = #connection{protocol = Protocol}}, + Channel, Reason) -> {ShouldClose, CloseChannel, CloseMethod} = map_exception(Channel, Reason), NewState = case ShouldClose of true -> terminate_channels(), @@ -744,7 +764,7 @@ send_exception(State, Channel, Reason) -> false -> close_channel(Channel, State) end, ok = rabbit_writer:internal_send_command( - NewState#v1.sock, CloseChannel, CloseMethod), + NewState#v1.sock, CloseChannel, CloseMethod, Protocol), NewState. map_exception(Channel, Reason) -> diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl index 3d10dc121e..375e540e7f 100644 --- a/src/rabbit_writer.erl +++ b/src/rabbit_writer.erl @@ -33,14 +33,14 @@ -include("rabbit.hrl"). -include("rabbit_framing.hrl"). --export([start/3, start_link/3, shutdown/1, mainloop/1]). +-export([start/4, shutdown/1, mainloop/1]). -export([send_command/2, send_command/3, send_command_and_signal_back/3, send_command_and_signal_back/4, send_command_and_notify/5]). --export([internal_send_command/3, internal_send_command/5]). +-export([internal_send_command/4, internal_send_command/6]). -import(gen_tcp). --record(wstate, {sock, channel, frame_max}). +-record(wstate, {sock, channel, frame_max, protocol}). -define(HIBERNATE_AFTER, 5000). @@ -48,8 +48,8 @@ -ifdef(use_specs). --spec(start/3 :: (socket(), channel_number(), non_neg_integer()) -> pid()). --spec(start_link/3 :: (socket(), channel_number(), non_neg_integer()) -> pid()). +-spec(start/4 :: + (socket(), channel_number(), non_neg_integer(), protocol()) -> pid()). -spec(send_command/2 :: (pid(), amqp_method_record()) -> 'ok'). -spec(send_command/3 :: (pid(), amqp_method_record(), content()) -> 'ok'). -spec(send_command_and_signal_back/3 :: (pid(), amqp_method(), pid()) -> 'ok'). @@ -57,25 +57,21 @@ (pid(), amqp_method(), content(), pid()) -> 'ok'). -spec(send_command_and_notify/5 :: (pid(), pid(), pid(), amqp_method_record(), content()) -> 'ok'). --spec(internal_send_command/3 :: - (socket(), channel_number(), amqp_method_record()) -> 'ok'). --spec(internal_send_command/5 :: +-spec(internal_send_command/4 :: + (socket(), channel_number(), amqp_method_record(), protocol()) -> 'ok'). +-spec(internal_send_command/6 :: (socket(), channel_number(), amqp_method_record(), - content(), non_neg_integer()) -> 'ok'). + content(), non_neg_integer(), protocol()) -> 'ok'). -endif. %%---------------------------------------------------------------------------- -start(Sock, Channel, FrameMax) -> +start(Sock, Channel, FrameMax, Protocol) -> spawn(?MODULE, mainloop, [#wstate{sock = Sock, channel = Channel, - frame_max = FrameMax}]). - -start_link(Sock, Channel, FrameMax) -> - spawn_link(?MODULE, mainloop, [#wstate{sock = Sock, - channel = Channel, - frame_max = FrameMax}]). + frame_max = FrameMax, + protocol = Protocol}]). mainloop(State) -> receive @@ -85,35 +81,40 @@ mainloop(State) -> end. handle_message({send_command, MethodRecord}, - State = #wstate{sock = Sock, channel = Channel}) -> - ok = internal_send_command_async(Sock, Channel, MethodRecord), + State = #wstate{sock = Sock, channel = Channel, + protocol = Protocol}) -> + ok = internal_send_command_async(Sock, Channel, MethodRecord, Protocol), State; handle_message({send_command, MethodRecord, Content}, State = #wstate{sock = Sock, channel = Channel, - frame_max = FrameMax}) -> + frame_max = FrameMax, + protocol = Protocol}) -> ok = internal_send_command_async(Sock, Channel, MethodRecord, - Content, FrameMax), + Content, FrameMax, Protocol), State; handle_message({send_command_and_signal_back, MethodRecord, Parent}, - State = #wstate{sock = Sock, channel = Channel}) -> - ok = internal_send_command_async(Sock, Channel, MethodRecord), + State = #wstate{sock = Sock, channel = Channel, + protocol = Protocol}) -> + ok = internal_send_command_async(Sock, Channel, MethodRecord, Protocol), Parent ! rabbit_writer_send_command_signal, State; handle_message({send_command_and_signal_back, MethodRecord, Content, Parent}, State = #wstate{sock = Sock, channel = Channel, - frame_max = FrameMax}) -> + frame_max = FrameMax, + protocol = Protocol}) -> ok = internal_send_command_async(Sock, Channel, MethodRecord, - Content, FrameMax), + Content, FrameMax, Protocol), Parent ! rabbit_writer_send_command_signal, State; handle_message({send_command_and_notify, QPid, ChPid, MethodRecord, Content}, State = #wstate{sock = Sock, channel = Channel, - frame_max = FrameMax}) -> + frame_max = FrameMax, + protocol = Protocol}) -> ok = internal_send_command_async(Sock, Channel, MethodRecord, - Content, FrameMax), + Content, FrameMax, Protocol), rabbit_amqqueue:notify_sent(QPid, ChPid), State; handle_message({inet_reply, _, ok}, State) -> @@ -153,16 +154,17 @@ shutdown(W) -> %--------------------------------------------------------------------------- -assemble_frames(Channel, MethodRecord) -> +assemble_frames(Channel, MethodRecord, Protocol) -> ?LOGMESSAGE(out, Channel, MethodRecord, none), - rabbit_binary_generator:build_simple_method_frame(Channel, MethodRecord). + rabbit_binary_generator:build_simple_method_frame(Channel, MethodRecord, + Protocol). -assemble_frames(Channel, MethodRecord, Content, FrameMax) -> +assemble_frames(Channel, MethodRecord, Content, FrameMax, Protocol) -> ?LOGMESSAGE(out, Channel, MethodRecord, Content), MethodName = rabbit_misc:method_record_type(MethodRecord), true = rabbit_framing:method_has_content(MethodName), % assertion MethodFrame = rabbit_binary_generator:build_simple_method_frame( - Channel, MethodRecord), + Channel, MethodRecord, Protocol), ContentFrames = rabbit_binary_generator:build_simple_content_frames( Channel, Content, FrameMax), [MethodFrame | ContentFrames]. @@ -171,12 +173,13 @@ tcp_send(Sock, Data) -> rabbit_misc:throw_on_error(inet_error, fun () -> rabbit_net:send(Sock, Data) end). -internal_send_command(Sock, Channel, MethodRecord) -> - ok = tcp_send(Sock, assemble_frames(Channel, MethodRecord)). +internal_send_command(Sock, Channel, MethodRecord, Protocol) -> + ok = tcp_send(Sock, assemble_frames(Channel, MethodRecord, Protocol)). -internal_send_command(Sock, Channel, MethodRecord, Content, FrameMax) -> +internal_send_command(Sock, Channel, MethodRecord, Content, FrameMax, + Protocol) -> ok = tcp_send(Sock, assemble_frames(Channel, MethodRecord, - Content, FrameMax)). + Content, FrameMax, Protocol)). %% gen_tcp:send/2 does a selective receive of {inet_reply, Sock, %% Status} to obtain the result. That is bad when it is called from @@ -196,13 +199,14 @@ internal_send_command(Sock, Channel, MethodRecord, Content, FrameMax) -> %% Also note that the port has bounded buffers and port_command blocks %% when these are full. So the fact that we process the result %% asynchronously does not impact flow control. -internal_send_command_async(Sock, Channel, MethodRecord) -> - true = port_cmd(Sock, assemble_frames(Channel, MethodRecord)), +internal_send_command_async(Sock, Channel, MethodRecord, Protocol) -> + true = port_cmd(Sock, assemble_frames(Channel, MethodRecord, Protocol)), ok. -internal_send_command_async(Sock, Channel, MethodRecord, Content, FrameMax) -> +internal_send_command_async(Sock, Channel, MethodRecord, Content, FrameMax, + Protocol) -> true = port_cmd(Sock, assemble_frames(Channel, MethodRecord, - Content, FrameMax)), + Content, FrameMax, Protocol)), ok. port_cmd(Sock, Data) -> |
