diff options
| author | Matthew Sackman <matthew@rabbitmq.com> | 2010-08-02 14:51:24 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@rabbitmq.com> | 2010-08-02 14:51:24 +0100 |
| commit | 949b4e5e187083dea6a28375b3878aa751bc7b8d (patch) | |
| tree | c794c9c061f82b9439528ff76a4b17432297cb0d /src | |
| parent | 43420d3dc8a4eaa0d4be08ab5f63c110cb2ad640 (diff) | |
| parent | 2efd9f8a2a2b167b21fc22b427613564b0bf6807 (diff) | |
| download | rabbitmq-server-git-949b4e5e187083dea6a28375b3878aa751bc7b8d.tar.gz | |
A ginormous amount of debitrotting, rewriting, debugging and refactoring to merge default into bug 15930
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_amqqueue.erl | 6 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 15 | ||||
| -rw-r--r-- | src/rabbit_basic.erl | 12 | ||||
| -rw-r--r-- | src/rabbit_binary_generator.erl | 61 | ||||
| -rw-r--r-- | src/rabbit_binary_parser.erl | 13 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 16 | ||||
| -rw-r--r-- | src/rabbit_channel_sup.erl | 32 | ||||
| -rw-r--r-- | src/rabbit_channel_sup_sup.erl | 10 | ||||
| -rw-r--r-- | src/rabbit_connection_sup.erl | 9 | ||||
| -rw-r--r-- | src/rabbit_exchange.erl | 1 | ||||
| -rw-r--r-- | src/rabbit_framing_channel.erl | 66 | ||||
| -rw-r--r-- | src/rabbit_heartbeat.erl | 101 | ||||
| -rw-r--r-- | src/rabbit_limiter.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_reader.erl | 248 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 21 | ||||
| -rw-r--r-- | src/rabbit_types.erl | 6 | ||||
| -rw-r--r-- | src/rabbit_writer.erl | 90 | ||||
| -rw-r--r-- | src/supervisor2.erl | 6 |
19 files changed, 394 insertions, 325 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl index 18045b94fc..ada2c38e49 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -426,9 +426,9 @@ print_banner() -> "| ~s +---+ |~n" "| |~n" "+-------------------+~n" - "AMQP ~p-~p~n~s~n~s~n~n", + "~s~n~s~n~s~n~n", [Product, string:right([$v|Version], ProductLen), - ?PROTOCOL_VERSION_MAJOR, ?PROTOCOL_VERSION_MINOR, + ?PROTOCOL_VERSION, ?COPYRIGHT_MESSAGE, ?INFORMATION_MESSAGE]), Settings = [{"node", node()}, {"app descriptor", app_location()}, diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 6bf2f6db28..870c119ad6 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -39,7 +39,7 @@ -export([pseudo_queue/2]). -export([lookup/1, with/2, with_or_die/2, assert_equivalence/5, check_exclusive_access/2, with_exclusive_access_or_die/3, - stat/1, deliver/2, requeue/3, ack/4]). + stat/1, deliver/2, requeue/3, ack/4, reject/4]). -export([list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]). -export([consumers/1, consumers_all/1]). -export([basic_get/3, basic_consume/7, basic_cancel/4]). @@ -124,6 +124,7 @@ -spec(ack/4 :: (pid(), rabbit_types:maybe(rabbit_types:txn()), [msg_id()], pid()) -> 'ok'). +-spec(reject/4 :: (pid(), [msg_id()], boolean(), pid()) -> 'ok'). -spec(commit_all/3 :: ([pid()], rabbit_types:txn(), pid()) -> ok_or_errors()). -spec(rollback_all/3 :: ([pid()], rabbit_types:txn(), pid()) -> 'ok'). -spec(notify_down_all/2 :: ([pid()], pid()) -> ok_or_errors()). @@ -367,6 +368,9 @@ requeue(QPid, MsgIds, ChPid) -> ack(QPid, Txn, MsgIds, ChPid) -> delegate_pcast(QPid, 7, {ack, Txn, MsgIds, ChPid}). +reject(QPid, MsgIds, Requeue, ChPid) -> + delegate_pcast(QPid, 7, {reject, MsgIds, Requeue, ChPid}). + commit_all(QPids, Txn, ChPid) -> safe_delegate_call_ok( fun (QPid) -> exit({queue_disappeared, QPid}) end, diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 67f0fcf5ae..ac5fb7f972 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -783,6 +783,21 @@ handle_cast({ack, Txn, AckTags, ChPid}, noreply(State#q{backing_queue_state = BQS1}) end; +handle_cast({reject, AckTags, Requeue, ChPid}, + State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> + case lookup_ch(ChPid) of + not_found -> + noreply(State); + C = #cr{acktags = ChAckTags} -> + ChAckTags1 = subtract_acks(ChAckTags, AckTags), + store_ch_record(C#cr{acktags = ChAckTags1}), + noreply(case Requeue of + true -> requeue_and_run(AckTags, State); + false -> BQS1 = BQ:ack(AckTags, BQS), + State #q { backing_queue_state = BQS1 } + end) + end; + handle_cast({rollback, Txn, ChPid}, State) -> noreply(rollback_transaction(Txn, ChPid, State)); diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl index 03a19961fd..c76c01ac52 100644 --- a/src/rabbit_basic.erl +++ b/src/rabbit_basic.erl @@ -97,18 +97,24 @@ delivery(Mandatory, Immediate, Txn, Message) -> sender = self(), message = Message}. build_content(Properties, BodyBin) -> - {ClassId, _MethodId} = rabbit_framing:method_id('basic.publish'), + %% basic.publish hasn't changed so we can just hard-code amqp_0_9_1 + {ClassId, _MethodId} = + rabbit_framing_amqp_0_9_1:method_id('basic.publish'), #content{class_id = ClassId, properties = Properties, properties_bin = none, + protocol = none, payload_fragments_rev = [BodyBin]}. from_content(Content) -> #content{class_id = ClassId, properties = Props, payload_fragments_rev = FragmentsRev} = - rabbit_binary_parser:ensure_content_decoded(Content), - {ClassId, _MethodId} = rabbit_framing:method_id('basic.publish'), + %% basic.publish hasn't changed so we can just hard-code amqp_0_9_1 + rabbit_binary_parser:ensure_content_decoded(Content, + rabbit_framing_amqp_0_9_1), + {ClassId, _MethodId} = + rabbit_framing_amqp_0_9_1:method_id('basic.publish'), {Props, list_to_binary(lists:reverse(FragmentsRev))}. message(ExchangeName, RoutingKeyBin, RawProperties, BodyBin) -> diff --git a/src/rabbit_binary_generator.erl b/src/rabbit_binary_generator.erl index 0e6ebe57bc..f0ec618044 100644 --- a/src/rabbit_binary_generator.erl +++ b/src/rabbit_binary_generator.erl @@ -41,12 +41,12 @@ % See definition of check_empty_content_body_frame_size/0, an assertion called at startup. -define(EMPTY_CONTENT_BODY_FRAME_SIZE, 8). --export([build_simple_method_frame/2, - build_simple_content_frames/3, +-export([build_simple_method_frame/3, + build_simple_content_frames/4, build_heartbeat_frame/0]). -export([generate_table/1, encode_properties/2]). -export([check_empty_content_body_frame_size/0]). --export([ensure_content_encoded/1, clear_encoded_content/1]). +-export([ensure_content_encoded/2, clear_encoded_content/1]). -import(lists). @@ -56,20 +56,22 @@ -type(frame() :: [binary()]). --spec(build_simple_method_frame/2 :: - (rabbit_channel:channel_number(), rabbit_framing:amqp_method_record()) +-spec(build_simple_method_frame/3 :: + (rabbit_channel:channel_number(), rabbit_framing:amqp_method_record(), + rabbit_types:protocol()) -> frame()). --spec(build_simple_content_frames/3 :: +-spec(build_simple_content_frames/4 :: (rabbit_channel:channel_number(), rabbit_types:content(), - non_neg_integer()) + non_neg_integer(), rabbit_types:protocol()) -> [frame()]). -spec(build_heartbeat_frame/0 :: () -> frame()). -spec(generate_table/1 :: (rabbit_framing:amqp_table()) -> binary()). -spec(encode_properties/2 :: ([rabbit_framing:amqp_property_type()], [any()]) -> binary()). -spec(check_empty_content_body_frame_size/0 :: () -> 'ok'). --spec(ensure_content_encoded/1 :: - (rabbit_types:content()) -> rabbit_types:encoded_content()). +-spec(ensure_content_encoded/2 :: + (rabbit_types:content(), rabbit_types:protocol()) -> + rabbit_types:encoded_content()). -spec(clear_encoded_content/1 :: (rabbit_types:content()) -> rabbit_types:unencoded_content()). @@ -77,30 +79,24 @@ %%---------------------------------------------------------------------------- -build_simple_method_frame(ChannelInt, MethodRecord) -> - MethodFields = rabbit_framing:encode_method_fields(MethodRecord), +build_simple_method_frame(ChannelInt, MethodRecord, Protocol) -> + MethodFields = Protocol:encode_method_fields(MethodRecord), MethodName = rabbit_misc:method_record_type(MethodRecord), - {ClassId, MethodId} = rabbit_framing:method_id(MethodName), + {ClassId, MethodId} = Protocol:method_id(MethodName), create_frame(1, ChannelInt, [<<ClassId:16, MethodId:16>>, MethodFields]). -build_simple_content_frames(ChannelInt, - #content{class_id = ClassId, - properties = ContentProperties, - properties_bin = ContentPropertiesBin, - payload_fragments_rev = PayloadFragmentsRev}, - FrameMax) -> - {BodySize, ContentFrames} = build_content_frames(PayloadFragmentsRev, FrameMax, ChannelInt), +build_simple_content_frames(ChannelInt, Content, FrameMax, Protocol) -> + #content{class_id = ClassId, + properties_bin = ContentPropertiesBin, + payload_fragments_rev = PayloadFragmentsRev} = + ensure_content_encoded(Content, Protocol), + {BodySize, ContentFrames} = + build_content_frames(PayloadFragmentsRev, FrameMax, ChannelInt), HeaderFrame = create_frame(2, ChannelInt, [<<ClassId:16, 0:16, BodySize:64>>, - maybe_encode_properties(ContentProperties, ContentPropertiesBin)]), + ContentPropertiesBin]), [HeaderFrame | ContentFrames]. -maybe_encode_properties(_ContentProperties, ContentPropertiesBin) - when is_binary(ContentPropertiesBin) -> - ContentPropertiesBin; -maybe_encode_properties(ContentProperties, none) -> - rabbit_framing:encode_properties(ContentProperties). - build_content_frames(FragsRev, FrameMax, ChannelInt) -> BodyPayloadMax = if FrameMax == 0 -> iolist_size(FragsRev); @@ -283,13 +279,16 @@ check_empty_content_body_frame_size() -> ComputedSize, ?EMPTY_CONTENT_BODY_FRAME_SIZE}) end. -ensure_content_encoded(Content = #content{properties_bin = PropsBin}) +ensure_content_encoded(Content = #content{properties_bin = PropsBin, + protocol = Protocol}, Protocol) when PropsBin =/= 'none' -> Content; -ensure_content_encoded(Content = #content{properties = Props}) -> - Content #content{properties_bin = rabbit_framing:encode_properties(Props)}. +ensure_content_encoded(Content = #content{properties = Props}, Protocol) -> + Content#content{properties_bin = Protocol:encode_properties(Props), + protocol = Protocol}. -clear_encoded_content(Content = #content{properties_bin = none}) -> +clear_encoded_content(Content = #content{properties_bin = none, + protocol = none}) -> Content; clear_encoded_content(Content = #content{properties = none}) -> %% Only clear when we can rebuild the properties_bin later in @@ -297,4 +296,4 @@ clear_encoded_content(Content = #content{properties = none}) -> %% one of properties and properties_bin can be 'none' Content; clear_encoded_content(Content = #content{}) -> - Content#content{properties_bin = none}. + Content#content{properties_bin = none, protocol = none}. diff --git a/src/rabbit_binary_parser.erl b/src/rabbit_binary_parser.erl index 69e34440b8..1d0a62afe8 100644 --- a/src/rabbit_binary_parser.erl +++ b/src/rabbit_binary_parser.erl @@ -34,7 +34,7 @@ -include("rabbit.hrl"). -export([parse_table/1, parse_properties/2]). --export([ensure_content_decoded/1, clear_decoded_content/1]). +-export([ensure_content_decoded/2, clear_decoded_content/1]). -import(lists). @@ -45,8 +45,9 @@ -spec(parse_table/1 :: (binary()) -> rabbit_framing:amqp_table()). -spec(parse_properties/2 :: ([rabbit_framing:amqp_property_type()], binary()) -> [any()]). --spec(ensure_content_decoded/1 :: - (rabbit_types:content()) -> rabbit_types:decoded_content()). +-spec(ensure_content_decoded/2 :: + (rabbit_types:content(), rabbit_types:protocol()) + -> rabbit_types:decoded_content()). -spec(clear_decoded_content/1 :: (rabbit_types:content()) -> rabbit_types:undecoded_content()). @@ -162,12 +163,12 @@ parse_property(bit, Rest) -> parse_property(table, <<Len:32/unsigned, Table:Len/binary, Rest/binary>>) -> {parse_table(Table), Rest}. -ensure_content_decoded(Content = #content{properties = Props}) +ensure_content_decoded(Content = #content{properties = Props}, _Protocol) when Props =/= 'none' -> Content; -ensure_content_decoded(Content = #content{properties_bin = PropBin}) +ensure_content_decoded(Content = #content{properties_bin = PropBin}, Protocol) when is_binary(PropBin) -> - Content#content{properties = rabbit_framing:decode_properties( + Content#content{properties = Protocol:decode_properties( Content#content.class_id, PropBin)}. clear_decoded_content(Content = #content{properties = none}) -> diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 58cb82f838..7c800f41ac 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -431,7 +431,8 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, Exchange = rabbit_exchange:lookup_or_die(ExchangeName), %% We decode the content's properties here because we're almost %% certain to want to look at delivery-mode and priority. - DecodedContent = rabbit_binary_parser:ensure_content_decoded(Content), + DecodedContent = rabbit_binary_parser:ensure_content_decoded( + Content, rabbit_framing_amqp_0_9_1), IsPersistent = is_message_persistent(DecodedContent), Message = #basic_message{exchange_name = ExchangeName, routing_key = RoutingKey, @@ -648,6 +649,17 @@ handle_method(#'basic.recover'{requeue = Requeue}, Content, State) -> ok = rabbit_writer:send_command(WriterPid, #'basic.recover_ok'{}), {noreply, State2}; +handle_method(#'basic.reject'{delivery_tag = DeliveryTag, + requeue = Requeue}, + _, State = #ch{ unacked_message_q = UAMQ}) -> + {Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, false), + ok = fold_per_queue( + fun (QPid, MsgIds, ok) -> + rabbit_amqqueue:reject(QPid, MsgIds, Requeue, self()) + end, ok, Acked), + ok = notify_limiter(State#ch.limiter_pid, Acked), + {noreply, State#ch{unacked_message_q = Remaining}}; + handle_method(#'exchange.declare'{exchange = ExchangeNameBin, type = TypeNameBin, passive = false, @@ -948,7 +960,7 @@ basic_return(#basic_message{exchange_name = ExchangeName, content = Content}, WriterPid, Reason) -> {_Close, ReplyCode, ReplyText} = - rabbit_framing:lookup_amqp_exception(Reason), + rabbit_framing_amqp_0_9_1:lookup_amqp_exception(Reason), ok = rabbit_writer:send_command( WriterPid, #'basic.return'{reply_code = ReplyCode, diff --git a/src/rabbit_channel_sup.erl b/src/rabbit_channel_sup.erl index eb7fab0de2..1d02d992d2 100644 --- a/src/rabbit_channel_sup.erl +++ b/src/rabbit_channel_sup.erl @@ -33,7 +33,7 @@ -behaviour(supervisor2). --export([start_link/7, writer/1, framing_channel/1, channel/1]). +-export([start_link/8, writer/1, framing_channel/1, channel/1]). -export([init/1]). @@ -43,39 +43,41 @@ -ifdef(use_specs). --spec(start_link/7 :: - (rabbit_net:socket(), rabbit_channel:channel_number(), - non_neg_integer(), pid(), rabbit_access_control:username(), - rabbit_types:vhost(), pid()) -> +-spec(start_link/8 :: + (rabbit_types:protocol(), rabbit_net:socket(), + rabbit_channel:channel_number(), non_neg_integer(), pid(), + rabbit_access_control:username(), rabbit_types:vhost(), pid()) -> ignore | rabbit_types:ok_or_error2(pid(), any())). -endif. %%---------------------------------------------------------------------------- -start_link(Sock, Channel, FrameMax, ReaderPid, Username, VHost, Collector) -> - supervisor2:start_link(?MODULE, [Sock, Channel, FrameMax, ReaderPid, - Username, VHost, Collector]). +start_link(Protocol, Sock, Channel, FrameMax, ReaderPid, Username, VHost, + Collector) -> + supervisor2:start_link(?MODULE, [Protocol, Sock, Channel, FrameMax, + ReaderPid, Username, VHost, Collector]). writer(Pid) -> - hd(supervisor2:find_child(Pid, writer, worker, [rabbit_writer])). + hd(supervisor2:find_child(Pid, writer)). channel(Pid) -> - hd(supervisor2:find_child(Pid, channel, worker, [rabbit_channel])). + hd(supervisor2:find_child(Pid, channel)). framing_channel(Pid) -> - hd(supervisor2:find_child(Pid, framing_channel, worker, - [rabbit_framing_channel])). + hd(supervisor2:find_child(Pid, framing_channel)). %%---------------------------------------------------------------------------- -init([Sock, Channel, FrameMax, ReaderPid, Username, VHost, Collector]) -> +init([Protocol, Sock, Channel, FrameMax, ReaderPid, Username, VHost, + Collector]) -> Me = self(), {ok, {{one_for_all, 0, 1}, [{framing_channel, {rabbit_framing_channel, start_link, - [fun () -> channel(Me) end]}, + [fun () -> channel(Me) end, Protocol]}, permanent, ?MAX_WAIT, worker, [rabbit_framing_channel]}, - {writer, {rabbit_writer, start_link, [Sock, Channel, FrameMax]}, + {writer, {rabbit_writer, start_link, + [Sock, Channel, FrameMax, Protocol]}, permanent, ?MAX_WAIT, worker, [rabbit_writer]}, {channel, {rabbit_channel, start_link, [Channel, fun () -> ReaderPid end, diff --git a/src/rabbit_channel_sup_sup.erl b/src/rabbit_channel_sup_sup.erl index 2fab867841..f608b724ac 100644 --- a/src/rabbit_channel_sup_sup.erl +++ b/src/rabbit_channel_sup_sup.erl @@ -33,16 +33,16 @@ -behaviour(supervisor2). --export([start_link/0, start_channel/2]). +-export([start_link/1, start_channel/2]). -export([init/1]). -start_link() -> - supervisor2:start_link(?MODULE, []). +start_link(Protocol) -> + supervisor2:start_link(?MODULE, [Protocol]). -init([]) -> +init([Protocol]) -> {ok, {{simple_one_for_one_terminate, 0, 1}, - [{channel_sup, {rabbit_channel_sup, start_link, []}, + [{channel_sup, {rabbit_channel_sup, start_link, [Protocol]}, temporary, infinity, supervisor, [rabbit_channel_sup]}]}}. start_channel(Pid, Args) -> diff --git a/src/rabbit_connection_sup.erl b/src/rabbit_connection_sup.erl index 3dd81ef6c5..5d05ca28ff 100644 --- a/src/rabbit_connection_sup.erl +++ b/src/rabbit_connection_sup.erl @@ -47,15 +47,12 @@ init([]) -> [{reader, {rabbit_reader, start_link, []}, permanent, ?MAX_WAIT, worker, [rabbit_reader]}, {collector, {rabbit_queue_collector, start_link, []}, - permanent, ?MAX_WAIT, worker, [rabbit_queue_collector]}, - {channel_sup_sup, {rabbit_channel_sup_sup, start_link, []}, - permanent, infinity, supervisor, [rabbit_channel_sup_sup]} + permanent, ?MAX_WAIT, worker, [rabbit_queue_collector]} ]}}. reader(Pid) -> - hd(supervisor2:find_child(Pid, reader, worker, [rabbit_reader])). + hd(supervisor2:find_child(Pid, reader)). channel_sup_sup(Pid) -> - hd(supervisor2:find_child(Pid, channel_sup_sup, supervisor, - [rabbit_channel_sup_sup])). + hd(supervisor2:find_child(Pid, channel_sup_sup)). diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index 7f7622b255..49f87a22a6 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -378,7 +378,6 @@ cleanup_deleted_queue_bindings1(ExchangeName, Bindings) -> [X] = mnesia:read({rabbit_exchange, ExchangeName}), {maybe_auto_delete(X), Bindings}. - delete_forward_routes(Route) -> ok = mnesia:delete_object(rabbit_route, Route, write), ok = mnesia:delete_object(rabbit_durable_route, Route, write). diff --git a/src/rabbit_framing_channel.erl b/src/rabbit_framing_channel.erl index 3a0d71f5f3..6ece343655 100644 --- a/src/rabbit_framing_channel.erl +++ b/src/rabbit_framing_channel.erl @@ -32,15 +32,16 @@ -module(rabbit_framing_channel). -include("rabbit.hrl"). --export([start_link/1, process/2, shutdown/1]). +-export([start_link/2, process/2, shutdown/1]). %% internal --export([mainloop/1]). +-export([mainloop/2]). %%-------------------------------------------------------------------- -start_link(GetChannelPid) -> - {ok, proc_lib:spawn_link(fun () -> mainloop(GetChannelPid()) end)}. +start_link(GetChannelPid, Protocol) -> + {ok, proc_lib:spawn_link( + fun () -> mainloop(GetChannelPid(), Protocol) end)}. process(Pid, Frame) -> Pid ! {frame, Frame}, @@ -60,37 +61,42 @@ read_frame(ChannelPid) -> Msg -> exit({unexpected_message, Msg}) end. -mainloop(ChannelPid) -> - {method, MethodName, FieldsBin} = read_frame(ChannelPid), - Method = rabbit_framing:decode_method_fields(MethodName, FieldsBin), - case rabbit_framing:method_has_content(MethodName) of - true -> {ClassId, _MethodId} = rabbit_framing:method_id(MethodName), - rabbit_channel:do(ChannelPid, Method, - collect_content(ChannelPid, ClassId)); - false -> rabbit_channel:do(ChannelPid, Method) - end, - ?MODULE:mainloop(ChannelPid). +mainloop(ChannelPid, Protocol) -> + case read_frame(ChannelPid) of + {method, MethodName, FieldsBin} -> + Method = Protocol:decode_method_fields(MethodName, FieldsBin), + case Protocol:method_has_content(MethodName) of + true -> {ClassId, _MethodId} = Protocol:method_id(MethodName), + rabbit_channel:do(ChannelPid, Method, + collect_content(ChannelPid, + ClassId, + Protocol)); + false -> rabbit_channel:do(ChannelPid, Method) + end, + ?MODULE:mainloop(ChannelPid, Protocol); + _ -> + unexpected_frame("expected method frame, " + "got non method frame instead", + []) + end. -collect_content(ChannelPid, ClassId) -> +collect_content(ChannelPid, ClassId, Protocol) -> case read_frame(ChannelPid) of {content_header, ClassId, 0, BodySize, PropertiesBin} -> Payload = collect_content_payload(ChannelPid, BodySize, []), #content{class_id = ClassId, properties = none, properties_bin = PropertiesBin, + protocol = Protocol, payload_fragments_rev = Payload}; {content_header, HeaderClassId, 0, _BodySize, _PropertiesBin} -> - rabbit_misc:protocol_error( - command_invalid, - "expected content header for class ~w, " - "got one for class ~w instead", - [ClassId, HeaderClassId]); + unexpected_frame("expected content header for class ~w, " + "got one for class ~w instead", + [ClassId, HeaderClassId]); _ -> - rabbit_misc:protocol_error( - command_invalid, - "expected content header for class ~w, " - "got non content header frame instead", - [ClassId]) + unexpected_frame("expected content header for class ~w, " + "got non content header frame instead", + [ClassId]) end. collect_content_payload(_ChannelPid, 0, Acc) -> @@ -102,8 +108,10 @@ collect_content_payload(ChannelPid, RemainingByteCount, Acc) -> RemainingByteCount - size(FragmentBin), [FragmentBin | Acc]); _ -> - rabbit_misc:protocol_error( - command_invalid, - "expected content body, got non content body frame instead", - []) + unexpected_frame("expected content body, " + "got non content body frame instead", + []) end. + +unexpected_frame(ExplanationFormat, Params) -> + rabbit_misc:protocol_error(unexpected_frame, ExplanationFormat, Params). diff --git a/src/rabbit_heartbeat.erl b/src/rabbit_heartbeat.erl index b7c73ae7a6..e8e3236a16 100644 --- a/src/rabbit_heartbeat.erl +++ b/src/rabbit_heartbeat.erl @@ -31,83 +31,94 @@ -module(rabbit_heartbeat). --include("rabbit.hrl"). - -export([start_heartbeat/3, start_heartbeat_sender/2, start_heartbeat_receiver/2]). +-include("rabbit.hrl"). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-spec(start_heartbeat/3 :: (pid(), rabbit_net:socket(), non_neg_integer()) -> + rabbit_types:maybe({pid(), pid()})). +-spec(start_heartbeat_sender/2 :: (rabbit_net:socket(), non_neg_integer()) -> + rabbit_types:ok(pid())). +-spec(start_heartbeat_receiver/2 :: (rabbit_net:socket(), non_neg_integer()) -> + rabbit_types:ok(pid())). + +-endif. + +%%---------------------------------------------------------------------------- + start_heartbeat(_Sup, _Sock, 0) -> none; start_heartbeat(Sup, Sock, TimeoutSec) -> - {ok, _Sender} = + {ok, Sender} = supervisor:start_child( Sup, {heartbeat_sender, {?MODULE, start_heartbeat_sender, [Sock, TimeoutSec]}, transient, ?MAX_WAIT, worker, [rabbit_heartbeat]}), - {ok, _Receiver} = + {ok, Receiver} = supervisor:start_child( Sup, {heartbeat_receiver, {?MODULE, start_heartbeat_receiver, [Sock, TimeoutSec]}, transient, ?MAX_WAIT, worker, [rabbit_heartbeat]}), - ok. + {Sender, Receiver}. start_heartbeat_sender(Sock, TimeoutSec) -> %% the 'div 2' is there so that we don't end up waiting for nearly %% 2 * TimeoutSec before sending a heartbeat in the boundary case %% where the last message was sent just after a heartbeat. + Parent = self(), {ok, proc_lib:spawn_link( - fun () -> heartbeater(Sock, TimeoutSec * 1000 div 2, + fun () -> heartbeater({Sock, TimeoutSec * 1000 div 2, send_oct, 0, fun () -> catch rabbit_net:send(Sock, rabbit_binary_generator:build_heartbeat_frame()), continue - end) + end}, Parent) end)}. start_heartbeat_receiver(Sock, TimeoutSec) -> %% we check for incoming data every interval, and time out after %% two checks with no change. As a result we will time out between %% 2 and 3 intervals after the last data has been received. + Parent = self(), {ok, proc_lib:spawn_link( - fun () -> heartbeater(Sock, TimeoutSec * 1000, + fun () -> heartbeater({Sock, TimeoutSec * 1000, recv_oct, 1, - fun () -> exit(timeout) end) - end)}. + fun () -> exit(timeout) end}, Parent) end)}. -%% Y-combinator, posted by Vladimir Sekissov to the Erlang mailing list -%% http://www.erlang.org/ml-archive/erlang-questions/200301/msg00053.html -y(X) -> - F = fun (P) -> X(fun (A) -> (P(P))(A) end) end, - F(F). +heartbeater(Params, Parent) -> + heartbeater(Params, erlang:monitor(process, Parent), {0, 0}). -heartbeater(Sock, TimeoutMillisec, StatName, Threshold, Handler) -> - Heartbeat = - fun (F) -> - fun ({StatVal, SameCount}) -> - receive - Other -> exit({unexpected_message, Other}) - after TimeoutMillisec -> - case rabbit_net:getstat(Sock, [StatName]) of - {ok, [{StatName, NewStatVal}]} -> - if NewStatVal =/= StatVal -> - F({NewStatVal, 0}); - SameCount < Threshold -> - F({NewStatVal, SameCount + 1}); - true -> - continue = Handler(), - F({NewStatVal, 0}) - end; - {error, einval} -> - %% the socket is dead, most - %% likely because the - %% connection is being shut - %% down -> terminate - ok; - {error, Reason} -> - exit({cannot_get_socket_stats, Reason}) - end - end - end - end, - (y(Heartbeat))({0, 0}). +heartbeater({Sock, TimeoutMillisec, StatName, Threshold, Handler} = Params, + MonitorRef, {StatVal, SameCount}) -> + receive + {'DOWN', MonitorRef, process, _Object, _Info} -> + ok; + Other -> + exit({unexpected_message, Other}) + after TimeoutMillisec -> + case rabbit_net:getstat(Sock, [StatName]) of + {ok, [{StatName, NewStatVal}]} -> + Recurse = fun (V) -> heartbeater(Params, MonitorRef, V) end, + if NewStatVal =/= StatVal -> + Recurse({NewStatVal, 0}); + SameCount < Threshold -> + Recurse({NewStatVal, SameCount + 1}); + true -> + case Handler() of + continue -> Recurse({NewStatVal, 0}) + end + end; + {error, einval} -> + %% the socket is dead, most likely because the + %% connection is being shut down -> terminate + ok; + {error, Reason} -> + exit({cannot_get_socket_stats, Reason}) + end + end. diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index 491ae7d6eb..813ccc7538 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -45,7 +45,7 @@ -type(maybe_pid() :: pid() | 'undefined'). --spec(start_link/2 :: (pid(), non_neg_integer()) -> {'ok', pid()}). +-spec(start_link/2 :: (pid(), non_neg_integer()) -> rabbit_types:ok(pid())). -spec(shutdown/1 :: (maybe_pid()) -> 'ok'). -spec(limit/2 :: (maybe_pid(), non_neg_integer()) -> 'ok' | 'stopped'). -spec(can_send/3 :: (maybe_pid(), pid(), boolean()) -> boolean()). diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 99d76b8ac4..2dc3e64b5e 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -41,7 +41,7 @@ -export([server_properties/0]). --export([analyze_frame/2]). +-export([analyze_frame/3]). -import(gen_tcp). -import(fprof). @@ -62,8 +62,8 @@ -define(INFO_KEYS, [pid, address, port, peer_address, peer_port, - recv_oct, recv_cnt, send_oct, send_cnt, send_pend, - state, channels, user, vhost, timeout, frame_max, client_properties]). + recv_oct, recv_cnt, send_oct, send_cnt, send_pend, state, channels, + protocol, user, vhost, timeout, frame_max, client_properties]). %% connection lifecycle %% @@ -248,14 +248,13 @@ start_connection(Parent, Deb, Sock, SockTransform) -> erlang:send_after(?HANDSHAKE_TIMEOUT * 1000, self(), handshake_timeout), ProfilingValue = setup_profiling(), - [Collector] = - [Pid || {collector, Pid, worker, [rabbit_queue_collector]} - <- supervisor:which_children(Parent)], + [Collector] = supervisor2:find_child(Parent, collector), try mainloop(Deb, switch_callback( #v1{parent = Parent, sock = ClientSock, connection = #connection{ + protocol = none, user = none, timeout_sec = ?HANDSHAKE_TIMEOUT, frame_max = ?FRAME_MIN_SIZE, @@ -444,7 +443,9 @@ wait_for_channel_termination(N, TimerRef) -> end. maybe_close(State = #v1{connection_state = closing, - queue_collector = Collector}) -> + queue_collector = Collector, + connection = #connection{protocol = Protocol}, + sock = Sock}) -> case all_channels() of [] -> %% Spec says "Exclusive queues may only be accessed by the current @@ -452,16 +453,18 @@ maybe_close(State = #v1{connection_state = closing, %% This does not strictly imply synchrony, but in practice it seems %% to be what people assume. rabbit_queue_collector:delete_all(Collector), - ok = send_on_channel0(State#v1.sock, #'connection.close_ok'{}), + ok = send_on_channel0(Sock, #'connection.close_ok'{}, Protocol), close_connection(State); _ -> State end; maybe_close(State) -> State. -handle_frame(Type, 0, Payload, State = #v1{connection_state = CS}) +handle_frame(Type, 0, Payload, + State = #v1{connection_state = CS, + connection = #connection{protocol = Protocol}}) when CS =:= closing; CS =:= closed -> - case analyze_frame(Type, Payload) of + case analyze_frame(Type, Payload, Protocol) of {method, MethodName, FieldsBin} -> handle_method0(MethodName, FieldsBin, State); _Other -> State @@ -469,20 +472,20 @@ handle_frame(Type, 0, Payload, State = #v1{connection_state = CS}) handle_frame(_Type, _Channel, _Payload, State = #v1{connection_state = CS}) when CS =:= closing; CS =:= closed -> State; -handle_frame(Type, 0, Payload, State) -> - case analyze_frame(Type, Payload) of +handle_frame(Type, 0, Payload, + State = #v1{connection = #connection{protocol = Protocol}}) -> + case analyze_frame(Type, Payload, Protocol) of error -> throw({unknown_frame, 0, Type, Payload}); heartbeat -> State; - trace -> State; {method, MethodName, FieldsBin} -> handle_method0(MethodName, FieldsBin, State); Other -> throw({unexpected_frame_on_channel0, Other}) end; -handle_frame(Type, Channel, Payload, State) -> - case analyze_frame(Type, Payload) of +handle_frame(Type, Channel, Payload, + State = #v1{connection = #connection{protocol = Protocol}}) -> + case analyze_frame(Type, Payload, Protocol) of error -> throw({unknown_frame, Channel, Type, Payload}); heartbeat -> throw({unexpected_heartbeat_frame, Channel}); - trace -> throw({unexpected_trace_frame, Channel}); AnalyzedFrame -> %%?LOGDEBUG("Ch ~p Frame ~p~n", [Channel, AnalyzedFrame]), case get({channel, Channel}) of @@ -523,17 +526,20 @@ handle_frame(Type, Channel, Payload, State) -> end end. -analyze_frame(?FRAME_METHOD, <<ClassId:16, MethodId:16, MethodFields/binary>>) -> - {method, rabbit_framing:lookup_method_name({ClassId, MethodId}), MethodFields}; -analyze_frame(?FRAME_HEADER, <<ClassId:16, Weight:16, BodySize:64, Properties/binary>>) -> +analyze_frame(?FRAME_METHOD, + <<ClassId:16, MethodId:16, MethodFields/binary>>, + Protocol) -> + MethodName = Protocol:lookup_method_name({ClassId, MethodId}), + {method, MethodName, MethodFields}; +analyze_frame(?FRAME_HEADER, + <<ClassId:16, Weight:16, BodySize:64, Properties/binary>>, + _Protocol) -> {content_header, ClassId, Weight, BodySize, Properties}; -analyze_frame(?FRAME_BODY, Body) -> +analyze_frame(?FRAME_BODY, Body, _Protocol) -> {content_body, Body}; -analyze_frame(?FRAME_TRACE, _Body) -> - trace; -analyze_frame(?FRAME_HEARTBEAT, <<>>) -> +analyze_frame(?FRAME_HEARTBEAT, <<>>, _Protocol) -> heartbeat; -analyze_frame(_Type, _Body) -> +analyze_frame(_Type, _Body, _Protocol) -> error. handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32>>, State) -> @@ -550,54 +556,75 @@ handle_input({frame_payload, Type, Channel, PayloadSize}, PayloadAndMarker, Stat throw({bad_payload, PayloadAndMarker}) end; -handle_input(handshake, <<"AMQP",1,1,ProtocolMajor,ProtocolMinor>>, - State = #v1{sock = Sock, connection = Connection}) -> - case check_version({ProtocolMajor, ProtocolMinor}, - {?PROTOCOL_VERSION_MAJOR, ?PROTOCOL_VERSION_MINOR}) of - true -> - ok = send_on_channel0( - Sock, - #'connection.start'{ - version_major = ?PROTOCOL_VERSION_MAJOR, - version_minor = ?PROTOCOL_VERSION_MINOR, - server_properties = server_properties(), - mechanisms = <<"PLAIN AMQPLAIN">>, - locales = <<"en_US">> }), - {State#v1{connection = Connection#connection{ - timeout_sec = ?NORMAL_TIMEOUT}, - connection_state = starting}, - frame_header, 7}; - false -> - throw({bad_version, ProtocolMajor, ProtocolMinor}) - end; +%% The two rules pertaining to version negotiation: +%% +%% * If the server cannot support the protocol specified in the +%% protocol header, it MUST respond with a valid protocol header and +%% then close the socket connection. +%% +%% * The server MUST provide a protocol version that is lower than or +%% equal to that requested by the client in the protocol header. +handle_input(handshake, <<"AMQP", 0, 0, 9, 1>>, State) -> + start_connection({0, 9, 1}, rabbit_framing_amqp_0_9_1, State); + +%% This is the protocol header for 0-9, which we can safely treat as +%% though it were 0-9-1. +handle_input(handshake, <<"AMQP", 1, 1, 0, 9>>, State) -> + start_connection({0, 9, 0}, rabbit_framing_amqp_0_9_1, State); + +%% This is what most clients send for 0-8. The 0-8 spec, confusingly, +%% defines the version as 8-0. +handle_input(handshake, <<"AMQP", 1, 1, 8, 0>>, State) -> + start_connection({8, 0, 0}, rabbit_framing_amqp_0_8, State); + +%% The 0-8 spec as on the AMQP web site actually has this as the +%% protocol header; some libraries e.g., py-amqplib, send it when they +%% want 0-8. +handle_input(handshake, <<"AMQP", 1, 1, 9, 1>>, State) -> + start_connection({8, 0, 0}, rabbit_framing_amqp_0_8, State); + +handle_input(handshake, <<"AMQP", A, B, C, D>>, #v1{sock = Sock}) -> + refuse_connection(Sock, {bad_version, A, B, C, D}); handle_input(handshake, Other, #v1{sock = Sock}) -> - ok = inet_op(fun () -> rabbit_net:send( - Sock, <<"AMQP",1,1, - ?PROTOCOL_VERSION_MAJOR, - ?PROTOCOL_VERSION_MINOR>>) end), - throw({bad_header, Other}); + refuse_connection(Sock, {bad_header, Other}); handle_input(Callback, Data, _State) -> throw({bad_input, Callback, Data}). -%% the 0-8 spec, confusingly, defines the version as 8-0 -adjust_version({8,0}) -> {0,8}; -adjust_version(Version) -> Version. -check_version(ClientVersion, ServerVersion) -> - {ClientMajor, ClientMinor} = adjust_version(ClientVersion), - {ServerMajor, ServerMinor} = adjust_version(ServerVersion), - ClientMajor > ServerMajor - orelse - (ClientMajor == ServerMajor andalso - ClientMinor >= ServerMinor). +%% Offer a protocol version to the client. Connection.start only +%% includes a major and minor version number, Luckily 0-9 and 0-9-1 +%% are similar enough that clients will be happy with either. +start_connection({ProtocolMajor, ProtocolMinor, _ProtocolRevision}, + Protocol, State = #v1{parent = Parent, sock = Sock, + connection = Connection}) -> + {ok, _Pid} = + supervisor:start_child( + Parent, {channel_sup_sup, + {rabbit_channel_sup_sup, start_link, [Protocol]}, + permanent, infinity, supervisor, [rabbit_channel_sup_sup]}), + Start = #'connection.start'{ version_major = ProtocolMajor, + version_minor = ProtocolMinor, + server_properties = server_properties(), + mechanisms = <<"PLAIN AMQPLAIN">>, + locales = <<"en_US">> }, + ok = send_on_channel0(Sock, Start, Protocol), + {State#v1{connection = Connection#connection{ + timeout_sec = ?NORMAL_TIMEOUT, + protocol = Protocol}, + connection_state = starting}, + frame_header, 7}. + +refuse_connection(Sock, Exception) -> + ok = inet_op(fun () -> rabbit_net:send(Sock, <<"AMQP",0,0,9,1>>) end), + throw(Exception). %%-------------------------------------------------------------------------- -handle_method0(MethodName, FieldsBin, State) -> +handle_method0(MethodName, FieldsBin, + State = #v1{connection = #connection{protocol = Protocol}}) -> try - handle_method0(rabbit_framing:decode_method_fields( - MethodName, FieldsBin), + handle_method0(Protocol:decode_method_fields(MethodName, FieldsBin), State) catch exit:Reason -> CompleteReason = case Reason of @@ -619,14 +646,14 @@ handle_method0(#'connection.start_ok'{mechanism = Mechanism, response = Response, client_properties = ClientProperties}, State = #v1{connection_state = starting, - connection = Connection, + connection = Connection = + #connection{protocol = Protocol}, sock = Sock}) -> User = rabbit_access_control:check_login(Mechanism, Response), - ok = send_on_channel0( - Sock, - #'connection.tune'{channel_max = 0, + Tune = #'connection.tune'{channel_max = 0, frame_max = ?FRAME_MAX, - heartbeat = 0}), + heartbeat = 0}, + ok = send_on_channel0(Sock, Tune, Protocol), State#v1{connection_state = tuning, connection = Connection#connection{ user = User, @@ -653,46 +680,30 @@ handle_method0(#'connection.tune_ok'{frame_max = FrameMax, frame_max = FrameMax}} end; -handle_method0(#'connection.open'{virtual_host = VHostPath, - insist = Insist}, +handle_method0(#'connection.open'{virtual_host = VHostPath}, + State = #v1{connection_state = opening, connection = Connection = #connection{ - user = User}, + user = User, + protocol = Protocol}, sock = Sock}) -> ok = rabbit_access_control:check_vhost_access(User, VHostPath), NewConnection = Connection#connection{vhost = VHostPath}, - KnownHosts = format_listeners(rabbit_networking:active_listeners()), - Redirects = compute_redirects(Insist), - if Redirects == [] -> - ok = send_on_channel0( - Sock, - #'connection.open_ok'{known_hosts = KnownHosts}), - State#v1{connection_state = running, - connection = NewConnection}; - true -> - %% FIXME: 'host' is supposed to only contain one - %% address; but which one do we pick? This is - %% really a problem with the spec. - Host = format_listeners(Redirects), - rabbit_log:info("connection ~p redirecting to ~p~n", - [self(), Host]), - ok = send_on_channel0( - Sock, - #'connection.redirect'{host = Host, - known_hosts = KnownHosts}), - close_connection(State#v1{connection = NewConnection}) - end; + ok = send_on_channel0(Sock, #'connection.open_ok'{}, Protocol), + State#v1{connection_state = running, + connection = NewConnection}; handle_method0(#'connection.close'{}, State = #v1{connection_state = running}) -> lists:foreach(fun rabbit_framing_channel:shutdown/1, all_channels()), maybe_close(State#v1{connection_state = closing}); handle_method0(#'connection.close'{}, State = #v1{connection_state = CS, + connection = #connection{protocol = Protocol}, sock = Sock}) when CS =:= closing; CS =:= closed -> %% We're already closed or closing, so we don't need to cleanup %% anything. - ok = send_on_channel0(Sock, #'connection.close_ok'{}), + ok = send_on_channel0(Sock, #'connection.close_ok'{}, Protocol), State; handle_method0(#'connection.close_ok'{}, State = #v1{connection_state = closed}) -> @@ -705,23 +716,8 @@ handle_method0(_Method, #v1{connection_state = S}) -> rabbit_misc:protocol_error( channel_error, "unexpected method in connection state ~w", [S]). -send_on_channel0(Sock, Method) -> - ok = rabbit_writer:internal_send_command(Sock, 0, Method). - -format_listeners(Listeners) -> - list_to_binary( - rabbit_misc:intersperse( - $,, - [io_lib:format("~s:~w", [Host, Port]) || - #listener{host = Host, port = Port} <- Listeners])). - -compute_redirects(true) -> []; -compute_redirects(false) -> - Node = node(), - LNode = rabbit_load:pick(), - if Node == LNode -> []; - true -> rabbit_networking:node_listeners(LNode) - end. +send_on_channel0(Sock, Method, Protocol) -> + ok = rabbit_writer:internal_send_command(Sock, 0, Method, Protocol). %%-------------------------------------------------------------------------- @@ -755,6 +751,10 @@ i(state, #v1{connection_state = S}) -> S; i(channels, #v1{}) -> length(all_channels()); +i(protocol, #v1{connection = #connection{protocol = none}}) -> + none; +i(protocol, #v1{connection = #connection{protocol = Protocol}}) -> + Protocol:version(); i(user, #v1{connection = #connection{user = #user{username = Username}}}) -> Username; i(user, #v1{connection = #connection{user = none}}) -> @@ -801,25 +801,27 @@ handle_exception(State = #v1{connection_state = CS}, Channel, Reason) -> log_channel_error(CS, Channel, Reason), send_exception(State, Channel, Reason). -send_exception(State, Channel, Reason) -> - {ShouldClose, CloseChannel, CloseMethod} = map_exception(Channel, Reason), +send_exception(State = #v1{connection = #connection{protocol = Protocol}}, + Channel, Reason) -> + {ShouldClose, CloseChannel, CloseMethod} = + map_exception(Channel, Reason, Protocol), NewState = case ShouldClose of true -> terminate_channels(), close_connection(State); false -> close_channel(Channel, State) end, ok = rabbit_writer:internal_send_command( - NewState#v1.sock, CloseChannel, CloseMethod), + NewState#v1.sock, CloseChannel, CloseMethod, Protocol), NewState. -map_exception(Channel, Reason) -> +map_exception(Channel, Reason, Protocol) -> {SuggestedClose, ReplyCode, ReplyText, FailedMethod} = - lookup_amqp_exception(Reason), + lookup_amqp_exception(Reason, Protocol), ShouldClose = SuggestedClose or (Channel == 0), {ClassId, MethodId} = case FailedMethod of {_, _} -> FailedMethod; none -> {0, 0}; - _ -> rabbit_framing:method_id(FailedMethod) + _ -> Protocol:method_id(FailedMethod) end, {CloseChannel, CloseMethod} = case ShouldClose of @@ -834,22 +836,16 @@ map_exception(Channel, Reason) -> end, {ShouldClose, CloseChannel, CloseMethod}. -%% FIXME: this clause can go when we move to AMQP spec >=8.1 -lookup_amqp_exception(#amqp_error{name = precondition_failed, - explanation = Expl, - method = Method}) -> - ExplBin = amqp_exception_explanation(<<"PRECONDITION_FAILED">>, Expl), - {false, 406, ExplBin, Method}; lookup_amqp_exception(#amqp_error{name = Name, explanation = Expl, - method = Method}) -> - {ShouldClose, Code, Text} = rabbit_framing:lookup_amqp_exception(Name), + method = Method}, + Protocol) -> + {ShouldClose, Code, Text} = Protocol:lookup_amqp_exception(Name), ExplBin = amqp_exception_explanation(Text, Expl), {ShouldClose, Code, ExplBin, Method}; -lookup_amqp_exception(Other) -> +lookup_amqp_exception(Other, Protocol) -> rabbit_log:warning("Non-AMQP exit reason '~p'~n", [Other]), - {ShouldClose, Code, Text} = - rabbit_framing:lookup_amqp_exception(internal_error), + {ShouldClose, Code, Text} = Protocol:lookup_amqp_exception(internal_error), {ShouldClose, Code, Text, none}. amqp_exception_explanation(Text, Expl) -> diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index b46df02de7..06fcb691d6 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -504,8 +504,10 @@ test_content_framing(FrameMax, BodyBin) -> rabbit_binary_generator:build_simple_content_frames( 1, rabbit_binary_generator:ensure_content_encoded( - rabbit_basic:build_content(#'P_basic'{}, BodyBin)), - FrameMax), + rabbit_basic:build_content(#'P_basic'{}, BodyBin), + rabbit_framing_amqp_0_9_1), + FrameMax, + rabbit_framing_amqp_0_9_1), %% header is formatted correctly and the size is the total of the %% fragments <<_FrameHeader:7/binary, _ClassAndWeight:4/binary, @@ -935,11 +937,14 @@ test_user_management() -> passed. +make_fun(Result) -> + fun () -> Result end. + test_server_status() -> %% create a few things so there is some useful information to list Writer = spawn(fun () -> receive shutdown -> ok end end), - {ok, Ch} = rabbit_channel:start_link(1, self(), Writer, <<"user">>, <<"/">>, - self()), + {ok, Ch} = rabbit_channel:start_link(1, make_fun(self()), make_fun(Writer), + <<"user">>, <<"/">>, self()), [Q, Q2] = [Queue || Name <- [<<"foo">>, <<"bar">>], {new, Queue = #amqqueue{}} <- [rabbit_amqqueue:declare( @@ -1074,8 +1079,8 @@ test_memory_pressure_flush(Writer) -> test_memory_pressure_spawn() -> Me = self(), Writer = spawn(fun () -> test_memory_pressure_receiver(Me) end), - {ok, Ch} = rabbit_channel:start_link(1, self(), Writer, <<"user">>, <<"/">>, - self()), + {ok, Ch} = rabbit_channel:start_link(1, make_fun(Me), make_fun(Writer), + <<"user">>, <<"/">>, self()), ok = rabbit_channel:do(Ch, #'channel.open'{}), receive #'channel.open_ok'{} -> ok after 1000 -> throw(failed_to_receive_channel_open_ok) @@ -1148,8 +1153,8 @@ test_memory_pressure() -> alarm_handler:set_alarm({vm_memory_high_watermark, []}), Me = self(), Writer4 = spawn(fun () -> test_memory_pressure_receiver(Me) end), - {ok, Ch4} = rabbit_channel:start_link(1, self(), Writer4, <<"user">>, - <<"/">>, self()), + {ok, Ch4} = rabbit_channel:start_link(1, make_fun(Me), make_fun(Writer4), + <<"user">>, <<"/">>, self()), ok = rabbit_channel:do(Ch4, #'channel.open'{}), ok = test_memory_pressure_flush(Writer4), receive #'channel.open_ok'{} -> throw(unexpected_channel_open_ok) diff --git a/src/rabbit_types.erl b/src/rabbit_types.erl index 2e492b80bd..3aaf1917fe 100644 --- a/src/rabbit_types.erl +++ b/src/rabbit_types.erl @@ -39,8 +39,8 @@ delivery/0, content/0, decoded_content/0, undecoded_content/0, unencoded_content/0, encoded_content/0, vhost/0, ctag/0, amqp_error/0, r/1, r2/2, r3/3, ssl_socket/0, listener/0, - binding/0, amqqueue/0, exchange/0, connection/0, user/0, - error/1, ok_or_error/1, ok_or_error2/2, ok/1]). + binding/0, amqqueue/0, exchange/0, connection/0, protocol/0, + user/0, error/1, ok_or_error/1, ok_or_error2/2, ok/1]). -type(maybe(T) :: T | 'none'). -type(vhost() :: binary()). @@ -133,6 +133,8 @@ -type(connection() :: pid()). +-type(protocol() :: atom()). + -type(user() :: #user{username :: rabbit_access_control:username(), password :: rabbit_access_control:password()}). diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl index 16436bc00a..483b46f700 100644 --- a/src/rabbit_writer.erl +++ b/src/rabbit_writer.erl @@ -33,14 +33,14 @@ -include("rabbit.hrl"). -include("rabbit_framing.hrl"). --export([start/3, start_link/3, flush/1, mainloop/1]). +-export([start/4, start_link/4, flush/1, mainloop/1]). -export([send_command/2, send_command/3, send_command_and_signal_back/3, send_command_and_signal_back/4, send_command_and_notify/5]). --export([internal_send_command/3, internal_send_command/5]). +-export([internal_send_command/4, internal_send_command/6]). -import(gen_tcp). --record(wstate, {sock, channel, frame_max}). +-record(wstate, {sock, channel, frame_max, protocol}). -define(HIBERNATE_AFTER, 5000). @@ -48,12 +48,14 @@ -ifdef(use_specs). --spec(start/3 :: +-spec(start/4 :: (rabbit_net:socket(), rabbit_channel:channel_number(), - non_neg_integer()) -> rabbit_types:ok(pid())). --spec(start_link/3 :: + non_neg_integer(), rabbit_types:protocol()) + -> rabbit_types:ok(pid())). +-spec(start_link/4 :: (rabbit_net:socket(), rabbit_channel:channel_number(), - non_neg_integer()) -> rabbit_types:ok(pid())). + non_neg_integer(), rabbit_types:protocol()) + -> rabbit_types:ok(pid())). -spec(send_command/2 :: (pid(), rabbit_framing:amqp_method_record()) -> 'ok'). -spec(send_command/3 :: @@ -68,14 +70,14 @@ (pid(), pid(), pid(), rabbit_framing:amqp_method_record(), rabbit_types:content()) -> 'ok'). --spec(internal_send_command/3 :: +-spec(internal_send_command/4 :: (rabbit_net:socket(), rabbit_channel:channel_number(), - rabbit_framing:amqp_method_record()) + rabbit_framing:amqp_method_record(), rabbit_types:protocol()) -> 'ok'). --spec(internal_send_command/5 :: +-spec(internal_send_command/6 :: (rabbit_net:socket(), rabbit_channel:channel_number(), rabbit_framing:amqp_method_record(), rabbit_types:content(), - non_neg_integer()) + non_neg_integer(), rabbit_types:protocol()) -> 'ok'). -spec(flush/1 :: (pid()) -> 'ok'). @@ -83,17 +85,19 @@ %%---------------------------------------------------------------------------- -start(Sock, Channel, FrameMax) -> +start(Sock, Channel, FrameMax, Protocol) -> {ok, proc_lib:spawn(?MODULE, mainloop, [#wstate{sock = Sock, channel = Channel, - frame_max = FrameMax}])}. + frame_max = FrameMax, + protocol = Protocol}])}. -start_link(Sock, Channel, FrameMax) -> +start_link(Sock, Channel, FrameMax, Protocol) -> {ok, proc_lib:spawn_link(?MODULE, mainloop, [#wstate{sock = Sock, channel = Channel, - frame_max = FrameMax}])}. + frame_max = FrameMax, + protocol = Protocol}])}. mainloop(State) -> receive @@ -103,35 +107,40 @@ mainloop(State) -> end. handle_message({send_command, MethodRecord}, - State = #wstate{sock = Sock, channel = Channel}) -> - ok = internal_send_command_async(Sock, Channel, MethodRecord), + State = #wstate{sock = Sock, channel = Channel, + protocol = Protocol}) -> + ok = internal_send_command_async(Sock, Channel, MethodRecord, Protocol), State; handle_message({send_command, MethodRecord, Content}, State = #wstate{sock = Sock, channel = Channel, - frame_max = FrameMax}) -> + frame_max = FrameMax, + protocol = Protocol}) -> ok = internal_send_command_async(Sock, Channel, MethodRecord, - Content, FrameMax), + Content, FrameMax, Protocol), State; handle_message({send_command_and_signal_back, MethodRecord, Parent}, - State = #wstate{sock = Sock, channel = Channel}) -> - ok = internal_send_command_async(Sock, Channel, MethodRecord), + State = #wstate{sock = Sock, channel = Channel, + protocol = Protocol}) -> + ok = internal_send_command_async(Sock, Channel, MethodRecord, Protocol), Parent ! rabbit_writer_send_command_signal, State; handle_message({send_command_and_signal_back, MethodRecord, Content, Parent}, State = #wstate{sock = Sock, channel = Channel, - frame_max = FrameMax}) -> + frame_max = FrameMax, + protocol = Protocol}) -> ok = internal_send_command_async(Sock, Channel, MethodRecord, - Content, FrameMax), + Content, FrameMax, Protocol), Parent ! rabbit_writer_send_command_signal, State; handle_message({send_command_and_notify, QPid, ChPid, MethodRecord, Content}, State = #wstate{sock = Sock, channel = Channel, - frame_max = FrameMax}) -> + frame_max = FrameMax, + protocol = Protocol}) -> ok = internal_send_command_async(Sock, Channel, MethodRecord, - Content, FrameMax), + Content, FrameMax, Protocol), rabbit_amqqueue:notify_sent(QPid, ChPid), State; handle_message({inet_reply, _, ok}, State) -> @@ -173,30 +182,32 @@ flush(W) -> %--------------------------------------------------------------------------- -assemble_frames(Channel, MethodRecord) -> +assemble_frames(Channel, MethodRecord, Protocol) -> ?LOGMESSAGE(out, Channel, MethodRecord, none), - rabbit_binary_generator:build_simple_method_frame(Channel, MethodRecord). + rabbit_binary_generator:build_simple_method_frame(Channel, MethodRecord, + Protocol). -assemble_frames(Channel, MethodRecord, Content, FrameMax) -> +assemble_frames(Channel, MethodRecord, Content, FrameMax, Protocol) -> ?LOGMESSAGE(out, Channel, MethodRecord, Content), MethodName = rabbit_misc:method_record_type(MethodRecord), - true = rabbit_framing:method_has_content(MethodName), % assertion + true = Protocol:method_has_content(MethodName), % assertion MethodFrame = rabbit_binary_generator:build_simple_method_frame( - Channel, MethodRecord), + Channel, MethodRecord, Protocol), ContentFrames = rabbit_binary_generator:build_simple_content_frames( - Channel, Content, FrameMax), + Channel, Content, FrameMax, Protocol), [MethodFrame | ContentFrames]. tcp_send(Sock, Data) -> rabbit_misc:throw_on_error(inet_error, fun () -> rabbit_net:send(Sock, Data) end). -internal_send_command(Sock, Channel, MethodRecord) -> - ok = tcp_send(Sock, assemble_frames(Channel, MethodRecord)). +internal_send_command(Sock, Channel, MethodRecord, Protocol) -> + ok = tcp_send(Sock, assemble_frames(Channel, MethodRecord, Protocol)). -internal_send_command(Sock, Channel, MethodRecord, Content, FrameMax) -> +internal_send_command(Sock, Channel, MethodRecord, Content, FrameMax, + Protocol) -> ok = tcp_send(Sock, assemble_frames(Channel, MethodRecord, - Content, FrameMax)). + Content, FrameMax, Protocol)). %% gen_tcp:send/2 does a selective receive of {inet_reply, Sock, %% Status} to obtain the result. That is bad when it is called from @@ -216,13 +227,14 @@ internal_send_command(Sock, Channel, MethodRecord, Content, FrameMax) -> %% Also note that the port has bounded buffers and port_command blocks %% when these are full. So the fact that we process the result %% asynchronously does not impact flow control. -internal_send_command_async(Sock, Channel, MethodRecord) -> - true = port_cmd(Sock, assemble_frames(Channel, MethodRecord)), +internal_send_command_async(Sock, Channel, MethodRecord, Protocol) -> + true = port_cmd(Sock, assemble_frames(Channel, MethodRecord, Protocol)), ok. -internal_send_command_async(Sock, Channel, MethodRecord, Content, FrameMax) -> +internal_send_command_async(Sock, Channel, MethodRecord, Content, FrameMax, + Protocol) -> true = port_cmd(Sock, assemble_frames(Channel, MethodRecord, - Content, FrameMax)), + Content, FrameMax, Protocol)), ok. port_cmd(Sock, Data) -> diff --git a/src/supervisor2.erl b/src/supervisor2.erl index d716108cd0..d9eb4d5b46 100644 --- a/src/supervisor2.erl +++ b/src/supervisor2.erl @@ -62,7 +62,7 @@ -export([start_link/2,start_link/3, start_child/2, restart_child/2, delete_child/2, terminate_child/2, - which_children/1, find_child/4, + which_children/1, find_child/2, check_childspecs/1]). -export([behaviour_info/1]). @@ -137,9 +137,9 @@ terminate_child(Supervisor, Name) -> which_children(Supervisor) -> call(Supervisor, which_children). -find_child(Supervisor, Name, Type, Modules) -> +find_child(Supervisor, Name) -> [Pid || {Name1, Pid, Type1, Modules1} <- which_children(Supervisor), - Name1 =:= Name, Type1 =:= Type, Modules1 =:= Modules]. + Name1 =:= Name]. call(Supervisor, Req) -> gen_server:call(Supervisor, Req, infinity). |
