summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2010-08-02 14:51:24 +0100
committerMatthew Sackman <matthew@rabbitmq.com>2010-08-02 14:51:24 +0100
commit949b4e5e187083dea6a28375b3878aa751bc7b8d (patch)
treec794c9c061f82b9439528ff76a4b17432297cb0d /src
parent43420d3dc8a4eaa0d4be08ab5f63c110cb2ad640 (diff)
parent2efd9f8a2a2b167b21fc22b427613564b0bf6807 (diff)
downloadrabbitmq-server-git-949b4e5e187083dea6a28375b3878aa751bc7b8d.tar.gz
A ginormous amount of debitrotting, rewriting, debugging and refactoring to merge default into bug 15930
Diffstat (limited to 'src')
-rw-r--r--src/rabbit.erl4
-rw-r--r--src/rabbit_amqqueue.erl6
-rw-r--r--src/rabbit_amqqueue_process.erl15
-rw-r--r--src/rabbit_basic.erl12
-rw-r--r--src/rabbit_binary_generator.erl61
-rw-r--r--src/rabbit_binary_parser.erl13
-rw-r--r--src/rabbit_channel.erl16
-rw-r--r--src/rabbit_channel_sup.erl32
-rw-r--r--src/rabbit_channel_sup_sup.erl10
-rw-r--r--src/rabbit_connection_sup.erl9
-rw-r--r--src/rabbit_exchange.erl1
-rw-r--r--src/rabbit_framing_channel.erl66
-rw-r--r--src/rabbit_heartbeat.erl101
-rw-r--r--src/rabbit_limiter.erl2
-rw-r--r--src/rabbit_reader.erl248
-rw-r--r--src/rabbit_tests.erl21
-rw-r--r--src/rabbit_types.erl6
-rw-r--r--src/rabbit_writer.erl90
-rw-r--r--src/supervisor2.erl6
19 files changed, 394 insertions, 325 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 18045b94fc..ada2c38e49 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -426,9 +426,9 @@ print_banner() ->
"| ~s +---+ |~n"
"| |~n"
"+-------------------+~n"
- "AMQP ~p-~p~n~s~n~s~n~n",
+ "~s~n~s~n~s~n~n",
[Product, string:right([$v|Version], ProductLen),
- ?PROTOCOL_VERSION_MAJOR, ?PROTOCOL_VERSION_MINOR,
+ ?PROTOCOL_VERSION,
?COPYRIGHT_MESSAGE, ?INFORMATION_MESSAGE]),
Settings = [{"node", node()},
{"app descriptor", app_location()},
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 6bf2f6db28..870c119ad6 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -39,7 +39,7 @@
-export([pseudo_queue/2]).
-export([lookup/1, with/2, with_or_die/2, assert_equivalence/5,
check_exclusive_access/2, with_exclusive_access_or_die/3,
- stat/1, deliver/2, requeue/3, ack/4]).
+ stat/1, deliver/2, requeue/3, ack/4, reject/4]).
-export([list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]).
-export([consumers/1, consumers_all/1]).
-export([basic_get/3, basic_consume/7, basic_cancel/4]).
@@ -124,6 +124,7 @@
-spec(ack/4 ::
(pid(), rabbit_types:maybe(rabbit_types:txn()), [msg_id()], pid())
-> 'ok').
+-spec(reject/4 :: (pid(), [msg_id()], boolean(), pid()) -> 'ok').
-spec(commit_all/3 :: ([pid()], rabbit_types:txn(), pid()) -> ok_or_errors()).
-spec(rollback_all/3 :: ([pid()], rabbit_types:txn(), pid()) -> 'ok').
-spec(notify_down_all/2 :: ([pid()], pid()) -> ok_or_errors()).
@@ -367,6 +368,9 @@ requeue(QPid, MsgIds, ChPid) ->
ack(QPid, Txn, MsgIds, ChPid) ->
delegate_pcast(QPid, 7, {ack, Txn, MsgIds, ChPid}).
+reject(QPid, MsgIds, Requeue, ChPid) ->
+ delegate_pcast(QPid, 7, {reject, MsgIds, Requeue, ChPid}).
+
commit_all(QPids, Txn, ChPid) ->
safe_delegate_call_ok(
fun (QPid) -> exit({queue_disappeared, QPid}) end,
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 67f0fcf5ae..ac5fb7f972 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -783,6 +783,21 @@ handle_cast({ack, Txn, AckTags, ChPid},
noreply(State#q{backing_queue_state = BQS1})
end;
+handle_cast({reject, AckTags, Requeue, ChPid},
+ State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
+ case lookup_ch(ChPid) of
+ not_found ->
+ noreply(State);
+ C = #cr{acktags = ChAckTags} ->
+ ChAckTags1 = subtract_acks(ChAckTags, AckTags),
+ store_ch_record(C#cr{acktags = ChAckTags1}),
+ noreply(case Requeue of
+ true -> requeue_and_run(AckTags, State);
+ false -> BQS1 = BQ:ack(AckTags, BQS),
+ State #q { backing_queue_state = BQS1 }
+ end)
+ end;
+
handle_cast({rollback, Txn, ChPid}, State) ->
noreply(rollback_transaction(Txn, ChPid, State));
diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl
index 03a19961fd..c76c01ac52 100644
--- a/src/rabbit_basic.erl
+++ b/src/rabbit_basic.erl
@@ -97,18 +97,24 @@ delivery(Mandatory, Immediate, Txn, Message) ->
sender = self(), message = Message}.
build_content(Properties, BodyBin) ->
- {ClassId, _MethodId} = rabbit_framing:method_id('basic.publish'),
+ %% basic.publish hasn't changed so we can just hard-code amqp_0_9_1
+ {ClassId, _MethodId} =
+ rabbit_framing_amqp_0_9_1:method_id('basic.publish'),
#content{class_id = ClassId,
properties = Properties,
properties_bin = none,
+ protocol = none,
payload_fragments_rev = [BodyBin]}.
from_content(Content) ->
#content{class_id = ClassId,
properties = Props,
payload_fragments_rev = FragmentsRev} =
- rabbit_binary_parser:ensure_content_decoded(Content),
- {ClassId, _MethodId} = rabbit_framing:method_id('basic.publish'),
+ %% basic.publish hasn't changed so we can just hard-code amqp_0_9_1
+ rabbit_binary_parser:ensure_content_decoded(Content,
+ rabbit_framing_amqp_0_9_1),
+ {ClassId, _MethodId} =
+ rabbit_framing_amqp_0_9_1:method_id('basic.publish'),
{Props, list_to_binary(lists:reverse(FragmentsRev))}.
message(ExchangeName, RoutingKeyBin, RawProperties, BodyBin) ->
diff --git a/src/rabbit_binary_generator.erl b/src/rabbit_binary_generator.erl
index 0e6ebe57bc..f0ec618044 100644
--- a/src/rabbit_binary_generator.erl
+++ b/src/rabbit_binary_generator.erl
@@ -41,12 +41,12 @@
% See definition of check_empty_content_body_frame_size/0, an assertion called at startup.
-define(EMPTY_CONTENT_BODY_FRAME_SIZE, 8).
--export([build_simple_method_frame/2,
- build_simple_content_frames/3,
+-export([build_simple_method_frame/3,
+ build_simple_content_frames/4,
build_heartbeat_frame/0]).
-export([generate_table/1, encode_properties/2]).
-export([check_empty_content_body_frame_size/0]).
--export([ensure_content_encoded/1, clear_encoded_content/1]).
+-export([ensure_content_encoded/2, clear_encoded_content/1]).
-import(lists).
@@ -56,20 +56,22 @@
-type(frame() :: [binary()]).
--spec(build_simple_method_frame/2 ::
- (rabbit_channel:channel_number(), rabbit_framing:amqp_method_record())
+-spec(build_simple_method_frame/3 ::
+ (rabbit_channel:channel_number(), rabbit_framing:amqp_method_record(),
+ rabbit_types:protocol())
-> frame()).
--spec(build_simple_content_frames/3 ::
+-spec(build_simple_content_frames/4 ::
(rabbit_channel:channel_number(), rabbit_types:content(),
- non_neg_integer())
+ non_neg_integer(), rabbit_types:protocol())
-> [frame()]).
-spec(build_heartbeat_frame/0 :: () -> frame()).
-spec(generate_table/1 :: (rabbit_framing:amqp_table()) -> binary()).
-spec(encode_properties/2 ::
([rabbit_framing:amqp_property_type()], [any()]) -> binary()).
-spec(check_empty_content_body_frame_size/0 :: () -> 'ok').
--spec(ensure_content_encoded/1 ::
- (rabbit_types:content()) -> rabbit_types:encoded_content()).
+-spec(ensure_content_encoded/2 ::
+ (rabbit_types:content(), rabbit_types:protocol()) ->
+ rabbit_types:encoded_content()).
-spec(clear_encoded_content/1 ::
(rabbit_types:content()) -> rabbit_types:unencoded_content()).
@@ -77,30 +79,24 @@
%%----------------------------------------------------------------------------
-build_simple_method_frame(ChannelInt, MethodRecord) ->
- MethodFields = rabbit_framing:encode_method_fields(MethodRecord),
+build_simple_method_frame(ChannelInt, MethodRecord, Protocol) ->
+ MethodFields = Protocol:encode_method_fields(MethodRecord),
MethodName = rabbit_misc:method_record_type(MethodRecord),
- {ClassId, MethodId} = rabbit_framing:method_id(MethodName),
+ {ClassId, MethodId} = Protocol:method_id(MethodName),
create_frame(1, ChannelInt, [<<ClassId:16, MethodId:16>>, MethodFields]).
-build_simple_content_frames(ChannelInt,
- #content{class_id = ClassId,
- properties = ContentProperties,
- properties_bin = ContentPropertiesBin,
- payload_fragments_rev = PayloadFragmentsRev},
- FrameMax) ->
- {BodySize, ContentFrames} = build_content_frames(PayloadFragmentsRev, FrameMax, ChannelInt),
+build_simple_content_frames(ChannelInt, Content, FrameMax, Protocol) ->
+ #content{class_id = ClassId,
+ properties_bin = ContentPropertiesBin,
+ payload_fragments_rev = PayloadFragmentsRev} =
+ ensure_content_encoded(Content, Protocol),
+ {BodySize, ContentFrames} =
+ build_content_frames(PayloadFragmentsRev, FrameMax, ChannelInt),
HeaderFrame = create_frame(2, ChannelInt,
[<<ClassId:16, 0:16, BodySize:64>>,
- maybe_encode_properties(ContentProperties, ContentPropertiesBin)]),
+ ContentPropertiesBin]),
[HeaderFrame | ContentFrames].
-maybe_encode_properties(_ContentProperties, ContentPropertiesBin)
- when is_binary(ContentPropertiesBin) ->
- ContentPropertiesBin;
-maybe_encode_properties(ContentProperties, none) ->
- rabbit_framing:encode_properties(ContentProperties).
-
build_content_frames(FragsRev, FrameMax, ChannelInt) ->
BodyPayloadMax = if FrameMax == 0 ->
iolist_size(FragsRev);
@@ -283,13 +279,16 @@ check_empty_content_body_frame_size() ->
ComputedSize, ?EMPTY_CONTENT_BODY_FRAME_SIZE})
end.
-ensure_content_encoded(Content = #content{properties_bin = PropsBin})
+ensure_content_encoded(Content = #content{properties_bin = PropsBin,
+ protocol = Protocol}, Protocol)
when PropsBin =/= 'none' ->
Content;
-ensure_content_encoded(Content = #content{properties = Props}) ->
- Content #content{properties_bin = rabbit_framing:encode_properties(Props)}.
+ensure_content_encoded(Content = #content{properties = Props}, Protocol) ->
+ Content#content{properties_bin = Protocol:encode_properties(Props),
+ protocol = Protocol}.
-clear_encoded_content(Content = #content{properties_bin = none}) ->
+clear_encoded_content(Content = #content{properties_bin = none,
+ protocol = none}) ->
Content;
clear_encoded_content(Content = #content{properties = none}) ->
%% Only clear when we can rebuild the properties_bin later in
@@ -297,4 +296,4 @@ clear_encoded_content(Content = #content{properties = none}) ->
%% one of properties and properties_bin can be 'none'
Content;
clear_encoded_content(Content = #content{}) ->
- Content#content{properties_bin = none}.
+ Content#content{properties_bin = none, protocol = none}.
diff --git a/src/rabbit_binary_parser.erl b/src/rabbit_binary_parser.erl
index 69e34440b8..1d0a62afe8 100644
--- a/src/rabbit_binary_parser.erl
+++ b/src/rabbit_binary_parser.erl
@@ -34,7 +34,7 @@
-include("rabbit.hrl").
-export([parse_table/1, parse_properties/2]).
--export([ensure_content_decoded/1, clear_decoded_content/1]).
+-export([ensure_content_decoded/2, clear_decoded_content/1]).
-import(lists).
@@ -45,8 +45,9 @@
-spec(parse_table/1 :: (binary()) -> rabbit_framing:amqp_table()).
-spec(parse_properties/2 ::
([rabbit_framing:amqp_property_type()], binary()) -> [any()]).
--spec(ensure_content_decoded/1 ::
- (rabbit_types:content()) -> rabbit_types:decoded_content()).
+-spec(ensure_content_decoded/2 ::
+ (rabbit_types:content(), rabbit_types:protocol())
+ -> rabbit_types:decoded_content()).
-spec(clear_decoded_content/1 ::
(rabbit_types:content()) -> rabbit_types:undecoded_content()).
@@ -162,12 +163,12 @@ parse_property(bit, Rest) ->
parse_property(table, <<Len:32/unsigned, Table:Len/binary, Rest/binary>>) ->
{parse_table(Table), Rest}.
-ensure_content_decoded(Content = #content{properties = Props})
+ensure_content_decoded(Content = #content{properties = Props}, _Protocol)
when Props =/= 'none' ->
Content;
-ensure_content_decoded(Content = #content{properties_bin = PropBin})
+ensure_content_decoded(Content = #content{properties_bin = PropBin}, Protocol)
when is_binary(PropBin) ->
- Content#content{properties = rabbit_framing:decode_properties(
+ Content#content{properties = Protocol:decode_properties(
Content#content.class_id, PropBin)}.
clear_decoded_content(Content = #content{properties = none}) ->
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 58cb82f838..7c800f41ac 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -431,7 +431,8 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
Exchange = rabbit_exchange:lookup_or_die(ExchangeName),
%% We decode the content's properties here because we're almost
%% certain to want to look at delivery-mode and priority.
- DecodedContent = rabbit_binary_parser:ensure_content_decoded(Content),
+ DecodedContent = rabbit_binary_parser:ensure_content_decoded(
+ Content, rabbit_framing_amqp_0_9_1),
IsPersistent = is_message_persistent(DecodedContent),
Message = #basic_message{exchange_name = ExchangeName,
routing_key = RoutingKey,
@@ -648,6 +649,17 @@ handle_method(#'basic.recover'{requeue = Requeue}, Content, State) ->
ok = rabbit_writer:send_command(WriterPid, #'basic.recover_ok'{}),
{noreply, State2};
+handle_method(#'basic.reject'{delivery_tag = DeliveryTag,
+ requeue = Requeue},
+ _, State = #ch{ unacked_message_q = UAMQ}) ->
+ {Acked, Remaining} = collect_acks(UAMQ, DeliveryTag, false),
+ ok = fold_per_queue(
+ fun (QPid, MsgIds, ok) ->
+ rabbit_amqqueue:reject(QPid, MsgIds, Requeue, self())
+ end, ok, Acked),
+ ok = notify_limiter(State#ch.limiter_pid, Acked),
+ {noreply, State#ch{unacked_message_q = Remaining}};
+
handle_method(#'exchange.declare'{exchange = ExchangeNameBin,
type = TypeNameBin,
passive = false,
@@ -948,7 +960,7 @@ basic_return(#basic_message{exchange_name = ExchangeName,
content = Content},
WriterPid, Reason) ->
{_Close, ReplyCode, ReplyText} =
- rabbit_framing:lookup_amqp_exception(Reason),
+ rabbit_framing_amqp_0_9_1:lookup_amqp_exception(Reason),
ok = rabbit_writer:send_command(
WriterPid,
#'basic.return'{reply_code = ReplyCode,
diff --git a/src/rabbit_channel_sup.erl b/src/rabbit_channel_sup.erl
index eb7fab0de2..1d02d992d2 100644
--- a/src/rabbit_channel_sup.erl
+++ b/src/rabbit_channel_sup.erl
@@ -33,7 +33,7 @@
-behaviour(supervisor2).
--export([start_link/7, writer/1, framing_channel/1, channel/1]).
+-export([start_link/8, writer/1, framing_channel/1, channel/1]).
-export([init/1]).
@@ -43,39 +43,41 @@
-ifdef(use_specs).
--spec(start_link/7 ::
- (rabbit_net:socket(), rabbit_channel:channel_number(),
- non_neg_integer(), pid(), rabbit_access_control:username(),
- rabbit_types:vhost(), pid()) ->
+-spec(start_link/8 ::
+ (rabbit_types:protocol(), rabbit_net:socket(),
+ rabbit_channel:channel_number(), non_neg_integer(), pid(),
+ rabbit_access_control:username(), rabbit_types:vhost(), pid()) ->
ignore | rabbit_types:ok_or_error2(pid(), any())).
-endif.
%%----------------------------------------------------------------------------
-start_link(Sock, Channel, FrameMax, ReaderPid, Username, VHost, Collector) ->
- supervisor2:start_link(?MODULE, [Sock, Channel, FrameMax, ReaderPid,
- Username, VHost, Collector]).
+start_link(Protocol, Sock, Channel, FrameMax, ReaderPid, Username, VHost,
+ Collector) ->
+ supervisor2:start_link(?MODULE, [Protocol, Sock, Channel, FrameMax,
+ ReaderPid, Username, VHost, Collector]).
writer(Pid) ->
- hd(supervisor2:find_child(Pid, writer, worker, [rabbit_writer])).
+ hd(supervisor2:find_child(Pid, writer)).
channel(Pid) ->
- hd(supervisor2:find_child(Pid, channel, worker, [rabbit_channel])).
+ hd(supervisor2:find_child(Pid, channel)).
framing_channel(Pid) ->
- hd(supervisor2:find_child(Pid, framing_channel, worker,
- [rabbit_framing_channel])).
+ hd(supervisor2:find_child(Pid, framing_channel)).
%%----------------------------------------------------------------------------
-init([Sock, Channel, FrameMax, ReaderPid, Username, VHost, Collector]) ->
+init([Protocol, Sock, Channel, FrameMax, ReaderPid, Username, VHost,
+ Collector]) ->
Me = self(),
{ok, {{one_for_all, 0, 1},
[{framing_channel, {rabbit_framing_channel, start_link,
- [fun () -> channel(Me) end]},
+ [fun () -> channel(Me) end, Protocol]},
permanent, ?MAX_WAIT, worker, [rabbit_framing_channel]},
- {writer, {rabbit_writer, start_link, [Sock, Channel, FrameMax]},
+ {writer, {rabbit_writer, start_link,
+ [Sock, Channel, FrameMax, Protocol]},
permanent, ?MAX_WAIT, worker, [rabbit_writer]},
{channel, {rabbit_channel, start_link,
[Channel, fun () -> ReaderPid end,
diff --git a/src/rabbit_channel_sup_sup.erl b/src/rabbit_channel_sup_sup.erl
index 2fab867841..f608b724ac 100644
--- a/src/rabbit_channel_sup_sup.erl
+++ b/src/rabbit_channel_sup_sup.erl
@@ -33,16 +33,16 @@
-behaviour(supervisor2).
--export([start_link/0, start_channel/2]).
+-export([start_link/1, start_channel/2]).
-export([init/1]).
-start_link() ->
- supervisor2:start_link(?MODULE, []).
+start_link(Protocol) ->
+ supervisor2:start_link(?MODULE, [Protocol]).
-init([]) ->
+init([Protocol]) ->
{ok, {{simple_one_for_one_terminate, 0, 1},
- [{channel_sup, {rabbit_channel_sup, start_link, []},
+ [{channel_sup, {rabbit_channel_sup, start_link, [Protocol]},
temporary, infinity, supervisor, [rabbit_channel_sup]}]}}.
start_channel(Pid, Args) ->
diff --git a/src/rabbit_connection_sup.erl b/src/rabbit_connection_sup.erl
index 3dd81ef6c5..5d05ca28ff 100644
--- a/src/rabbit_connection_sup.erl
+++ b/src/rabbit_connection_sup.erl
@@ -47,15 +47,12 @@ init([]) ->
[{reader, {rabbit_reader, start_link, []},
permanent, ?MAX_WAIT, worker, [rabbit_reader]},
{collector, {rabbit_queue_collector, start_link, []},
- permanent, ?MAX_WAIT, worker, [rabbit_queue_collector]},
- {channel_sup_sup, {rabbit_channel_sup_sup, start_link, []},
- permanent, infinity, supervisor, [rabbit_channel_sup_sup]}
+ permanent, ?MAX_WAIT, worker, [rabbit_queue_collector]}
]}}.
reader(Pid) ->
- hd(supervisor2:find_child(Pid, reader, worker, [rabbit_reader])).
+ hd(supervisor2:find_child(Pid, reader)).
channel_sup_sup(Pid) ->
- hd(supervisor2:find_child(Pid, channel_sup_sup, supervisor,
- [rabbit_channel_sup_sup])).
+ hd(supervisor2:find_child(Pid, channel_sup_sup)).
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index 7f7622b255..49f87a22a6 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -378,7 +378,6 @@ cleanup_deleted_queue_bindings1(ExchangeName, Bindings) ->
[X] = mnesia:read({rabbit_exchange, ExchangeName}),
{maybe_auto_delete(X), Bindings}.
-
delete_forward_routes(Route) ->
ok = mnesia:delete_object(rabbit_route, Route, write),
ok = mnesia:delete_object(rabbit_durable_route, Route, write).
diff --git a/src/rabbit_framing_channel.erl b/src/rabbit_framing_channel.erl
index 3a0d71f5f3..6ece343655 100644
--- a/src/rabbit_framing_channel.erl
+++ b/src/rabbit_framing_channel.erl
@@ -32,15 +32,16 @@
-module(rabbit_framing_channel).
-include("rabbit.hrl").
--export([start_link/1, process/2, shutdown/1]).
+-export([start_link/2, process/2, shutdown/1]).
%% internal
--export([mainloop/1]).
+-export([mainloop/2]).
%%--------------------------------------------------------------------
-start_link(GetChannelPid) ->
- {ok, proc_lib:spawn_link(fun () -> mainloop(GetChannelPid()) end)}.
+start_link(GetChannelPid, Protocol) ->
+ {ok, proc_lib:spawn_link(
+ fun () -> mainloop(GetChannelPid(), Protocol) end)}.
process(Pid, Frame) ->
Pid ! {frame, Frame},
@@ -60,37 +61,42 @@ read_frame(ChannelPid) ->
Msg -> exit({unexpected_message, Msg})
end.
-mainloop(ChannelPid) ->
- {method, MethodName, FieldsBin} = read_frame(ChannelPid),
- Method = rabbit_framing:decode_method_fields(MethodName, FieldsBin),
- case rabbit_framing:method_has_content(MethodName) of
- true -> {ClassId, _MethodId} = rabbit_framing:method_id(MethodName),
- rabbit_channel:do(ChannelPid, Method,
- collect_content(ChannelPid, ClassId));
- false -> rabbit_channel:do(ChannelPid, Method)
- end,
- ?MODULE:mainloop(ChannelPid).
+mainloop(ChannelPid, Protocol) ->
+ case read_frame(ChannelPid) of
+ {method, MethodName, FieldsBin} ->
+ Method = Protocol:decode_method_fields(MethodName, FieldsBin),
+ case Protocol:method_has_content(MethodName) of
+ true -> {ClassId, _MethodId} = Protocol:method_id(MethodName),
+ rabbit_channel:do(ChannelPid, Method,
+ collect_content(ChannelPid,
+ ClassId,
+ Protocol));
+ false -> rabbit_channel:do(ChannelPid, Method)
+ end,
+ ?MODULE:mainloop(ChannelPid, Protocol);
+ _ ->
+ unexpected_frame("expected method frame, "
+ "got non method frame instead",
+ [])
+ end.
-collect_content(ChannelPid, ClassId) ->
+collect_content(ChannelPid, ClassId, Protocol) ->
case read_frame(ChannelPid) of
{content_header, ClassId, 0, BodySize, PropertiesBin} ->
Payload = collect_content_payload(ChannelPid, BodySize, []),
#content{class_id = ClassId,
properties = none,
properties_bin = PropertiesBin,
+ protocol = Protocol,
payload_fragments_rev = Payload};
{content_header, HeaderClassId, 0, _BodySize, _PropertiesBin} ->
- rabbit_misc:protocol_error(
- command_invalid,
- "expected content header for class ~w, "
- "got one for class ~w instead",
- [ClassId, HeaderClassId]);
+ unexpected_frame("expected content header for class ~w, "
+ "got one for class ~w instead",
+ [ClassId, HeaderClassId]);
_ ->
- rabbit_misc:protocol_error(
- command_invalid,
- "expected content header for class ~w, "
- "got non content header frame instead",
- [ClassId])
+ unexpected_frame("expected content header for class ~w, "
+ "got non content header frame instead",
+ [ClassId])
end.
collect_content_payload(_ChannelPid, 0, Acc) ->
@@ -102,8 +108,10 @@ collect_content_payload(ChannelPid, RemainingByteCount, Acc) ->
RemainingByteCount - size(FragmentBin),
[FragmentBin | Acc]);
_ ->
- rabbit_misc:protocol_error(
- command_invalid,
- "expected content body, got non content body frame instead",
- [])
+ unexpected_frame("expected content body, "
+ "got non content body frame instead",
+ [])
end.
+
+unexpected_frame(ExplanationFormat, Params) ->
+ rabbit_misc:protocol_error(unexpected_frame, ExplanationFormat, Params).
diff --git a/src/rabbit_heartbeat.erl b/src/rabbit_heartbeat.erl
index b7c73ae7a6..e8e3236a16 100644
--- a/src/rabbit_heartbeat.erl
+++ b/src/rabbit_heartbeat.erl
@@ -31,83 +31,94 @@
-module(rabbit_heartbeat).
--include("rabbit.hrl").
-
-export([start_heartbeat/3,
start_heartbeat_sender/2,
start_heartbeat_receiver/2]).
+-include("rabbit.hrl").
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-spec(start_heartbeat/3 :: (pid(), rabbit_net:socket(), non_neg_integer()) ->
+ rabbit_types:maybe({pid(), pid()})).
+-spec(start_heartbeat_sender/2 :: (rabbit_net:socket(), non_neg_integer()) ->
+ rabbit_types:ok(pid())).
+-spec(start_heartbeat_receiver/2 :: (rabbit_net:socket(), non_neg_integer()) ->
+ rabbit_types:ok(pid())).
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
start_heartbeat(_Sup, _Sock, 0) ->
none;
start_heartbeat(Sup, Sock, TimeoutSec) ->
- {ok, _Sender} =
+ {ok, Sender} =
supervisor:start_child(
Sup, {heartbeat_sender,
{?MODULE, start_heartbeat_sender, [Sock, TimeoutSec]},
transient, ?MAX_WAIT, worker, [rabbit_heartbeat]}),
- {ok, _Receiver} =
+ {ok, Receiver} =
supervisor:start_child(
Sup, {heartbeat_receiver,
{?MODULE, start_heartbeat_receiver, [Sock, TimeoutSec]},
transient, ?MAX_WAIT, worker, [rabbit_heartbeat]}),
- ok.
+ {Sender, Receiver}.
start_heartbeat_sender(Sock, TimeoutSec) ->
%% the 'div 2' is there so that we don't end up waiting for nearly
%% 2 * TimeoutSec before sending a heartbeat in the boundary case
%% where the last message was sent just after a heartbeat.
+ Parent = self(),
{ok, proc_lib:spawn_link(
- fun () -> heartbeater(Sock, TimeoutSec * 1000 div 2,
+ fun () -> heartbeater({Sock, TimeoutSec * 1000 div 2,
send_oct, 0,
fun () ->
catch rabbit_net:send(Sock, rabbit_binary_generator:build_heartbeat_frame()),
continue
- end)
+ end}, Parent)
end)}.
start_heartbeat_receiver(Sock, TimeoutSec) ->
%% we check for incoming data every interval, and time out after
%% two checks with no change. As a result we will time out between
%% 2 and 3 intervals after the last data has been received.
+ Parent = self(),
{ok, proc_lib:spawn_link(
- fun () -> heartbeater(Sock, TimeoutSec * 1000,
+ fun () -> heartbeater({Sock, TimeoutSec * 1000,
recv_oct, 1,
- fun () -> exit(timeout) end)
- end)}.
+ fun () -> exit(timeout) end}, Parent) end)}.
-%% Y-combinator, posted by Vladimir Sekissov to the Erlang mailing list
-%% http://www.erlang.org/ml-archive/erlang-questions/200301/msg00053.html
-y(X) ->
- F = fun (P) -> X(fun (A) -> (P(P))(A) end) end,
- F(F).
+heartbeater(Params, Parent) ->
+ heartbeater(Params, erlang:monitor(process, Parent), {0, 0}).
-heartbeater(Sock, TimeoutMillisec, StatName, Threshold, Handler) ->
- Heartbeat =
- fun (F) ->
- fun ({StatVal, SameCount}) ->
- receive
- Other -> exit({unexpected_message, Other})
- after TimeoutMillisec ->
- case rabbit_net:getstat(Sock, [StatName]) of
- {ok, [{StatName, NewStatVal}]} ->
- if NewStatVal =/= StatVal ->
- F({NewStatVal, 0});
- SameCount < Threshold ->
- F({NewStatVal, SameCount + 1});
- true ->
- continue = Handler(),
- F({NewStatVal, 0})
- end;
- {error, einval} ->
- %% the socket is dead, most
- %% likely because the
- %% connection is being shut
- %% down -> terminate
- ok;
- {error, Reason} ->
- exit({cannot_get_socket_stats, Reason})
- end
- end
- end
- end,
- (y(Heartbeat))({0, 0}).
+heartbeater({Sock, TimeoutMillisec, StatName, Threshold, Handler} = Params,
+ MonitorRef, {StatVal, SameCount}) ->
+ receive
+ {'DOWN', MonitorRef, process, _Object, _Info} ->
+ ok;
+ Other ->
+ exit({unexpected_message, Other})
+ after TimeoutMillisec ->
+ case rabbit_net:getstat(Sock, [StatName]) of
+ {ok, [{StatName, NewStatVal}]} ->
+ Recurse = fun (V) -> heartbeater(Params, MonitorRef, V) end,
+ if NewStatVal =/= StatVal ->
+ Recurse({NewStatVal, 0});
+ SameCount < Threshold ->
+ Recurse({NewStatVal, SameCount + 1});
+ true ->
+ case Handler() of
+ continue -> Recurse({NewStatVal, 0})
+ end
+ end;
+ {error, einval} ->
+ %% the socket is dead, most likely because the
+ %% connection is being shut down -> terminate
+ ok;
+ {error, Reason} ->
+ exit({cannot_get_socket_stats, Reason})
+ end
+ end.
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
index 491ae7d6eb..813ccc7538 100644
--- a/src/rabbit_limiter.erl
+++ b/src/rabbit_limiter.erl
@@ -45,7 +45,7 @@
-type(maybe_pid() :: pid() | 'undefined').
--spec(start_link/2 :: (pid(), non_neg_integer()) -> {'ok', pid()}).
+-spec(start_link/2 :: (pid(), non_neg_integer()) -> rabbit_types:ok(pid())).
-spec(shutdown/1 :: (maybe_pid()) -> 'ok').
-spec(limit/2 :: (maybe_pid(), non_neg_integer()) -> 'ok' | 'stopped').
-spec(can_send/3 :: (maybe_pid(), pid(), boolean()) -> boolean()).
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 99d76b8ac4..2dc3e64b5e 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -41,7 +41,7 @@
-export([server_properties/0]).
--export([analyze_frame/2]).
+-export([analyze_frame/3]).
-import(gen_tcp).
-import(fprof).
@@ -62,8 +62,8 @@
-define(INFO_KEYS,
[pid, address, port, peer_address, peer_port,
- recv_oct, recv_cnt, send_oct, send_cnt, send_pend,
- state, channels, user, vhost, timeout, frame_max, client_properties]).
+ recv_oct, recv_cnt, send_oct, send_cnt, send_pend, state, channels,
+ protocol, user, vhost, timeout, frame_max, client_properties]).
%% connection lifecycle
%%
@@ -248,14 +248,13 @@ start_connection(Parent, Deb, Sock, SockTransform) ->
erlang:send_after(?HANDSHAKE_TIMEOUT * 1000, self(),
handshake_timeout),
ProfilingValue = setup_profiling(),
- [Collector] =
- [Pid || {collector, Pid, worker, [rabbit_queue_collector]}
- <- supervisor:which_children(Parent)],
+ [Collector] = supervisor2:find_child(Parent, collector),
try
mainloop(Deb, switch_callback(
#v1{parent = Parent,
sock = ClientSock,
connection = #connection{
+ protocol = none,
user = none,
timeout_sec = ?HANDSHAKE_TIMEOUT,
frame_max = ?FRAME_MIN_SIZE,
@@ -444,7 +443,9 @@ wait_for_channel_termination(N, TimerRef) ->
end.
maybe_close(State = #v1{connection_state = closing,
- queue_collector = Collector}) ->
+ queue_collector = Collector,
+ connection = #connection{protocol = Protocol},
+ sock = Sock}) ->
case all_channels() of
[] ->
%% Spec says "Exclusive queues may only be accessed by the current
@@ -452,16 +453,18 @@ maybe_close(State = #v1{connection_state = closing,
%% This does not strictly imply synchrony, but in practice it seems
%% to be what people assume.
rabbit_queue_collector:delete_all(Collector),
- ok = send_on_channel0(State#v1.sock, #'connection.close_ok'{}),
+ ok = send_on_channel0(Sock, #'connection.close_ok'{}, Protocol),
close_connection(State);
_ -> State
end;
maybe_close(State) ->
State.
-handle_frame(Type, 0, Payload, State = #v1{connection_state = CS})
+handle_frame(Type, 0, Payload,
+ State = #v1{connection_state = CS,
+ connection = #connection{protocol = Protocol}})
when CS =:= closing; CS =:= closed ->
- case analyze_frame(Type, Payload) of
+ case analyze_frame(Type, Payload, Protocol) of
{method, MethodName, FieldsBin} ->
handle_method0(MethodName, FieldsBin, State);
_Other -> State
@@ -469,20 +472,20 @@ handle_frame(Type, 0, Payload, State = #v1{connection_state = CS})
handle_frame(_Type, _Channel, _Payload, State = #v1{connection_state = CS})
when CS =:= closing; CS =:= closed ->
State;
-handle_frame(Type, 0, Payload, State) ->
- case analyze_frame(Type, Payload) of
+handle_frame(Type, 0, Payload,
+ State = #v1{connection = #connection{protocol = Protocol}}) ->
+ case analyze_frame(Type, Payload, Protocol) of
error -> throw({unknown_frame, 0, Type, Payload});
heartbeat -> State;
- trace -> State;
{method, MethodName, FieldsBin} ->
handle_method0(MethodName, FieldsBin, State);
Other -> throw({unexpected_frame_on_channel0, Other})
end;
-handle_frame(Type, Channel, Payload, State) ->
- case analyze_frame(Type, Payload) of
+handle_frame(Type, Channel, Payload,
+ State = #v1{connection = #connection{protocol = Protocol}}) ->
+ case analyze_frame(Type, Payload, Protocol) of
error -> throw({unknown_frame, Channel, Type, Payload});
heartbeat -> throw({unexpected_heartbeat_frame, Channel});
- trace -> throw({unexpected_trace_frame, Channel});
AnalyzedFrame ->
%%?LOGDEBUG("Ch ~p Frame ~p~n", [Channel, AnalyzedFrame]),
case get({channel, Channel}) of
@@ -523,17 +526,20 @@ handle_frame(Type, Channel, Payload, State) ->
end
end.
-analyze_frame(?FRAME_METHOD, <<ClassId:16, MethodId:16, MethodFields/binary>>) ->
- {method, rabbit_framing:lookup_method_name({ClassId, MethodId}), MethodFields};
-analyze_frame(?FRAME_HEADER, <<ClassId:16, Weight:16, BodySize:64, Properties/binary>>) ->
+analyze_frame(?FRAME_METHOD,
+ <<ClassId:16, MethodId:16, MethodFields/binary>>,
+ Protocol) ->
+ MethodName = Protocol:lookup_method_name({ClassId, MethodId}),
+ {method, MethodName, MethodFields};
+analyze_frame(?FRAME_HEADER,
+ <<ClassId:16, Weight:16, BodySize:64, Properties/binary>>,
+ _Protocol) ->
{content_header, ClassId, Weight, BodySize, Properties};
-analyze_frame(?FRAME_BODY, Body) ->
+analyze_frame(?FRAME_BODY, Body, _Protocol) ->
{content_body, Body};
-analyze_frame(?FRAME_TRACE, _Body) ->
- trace;
-analyze_frame(?FRAME_HEARTBEAT, <<>>) ->
+analyze_frame(?FRAME_HEARTBEAT, <<>>, _Protocol) ->
heartbeat;
-analyze_frame(_Type, _Body) ->
+analyze_frame(_Type, _Body, _Protocol) ->
error.
handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32>>, State) ->
@@ -550,54 +556,75 @@ handle_input({frame_payload, Type, Channel, PayloadSize}, PayloadAndMarker, Stat
throw({bad_payload, PayloadAndMarker})
end;
-handle_input(handshake, <<"AMQP",1,1,ProtocolMajor,ProtocolMinor>>,
- State = #v1{sock = Sock, connection = Connection}) ->
- case check_version({ProtocolMajor, ProtocolMinor},
- {?PROTOCOL_VERSION_MAJOR, ?PROTOCOL_VERSION_MINOR}) of
- true ->
- ok = send_on_channel0(
- Sock,
- #'connection.start'{
- version_major = ?PROTOCOL_VERSION_MAJOR,
- version_minor = ?PROTOCOL_VERSION_MINOR,
- server_properties = server_properties(),
- mechanisms = <<"PLAIN AMQPLAIN">>,
- locales = <<"en_US">> }),
- {State#v1{connection = Connection#connection{
- timeout_sec = ?NORMAL_TIMEOUT},
- connection_state = starting},
- frame_header, 7};
- false ->
- throw({bad_version, ProtocolMajor, ProtocolMinor})
- end;
+%% The two rules pertaining to version negotiation:
+%%
+%% * If the server cannot support the protocol specified in the
+%% protocol header, it MUST respond with a valid protocol header and
+%% then close the socket connection.
+%%
+%% * The server MUST provide a protocol version that is lower than or
+%% equal to that requested by the client in the protocol header.
+handle_input(handshake, <<"AMQP", 0, 0, 9, 1>>, State) ->
+ start_connection({0, 9, 1}, rabbit_framing_amqp_0_9_1, State);
+
+%% This is the protocol header for 0-9, which we can safely treat as
+%% though it were 0-9-1.
+handle_input(handshake, <<"AMQP", 1, 1, 0, 9>>, State) ->
+ start_connection({0, 9, 0}, rabbit_framing_amqp_0_9_1, State);
+
+%% This is what most clients send for 0-8. The 0-8 spec, confusingly,
+%% defines the version as 8-0.
+handle_input(handshake, <<"AMQP", 1, 1, 8, 0>>, State) ->
+ start_connection({8, 0, 0}, rabbit_framing_amqp_0_8, State);
+
+%% The 0-8 spec as on the AMQP web site actually has this as the
+%% protocol header; some libraries e.g., py-amqplib, send it when they
+%% want 0-8.
+handle_input(handshake, <<"AMQP", 1, 1, 9, 1>>, State) ->
+ start_connection({8, 0, 0}, rabbit_framing_amqp_0_8, State);
+
+handle_input(handshake, <<"AMQP", A, B, C, D>>, #v1{sock = Sock}) ->
+ refuse_connection(Sock, {bad_version, A, B, C, D});
handle_input(handshake, Other, #v1{sock = Sock}) ->
- ok = inet_op(fun () -> rabbit_net:send(
- Sock, <<"AMQP",1,1,
- ?PROTOCOL_VERSION_MAJOR,
- ?PROTOCOL_VERSION_MINOR>>) end),
- throw({bad_header, Other});
+ refuse_connection(Sock, {bad_header, Other});
handle_input(Callback, Data, _State) ->
throw({bad_input, Callback, Data}).
-%% the 0-8 spec, confusingly, defines the version as 8-0
-adjust_version({8,0}) -> {0,8};
-adjust_version(Version) -> Version.
-check_version(ClientVersion, ServerVersion) ->
- {ClientMajor, ClientMinor} = adjust_version(ClientVersion),
- {ServerMajor, ServerMinor} = adjust_version(ServerVersion),
- ClientMajor > ServerMajor
- orelse
- (ClientMajor == ServerMajor andalso
- ClientMinor >= ServerMinor).
+%% Offer a protocol version to the client. Connection.start only
+%% includes a major and minor version number, Luckily 0-9 and 0-9-1
+%% are similar enough that clients will be happy with either.
+start_connection({ProtocolMajor, ProtocolMinor, _ProtocolRevision},
+ Protocol, State = #v1{parent = Parent, sock = Sock,
+ connection = Connection}) ->
+ {ok, _Pid} =
+ supervisor:start_child(
+ Parent, {channel_sup_sup,
+ {rabbit_channel_sup_sup, start_link, [Protocol]},
+ permanent, infinity, supervisor, [rabbit_channel_sup_sup]}),
+ Start = #'connection.start'{ version_major = ProtocolMajor,
+ version_minor = ProtocolMinor,
+ server_properties = server_properties(),
+ mechanisms = <<"PLAIN AMQPLAIN">>,
+ locales = <<"en_US">> },
+ ok = send_on_channel0(Sock, Start, Protocol),
+ {State#v1{connection = Connection#connection{
+ timeout_sec = ?NORMAL_TIMEOUT,
+ protocol = Protocol},
+ connection_state = starting},
+ frame_header, 7}.
+
+refuse_connection(Sock, Exception) ->
+ ok = inet_op(fun () -> rabbit_net:send(Sock, <<"AMQP",0,0,9,1>>) end),
+ throw(Exception).
%%--------------------------------------------------------------------------
-handle_method0(MethodName, FieldsBin, State) ->
+handle_method0(MethodName, FieldsBin,
+ State = #v1{connection = #connection{protocol = Protocol}}) ->
try
- handle_method0(rabbit_framing:decode_method_fields(
- MethodName, FieldsBin),
+ handle_method0(Protocol:decode_method_fields(MethodName, FieldsBin),
State)
catch exit:Reason ->
CompleteReason = case Reason of
@@ -619,14 +646,14 @@ handle_method0(#'connection.start_ok'{mechanism = Mechanism,
response = Response,
client_properties = ClientProperties},
State = #v1{connection_state = starting,
- connection = Connection,
+ connection = Connection =
+ #connection{protocol = Protocol},
sock = Sock}) ->
User = rabbit_access_control:check_login(Mechanism, Response),
- ok = send_on_channel0(
- Sock,
- #'connection.tune'{channel_max = 0,
+ Tune = #'connection.tune'{channel_max = 0,
frame_max = ?FRAME_MAX,
- heartbeat = 0}),
+ heartbeat = 0},
+ ok = send_on_channel0(Sock, Tune, Protocol),
State#v1{connection_state = tuning,
connection = Connection#connection{
user = User,
@@ -653,46 +680,30 @@ handle_method0(#'connection.tune_ok'{frame_max = FrameMax,
frame_max = FrameMax}}
end;
-handle_method0(#'connection.open'{virtual_host = VHostPath,
- insist = Insist},
+handle_method0(#'connection.open'{virtual_host = VHostPath},
+
State = #v1{connection_state = opening,
connection = Connection = #connection{
- user = User},
+ user = User,
+ protocol = Protocol},
sock = Sock}) ->
ok = rabbit_access_control:check_vhost_access(User, VHostPath),
NewConnection = Connection#connection{vhost = VHostPath},
- KnownHosts = format_listeners(rabbit_networking:active_listeners()),
- Redirects = compute_redirects(Insist),
- if Redirects == [] ->
- ok = send_on_channel0(
- Sock,
- #'connection.open_ok'{known_hosts = KnownHosts}),
- State#v1{connection_state = running,
- connection = NewConnection};
- true ->
- %% FIXME: 'host' is supposed to only contain one
- %% address; but which one do we pick? This is
- %% really a problem with the spec.
- Host = format_listeners(Redirects),
- rabbit_log:info("connection ~p redirecting to ~p~n",
- [self(), Host]),
- ok = send_on_channel0(
- Sock,
- #'connection.redirect'{host = Host,
- known_hosts = KnownHosts}),
- close_connection(State#v1{connection = NewConnection})
- end;
+ ok = send_on_channel0(Sock, #'connection.open_ok'{}, Protocol),
+ State#v1{connection_state = running,
+ connection = NewConnection};
handle_method0(#'connection.close'{},
State = #v1{connection_state = running}) ->
lists:foreach(fun rabbit_framing_channel:shutdown/1, all_channels()),
maybe_close(State#v1{connection_state = closing});
handle_method0(#'connection.close'{},
State = #v1{connection_state = CS,
+ connection = #connection{protocol = Protocol},
sock = Sock})
when CS =:= closing; CS =:= closed ->
%% We're already closed or closing, so we don't need to cleanup
%% anything.
- ok = send_on_channel0(Sock, #'connection.close_ok'{}),
+ ok = send_on_channel0(Sock, #'connection.close_ok'{}, Protocol),
State;
handle_method0(#'connection.close_ok'{},
State = #v1{connection_state = closed}) ->
@@ -705,23 +716,8 @@ handle_method0(_Method, #v1{connection_state = S}) ->
rabbit_misc:protocol_error(
channel_error, "unexpected method in connection state ~w", [S]).
-send_on_channel0(Sock, Method) ->
- ok = rabbit_writer:internal_send_command(Sock, 0, Method).
-
-format_listeners(Listeners) ->
- list_to_binary(
- rabbit_misc:intersperse(
- $,,
- [io_lib:format("~s:~w", [Host, Port]) ||
- #listener{host = Host, port = Port} <- Listeners])).
-
-compute_redirects(true) -> [];
-compute_redirects(false) ->
- Node = node(),
- LNode = rabbit_load:pick(),
- if Node == LNode -> [];
- true -> rabbit_networking:node_listeners(LNode)
- end.
+send_on_channel0(Sock, Method, Protocol) ->
+ ok = rabbit_writer:internal_send_command(Sock, 0, Method, Protocol).
%%--------------------------------------------------------------------------
@@ -755,6 +751,10 @@ i(state, #v1{connection_state = S}) ->
S;
i(channels, #v1{}) ->
length(all_channels());
+i(protocol, #v1{connection = #connection{protocol = none}}) ->
+ none;
+i(protocol, #v1{connection = #connection{protocol = Protocol}}) ->
+ Protocol:version();
i(user, #v1{connection = #connection{user = #user{username = Username}}}) ->
Username;
i(user, #v1{connection = #connection{user = none}}) ->
@@ -801,25 +801,27 @@ handle_exception(State = #v1{connection_state = CS}, Channel, Reason) ->
log_channel_error(CS, Channel, Reason),
send_exception(State, Channel, Reason).
-send_exception(State, Channel, Reason) ->
- {ShouldClose, CloseChannel, CloseMethod} = map_exception(Channel, Reason),
+send_exception(State = #v1{connection = #connection{protocol = Protocol}},
+ Channel, Reason) ->
+ {ShouldClose, CloseChannel, CloseMethod} =
+ map_exception(Channel, Reason, Protocol),
NewState = case ShouldClose of
true -> terminate_channels(),
close_connection(State);
false -> close_channel(Channel, State)
end,
ok = rabbit_writer:internal_send_command(
- NewState#v1.sock, CloseChannel, CloseMethod),
+ NewState#v1.sock, CloseChannel, CloseMethod, Protocol),
NewState.
-map_exception(Channel, Reason) ->
+map_exception(Channel, Reason, Protocol) ->
{SuggestedClose, ReplyCode, ReplyText, FailedMethod} =
- lookup_amqp_exception(Reason),
+ lookup_amqp_exception(Reason, Protocol),
ShouldClose = SuggestedClose or (Channel == 0),
{ClassId, MethodId} = case FailedMethod of
{_, _} -> FailedMethod;
none -> {0, 0};
- _ -> rabbit_framing:method_id(FailedMethod)
+ _ -> Protocol:method_id(FailedMethod)
end,
{CloseChannel, CloseMethod} =
case ShouldClose of
@@ -834,22 +836,16 @@ map_exception(Channel, Reason) ->
end,
{ShouldClose, CloseChannel, CloseMethod}.
-%% FIXME: this clause can go when we move to AMQP spec >=8.1
-lookup_amqp_exception(#amqp_error{name = precondition_failed,
- explanation = Expl,
- method = Method}) ->
- ExplBin = amqp_exception_explanation(<<"PRECONDITION_FAILED">>, Expl),
- {false, 406, ExplBin, Method};
lookup_amqp_exception(#amqp_error{name = Name,
explanation = Expl,
- method = Method}) ->
- {ShouldClose, Code, Text} = rabbit_framing:lookup_amqp_exception(Name),
+ method = Method},
+ Protocol) ->
+ {ShouldClose, Code, Text} = Protocol:lookup_amqp_exception(Name),
ExplBin = amqp_exception_explanation(Text, Expl),
{ShouldClose, Code, ExplBin, Method};
-lookup_amqp_exception(Other) ->
+lookup_amqp_exception(Other, Protocol) ->
rabbit_log:warning("Non-AMQP exit reason '~p'~n", [Other]),
- {ShouldClose, Code, Text} =
- rabbit_framing:lookup_amqp_exception(internal_error),
+ {ShouldClose, Code, Text} = Protocol:lookup_amqp_exception(internal_error),
{ShouldClose, Code, Text, none}.
amqp_exception_explanation(Text, Expl) ->
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index b46df02de7..06fcb691d6 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -504,8 +504,10 @@ test_content_framing(FrameMax, BodyBin) ->
rabbit_binary_generator:build_simple_content_frames(
1,
rabbit_binary_generator:ensure_content_encoded(
- rabbit_basic:build_content(#'P_basic'{}, BodyBin)),
- FrameMax),
+ rabbit_basic:build_content(#'P_basic'{}, BodyBin),
+ rabbit_framing_amqp_0_9_1),
+ FrameMax,
+ rabbit_framing_amqp_0_9_1),
%% header is formatted correctly and the size is the total of the
%% fragments
<<_FrameHeader:7/binary, _ClassAndWeight:4/binary,
@@ -935,11 +937,14 @@ test_user_management() ->
passed.
+make_fun(Result) ->
+ fun () -> Result end.
+
test_server_status() ->
%% create a few things so there is some useful information to list
Writer = spawn(fun () -> receive shutdown -> ok end end),
- {ok, Ch} = rabbit_channel:start_link(1, self(), Writer, <<"user">>, <<"/">>,
- self()),
+ {ok, Ch} = rabbit_channel:start_link(1, make_fun(self()), make_fun(Writer),
+ <<"user">>, <<"/">>, self()),
[Q, Q2] = [Queue || Name <- [<<"foo">>, <<"bar">>],
{new, Queue = #amqqueue{}} <-
[rabbit_amqqueue:declare(
@@ -1074,8 +1079,8 @@ test_memory_pressure_flush(Writer) ->
test_memory_pressure_spawn() ->
Me = self(),
Writer = spawn(fun () -> test_memory_pressure_receiver(Me) end),
- {ok, Ch} = rabbit_channel:start_link(1, self(), Writer, <<"user">>, <<"/">>,
- self()),
+ {ok, Ch} = rabbit_channel:start_link(1, make_fun(Me), make_fun(Writer),
+ <<"user">>, <<"/">>, self()),
ok = rabbit_channel:do(Ch, #'channel.open'{}),
receive #'channel.open_ok'{} -> ok
after 1000 -> throw(failed_to_receive_channel_open_ok)
@@ -1148,8 +1153,8 @@ test_memory_pressure() ->
alarm_handler:set_alarm({vm_memory_high_watermark, []}),
Me = self(),
Writer4 = spawn(fun () -> test_memory_pressure_receiver(Me) end),
- {ok, Ch4} = rabbit_channel:start_link(1, self(), Writer4, <<"user">>,
- <<"/">>, self()),
+ {ok, Ch4} = rabbit_channel:start_link(1, make_fun(Me), make_fun(Writer4),
+ <<"user">>, <<"/">>, self()),
ok = rabbit_channel:do(Ch4, #'channel.open'{}),
ok = test_memory_pressure_flush(Writer4),
receive #'channel.open_ok'{} -> throw(unexpected_channel_open_ok)
diff --git a/src/rabbit_types.erl b/src/rabbit_types.erl
index 2e492b80bd..3aaf1917fe 100644
--- a/src/rabbit_types.erl
+++ b/src/rabbit_types.erl
@@ -39,8 +39,8 @@
delivery/0, content/0, decoded_content/0, undecoded_content/0,
unencoded_content/0, encoded_content/0, vhost/0, ctag/0,
amqp_error/0, r/1, r2/2, r3/3, ssl_socket/0, listener/0,
- binding/0, amqqueue/0, exchange/0, connection/0, user/0,
- error/1, ok_or_error/1, ok_or_error2/2, ok/1]).
+ binding/0, amqqueue/0, exchange/0, connection/0, protocol/0,
+ user/0, error/1, ok_or_error/1, ok_or_error2/2, ok/1]).
-type(maybe(T) :: T | 'none').
-type(vhost() :: binary()).
@@ -133,6 +133,8 @@
-type(connection() :: pid()).
+-type(protocol() :: atom()).
+
-type(user() ::
#user{username :: rabbit_access_control:username(),
password :: rabbit_access_control:password()}).
diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl
index 16436bc00a..483b46f700 100644
--- a/src/rabbit_writer.erl
+++ b/src/rabbit_writer.erl
@@ -33,14 +33,14 @@
-include("rabbit.hrl").
-include("rabbit_framing.hrl").
--export([start/3, start_link/3, flush/1, mainloop/1]).
+-export([start/4, start_link/4, flush/1, mainloop/1]).
-export([send_command/2, send_command/3, send_command_and_signal_back/3,
send_command_and_signal_back/4, send_command_and_notify/5]).
--export([internal_send_command/3, internal_send_command/5]).
+-export([internal_send_command/4, internal_send_command/6]).
-import(gen_tcp).
--record(wstate, {sock, channel, frame_max}).
+-record(wstate, {sock, channel, frame_max, protocol}).
-define(HIBERNATE_AFTER, 5000).
@@ -48,12 +48,14 @@
-ifdef(use_specs).
--spec(start/3 ::
+-spec(start/4 ::
(rabbit_net:socket(), rabbit_channel:channel_number(),
- non_neg_integer()) -> rabbit_types:ok(pid())).
--spec(start_link/3 ::
+ non_neg_integer(), rabbit_types:protocol())
+ -> rabbit_types:ok(pid())).
+-spec(start_link/4 ::
(rabbit_net:socket(), rabbit_channel:channel_number(),
- non_neg_integer()) -> rabbit_types:ok(pid())).
+ non_neg_integer(), rabbit_types:protocol())
+ -> rabbit_types:ok(pid())).
-spec(send_command/2 ::
(pid(), rabbit_framing:amqp_method_record()) -> 'ok').
-spec(send_command/3 ::
@@ -68,14 +70,14 @@
(pid(), pid(), pid(), rabbit_framing:amqp_method_record(),
rabbit_types:content())
-> 'ok').
--spec(internal_send_command/3 ::
+-spec(internal_send_command/4 ::
(rabbit_net:socket(), rabbit_channel:channel_number(),
- rabbit_framing:amqp_method_record())
+ rabbit_framing:amqp_method_record(), rabbit_types:protocol())
-> 'ok').
--spec(internal_send_command/5 ::
+-spec(internal_send_command/6 ::
(rabbit_net:socket(), rabbit_channel:channel_number(),
rabbit_framing:amqp_method_record(), rabbit_types:content(),
- non_neg_integer())
+ non_neg_integer(), rabbit_types:protocol())
-> 'ok').
-spec(flush/1 :: (pid()) -> 'ok').
@@ -83,17 +85,19 @@
%%----------------------------------------------------------------------------
-start(Sock, Channel, FrameMax) ->
+start(Sock, Channel, FrameMax, Protocol) ->
{ok,
proc_lib:spawn(?MODULE, mainloop, [#wstate{sock = Sock,
channel = Channel,
- frame_max = FrameMax}])}.
+ frame_max = FrameMax,
+ protocol = Protocol}])}.
-start_link(Sock, Channel, FrameMax) ->
+start_link(Sock, Channel, FrameMax, Protocol) ->
{ok,
proc_lib:spawn_link(?MODULE, mainloop, [#wstate{sock = Sock,
channel = Channel,
- frame_max = FrameMax}])}.
+ frame_max = FrameMax,
+ protocol = Protocol}])}.
mainloop(State) ->
receive
@@ -103,35 +107,40 @@ mainloop(State) ->
end.
handle_message({send_command, MethodRecord},
- State = #wstate{sock = Sock, channel = Channel}) ->
- ok = internal_send_command_async(Sock, Channel, MethodRecord),
+ State = #wstate{sock = Sock, channel = Channel,
+ protocol = Protocol}) ->
+ ok = internal_send_command_async(Sock, Channel, MethodRecord, Protocol),
State;
handle_message({send_command, MethodRecord, Content},
State = #wstate{sock = Sock,
channel = Channel,
- frame_max = FrameMax}) ->
+ frame_max = FrameMax,
+ protocol = Protocol}) ->
ok = internal_send_command_async(Sock, Channel, MethodRecord,
- Content, FrameMax),
+ Content, FrameMax, Protocol),
State;
handle_message({send_command_and_signal_back, MethodRecord, Parent},
- State = #wstate{sock = Sock, channel = Channel}) ->
- ok = internal_send_command_async(Sock, Channel, MethodRecord),
+ State = #wstate{sock = Sock, channel = Channel,
+ protocol = Protocol}) ->
+ ok = internal_send_command_async(Sock, Channel, MethodRecord, Protocol),
Parent ! rabbit_writer_send_command_signal,
State;
handle_message({send_command_and_signal_back, MethodRecord, Content, Parent},
State = #wstate{sock = Sock,
channel = Channel,
- frame_max = FrameMax}) ->
+ frame_max = FrameMax,
+ protocol = Protocol}) ->
ok = internal_send_command_async(Sock, Channel, MethodRecord,
- Content, FrameMax),
+ Content, FrameMax, Protocol),
Parent ! rabbit_writer_send_command_signal,
State;
handle_message({send_command_and_notify, QPid, ChPid, MethodRecord, Content},
State = #wstate{sock = Sock,
channel = Channel,
- frame_max = FrameMax}) ->
+ frame_max = FrameMax,
+ protocol = Protocol}) ->
ok = internal_send_command_async(Sock, Channel, MethodRecord,
- Content, FrameMax),
+ Content, FrameMax, Protocol),
rabbit_amqqueue:notify_sent(QPid, ChPid),
State;
handle_message({inet_reply, _, ok}, State) ->
@@ -173,30 +182,32 @@ flush(W) ->
%---------------------------------------------------------------------------
-assemble_frames(Channel, MethodRecord) ->
+assemble_frames(Channel, MethodRecord, Protocol) ->
?LOGMESSAGE(out, Channel, MethodRecord, none),
- rabbit_binary_generator:build_simple_method_frame(Channel, MethodRecord).
+ rabbit_binary_generator:build_simple_method_frame(Channel, MethodRecord,
+ Protocol).
-assemble_frames(Channel, MethodRecord, Content, FrameMax) ->
+assemble_frames(Channel, MethodRecord, Content, FrameMax, Protocol) ->
?LOGMESSAGE(out, Channel, MethodRecord, Content),
MethodName = rabbit_misc:method_record_type(MethodRecord),
- true = rabbit_framing:method_has_content(MethodName), % assertion
+ true = Protocol:method_has_content(MethodName), % assertion
MethodFrame = rabbit_binary_generator:build_simple_method_frame(
- Channel, MethodRecord),
+ Channel, MethodRecord, Protocol),
ContentFrames = rabbit_binary_generator:build_simple_content_frames(
- Channel, Content, FrameMax),
+ Channel, Content, FrameMax, Protocol),
[MethodFrame | ContentFrames].
tcp_send(Sock, Data) ->
rabbit_misc:throw_on_error(inet_error,
fun () -> rabbit_net:send(Sock, Data) end).
-internal_send_command(Sock, Channel, MethodRecord) ->
- ok = tcp_send(Sock, assemble_frames(Channel, MethodRecord)).
+internal_send_command(Sock, Channel, MethodRecord, Protocol) ->
+ ok = tcp_send(Sock, assemble_frames(Channel, MethodRecord, Protocol)).
-internal_send_command(Sock, Channel, MethodRecord, Content, FrameMax) ->
+internal_send_command(Sock, Channel, MethodRecord, Content, FrameMax,
+ Protocol) ->
ok = tcp_send(Sock, assemble_frames(Channel, MethodRecord,
- Content, FrameMax)).
+ Content, FrameMax, Protocol)).
%% gen_tcp:send/2 does a selective receive of {inet_reply, Sock,
%% Status} to obtain the result. That is bad when it is called from
@@ -216,13 +227,14 @@ internal_send_command(Sock, Channel, MethodRecord, Content, FrameMax) ->
%% Also note that the port has bounded buffers and port_command blocks
%% when these are full. So the fact that we process the result
%% asynchronously does not impact flow control.
-internal_send_command_async(Sock, Channel, MethodRecord) ->
- true = port_cmd(Sock, assemble_frames(Channel, MethodRecord)),
+internal_send_command_async(Sock, Channel, MethodRecord, Protocol) ->
+ true = port_cmd(Sock, assemble_frames(Channel, MethodRecord, Protocol)),
ok.
-internal_send_command_async(Sock, Channel, MethodRecord, Content, FrameMax) ->
+internal_send_command_async(Sock, Channel, MethodRecord, Content, FrameMax,
+ Protocol) ->
true = port_cmd(Sock, assemble_frames(Channel, MethodRecord,
- Content, FrameMax)),
+ Content, FrameMax, Protocol)),
ok.
port_cmd(Sock, Data) ->
diff --git a/src/supervisor2.erl b/src/supervisor2.erl
index d716108cd0..d9eb4d5b46 100644
--- a/src/supervisor2.erl
+++ b/src/supervisor2.erl
@@ -62,7 +62,7 @@
-export([start_link/2,start_link/3,
start_child/2, restart_child/2,
delete_child/2, terminate_child/2,
- which_children/1, find_child/4,
+ which_children/1, find_child/2,
check_childspecs/1]).
-export([behaviour_info/1]).
@@ -137,9 +137,9 @@ terminate_child(Supervisor, Name) ->
which_children(Supervisor) ->
call(Supervisor, which_children).
-find_child(Supervisor, Name, Type, Modules) ->
+find_child(Supervisor, Name) ->
[Pid || {Name1, Pid, Type1, Modules1} <- which_children(Supervisor),
- Name1 =:= Name, Type1 =:= Type, Modules1 =:= Modules].
+ Name1 =:= Name].
call(Supervisor, Req) ->
gen_server:call(Supervisor, Req, infinity).