summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue.erl5
-rw-r--r--src/rabbit_amqqueue_process.erl110
-rw-r--r--src/rabbit_channel.erl14
-rw-r--r--src/rabbit_connection_sup.erl9
-rw-r--r--src/rabbit_limiter.erl108
-rw-r--r--src/rabbit_misc.erl51
-rw-r--r--src/rabbit_reader.erl34
-rw-r--r--src/rabbit_tests.erl24
8 files changed, 308 insertions, 47 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 2477b891fc..1715e848b1 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -33,6 +33,7 @@
-export([update/2, store_queue/1, policy_changed/2]).
-export([start_mirroring/1, stop_mirroring/1, sync_mirrors/1,
cancel_sync_mirrors/1]).
+-export([inform_limiter/3]).
%% internal
-export([internal_declare/2, internal_delete/1, run_backing_queue/3,
@@ -177,6 +178,7 @@
-spec(sync_mirrors/1 :: (pid()) ->
'ok' | rabbit_types:error('pending_acks' | 'not_mirrored')).
-spec(cancel_sync_mirrors/1 :: (pid()) -> 'ok' | {'ok', 'not_syncing'}).
+-spec(inform_limiter/3 :: (pid(), pid(), any()) -> 'ok').
-endif.
@@ -607,6 +609,9 @@ stop_mirroring(QPid) -> ok = delegate:cast(QPid, stop_mirroring).
sync_mirrors(QPid) -> delegate:call(QPid, sync_mirrors).
cancel_sync_mirrors(QPid) -> delegate:call(QPid, cancel_sync_mirrors).
+inform_limiter(ChPid, QPid, Msg) ->
+ delegate:cast(QPid, {inform_limiter, ChPid, Msg}).
+
on_node_down(Node) ->
rabbit_misc:execute_mnesia_tx_with_tail(
fun () -> QsDels =
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index e76bf6ea47..1329209175 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -65,9 +65,18 @@
monitor_ref,
acktags,
consumer_count,
+ %% Queue of {ChPid, #consumer{}} for consumers which have
+ %% been blocked for any reason
blocked_consumers,
+ %% List of consumer tags which have individually been
+ %% blocked by the limiter.
+ blocked_ctags,
+ %% The limiter itself
limiter,
+ %% Has the limiter imposed a channel-wide block, either
+ %% because of qos or channel flow?
is_limit_active,
+ %% Internal flow control for queue -> writer
unsent_message_count}).
%%----------------------------------------------------------------------------
@@ -358,6 +367,7 @@ ch_record(ChPid) ->
acktags = queue:new(),
consumer_count = 0,
blocked_consumers = queue:new(),
+ blocked_ctags = [],
is_limit_active = false,
limiter = rabbit_limiter:make_token(),
unsent_message_count = 0},
@@ -405,13 +415,6 @@ block_consumer(C = #cr{blocked_consumers = Blocked}, QEntry) ->
is_ch_blocked(#cr{unsent_message_count = Count, is_limit_active = Limited}) ->
Limited orelse Count >= ?UNSENT_MESSAGE_LIMIT.
-ch_record_state_transition(OldCR, NewCR) ->
- case {is_ch_blocked(OldCR), is_ch_blocked(NewCR)} of
- {true, false} -> unblock;
- {false, true} -> block;
- {_, _} -> ok
- end.
-
deliver_msgs_to_consumers(_DeliverFun, true, State) ->
{true, State};
deliver_msgs_to_consumers(DeliverFun, false,
@@ -426,20 +429,38 @@ deliver_msgs_to_consumers(DeliverFun, false,
deliver_msgs_to_consumers(DeliverFun, Stop, State1)
end.
-deliver_msg_to_consumer(DeliverFun, E = {ChPid, Consumer}, State) ->
+deliver_msg_to_consumer(DeliverFun,
+ E = {ChPid,
+ Consumer = #consumer{tag = CTag,
+ ack_required = AckReq}},
+ State = #q{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
C = ch_record(ChPid),
case is_ch_blocked(C) of
- true -> block_consumer(C, E),
- {false, State};
- false -> case rabbit_limiter:can_send(C#cr.limiter, self(),
- Consumer#consumer.ack_required) of
- false -> block_consumer(C#cr{is_limit_active = true}, E),
- {false, State};
- true -> AC1 = queue:in(E, State#q.active_consumers),
- deliver_msg_to_consumer(
- DeliverFun, Consumer, C,
- State#q{active_consumers = AC1})
- end
+ true ->
+ block_consumer(C, E),
+ {false, State};
+ false ->
+ #cr{limiter = Limiter, ch_pid = ChPid, blocked_ctags = BCTags} = C,
+ case rabbit_limiter:can_cons_send(
+ Limiter, ChPid, CTag, BQ:len(BQS)) of
+ {false, Lim2} ->
+ %% TODO unify with first case?
+ block_consumer(C#cr{limiter = Lim2,
+ blocked_ctags = [CTag | BCTags]}, E),
+ {false, State};
+ {true, Lim2} ->
+ case rabbit_limiter:can_ch_send(Limiter, self(), AckReq) of
+ false ->
+ block_consumer(C#cr{is_limit_active = true}, E),
+ {false, State};
+ true ->
+ AC1 = queue:in(E, State#q.active_consumers),
+ deliver_msg_to_consumer(
+ DeliverFun, Consumer, C#cr{limiter = Lim2},
+ State#q{active_consumers = AC1})
+ end
+ end
end.
deliver_msg_to_consumer(DeliverFun,
@@ -597,16 +618,20 @@ possibly_unblock(State, ChPid, Update) ->
not_found ->
State;
C ->
- C1 = Update(C),
- case ch_record_state_transition(C, C1) of
- ok -> update_ch_record(C1),
- State;
- unblock -> #cr{blocked_consumers = Consumers} = C1,
- update_ch_record(
- C1#cr{blocked_consumers = queue:new()}),
- AC1 = queue:join(State#q.active_consumers,
- Consumers),
- run_message_queue(State#q{active_consumers = AC1})
+ C1 = #cr{blocked_ctags = BCTags1} = Update(C),
+ {Blocked, Unblocked} =
+ lists:partition(
+ fun({_ChPid, #consumer{tag = CTag}}) ->
+ is_ch_blocked(C1) orelse lists:member(CTag, BCTags1)
+ end, queue:to_list(C1#cr.blocked_consumers)),
+ case Unblocked of
+ [] -> update_ch_record(C1),
+ State;
+ _ -> update_ch_record(
+ C1#cr{blocked_consumers = queue:from_list(Blocked)}),
+ AC1 = queue:join(State#q.active_consumers,
+ queue:from_list(Unblocked)),
+ run_message_queue(State#q{active_consumers = AC1})
end
end.
@@ -658,8 +683,6 @@ check_exclusive_access(none, true, State) ->
consumer_count() -> consumer_count(fun (_) -> false end).
-active_consumer_count() -> consumer_count(fun is_ch_blocked/1).
-
consumer_count(Exclude) ->
lists:sum([Count || C = #cr{consumer_count = Count} <- all_ch_record(),
not Exclude(C)]).
@@ -932,8 +955,8 @@ i(messages, State) ->
messages_unacknowledged]]);
i(consumers, _) ->
consumer_count();
-i(active_consumers, _) ->
- active_consumer_count();
+i(active_consumers, #q{active_consumers = ActiveConsumers}) ->
+ queue:len(ActiveConsumers);
i(memory, _) ->
{memory, M} = process_info(self(), memory),
M;
@@ -1149,9 +1172,10 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, From,
end;
handle_call(stat, _From, State) ->
- State1 = #q{backing_queue = BQ, backing_queue_state = BQS} =
+ State1 = #q{active_consumers = AC,
+ backing_queue = BQ, backing_queue_state = BQS} =
drop_expired_msgs(ensure_expiry_timer(State)),
- reply({ok, BQ:len(BQS), active_consumer_count()}, State1);
+ reply({ok, BQ:len(BQS), queue:len(AC)}, State1);
handle_call({delete, IfUnused, IfEmpty}, From,
State = #q{backing_queue_state = BQS, backing_queue = BQ}) ->
@@ -1291,7 +1315,9 @@ handle_cast({limit, ChPid, Limiter}, State) ->
false -> ok
end,
Limited = OldLimited andalso rabbit_limiter:is_enabled(Limiter),
- C#cr{limiter = Limiter, is_limit_active = Limited}
+ C#cr{limiter = rabbit_limiter:copy_queue_state(
+ OldLimiter, Limiter),
+ is_limit_active = Limited}
end));
handle_cast({flush, ChPid}, State) ->
@@ -1324,6 +1350,18 @@ handle_cast(stop_mirroring, State = #q{backing_queue = BQ,
noreply(State#q{backing_queue = BQ1,
backing_queue_state = BQS1});
+handle_cast({inform_limiter, ChPid, Msg},
+ State = #q{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
+ #cr{limiter = Limiter,
+ blocked_ctags = BCTags} = ch_record(ChPid),
+ {Unblock, Limiter2} =
+ rabbit_limiter:inform(Limiter, ChPid, BQ:len(BQS), Msg),
+ noreply(possibly_unblock(
+ State, ChPid,
+ fun(C) -> C#cr{blocked_ctags = BCTags -- Unblock,
+ limiter = Limiter2} end));
+
handle_cast(wake_up, State) ->
noreply(State).
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 88e3dfc583..c3a5b16df9 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -1079,6 +1079,20 @@ handle_method(#'channel.flow'{active = false}, _,
{noreply, State2}
end;
+handle_method(#'basic.credit'{consumer_tag = CTag,
+ credit = Credit,
+ count = Count,
+ drain = Drain}, _,
+ State = #ch{consumer_mapping = Consumers}) ->
+ case dict:find(CTag, Consumers) of
+ {ok, Q} -> ok = rabbit_amqqueue:inform_limiter(
+ self(), Q#amqqueue.pid,
+ {basic_credit, CTag, Credit, Count, Drain}),
+ {noreply, State};
+ error -> rabbit_misc:protocol_error(
+ not_allowed, "unknown consumer tag '~s'", [CTag])
+ end;
+
handle_method(_MethodRecord, _Content, _State) ->
rabbit_misc:protocol_error(
command_invalid, "unimplemented method", []).
diff --git a/src/rabbit_connection_sup.erl b/src/rabbit_connection_sup.erl
index 12a532b6fc..f4f3c72feb 100644
--- a/src/rabbit_connection_sup.erl
+++ b/src/rabbit_connection_sup.erl
@@ -42,10 +42,17 @@ start_link() ->
SupPid,
{collector, {rabbit_queue_collector, start_link, []},
intrinsic, ?MAX_WAIT, worker, [rabbit_queue_collector]}),
+ %% Note that rabbit_amqp1_0_session_sup_sup despite the name can
+ %% mimic rabbit_channel_sup_sup when we handle a 0-9-1 connection
+ %% and the 1.0 plugin is loaded.
+ ChannelSupSupModule = case code:is_loaded(rabbit_amqp1_0_session_sup_sup) of
+ false -> rabbit_channel_sup_sup;
+ _ -> rabbit_amqp1_0_session_sup_sup
+ end,
{ok, ChannelSupSupPid} =
supervisor2:start_child(
SupPid,
- {channel_sup_sup, {rabbit_channel_sup_sup, start_link, []},
+ {channel_sup_sup, {ChannelSupSupModule, start_link, []},
intrinsic, infinity, supervisor, [rabbit_channel_sup_sup]}),
{ok, ReaderPid} =
supervisor2:start_child(
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
index 2b15498ed9..9da1bc6f7c 100644
--- a/src/rabbit_limiter.erl
+++ b/src/rabbit_limiter.erl
@@ -15,19 +15,25 @@
%%
-module(rabbit_limiter).
+-include("rabbit_framing.hrl").
-behaviour(gen_server2).
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
handle_info/2, prioritise_call/3]).
+
-export([start_link/0, make_token/0, make_token/1, is_enabled/1, enable/2,
disable/1]).
--export([limit/2, can_send/3, ack/2, register/2, unregister/2]).
+-export([limit/2, can_ch_send/3, can_cons_send/4,
+ ack/2, register/2, unregister/2]).
-export([get_limit/1, block/1, unblock/1, is_blocked/1]).
+-export([inform/4]).
+
+-import(rabbit_misc, [serial_add/2, serial_diff/2]).
%%----------------------------------------------------------------------------
--record(token, {pid, enabled}).
+-record(token, {pid, enabled, q_state}).
-ifdef(use_specs).
@@ -42,7 +48,10 @@
-spec(enable/2 :: (token(), non_neg_integer()) -> token()).
-spec(disable/1 :: (token()) -> token()).
-spec(limit/2 :: (token(), non_neg_integer()) -> 'ok' | {'disabled', token()}).
--spec(can_send/3 :: (token(), pid(), boolean()) -> boolean()).
+-spec(can_ch_send/3 :: (token(), pid(), boolean()) -> boolean()).
+%% TODO
+%% -spec(can_send/5 :: (token(), pid(), boolean(),
+%% rabbit_types:ctag(), non_neg_integer()) -> boolean()).
-spec(ack/2 :: (token(), non_neg_integer()) -> 'ok').
-spec(register/2 :: (token(), pid()) -> 'ok').
-spec(unregister/2 :: (token(), pid()) -> 'ok').
@@ -50,7 +59,10 @@
-spec(block/1 :: (token()) -> 'ok').
-spec(unblock/1 :: (token()) -> 'ok' | {'disabled', token()}).
-spec(is_blocked/1 :: (token()) -> boolean()).
-
+%% -spec(set_credit/5 :: (token(), rabbit_types:ctag(),
+%% non_neg_integer(),
+%% non_neg_integer(), boolean()) -> 'ok').
+%%-spec(inform/4 :: (token(), pid(), non_neg_integer(), any()) -> token()).
-endif.
%%----------------------------------------------------------------------------
@@ -60,9 +72,8 @@
blocked = false,
queues = orddict:new(), % QPid -> {MonitorRef, Notify}
volume = 0}).
-%% 'Notify' is a boolean that indicates whether a queue should be
-%% notified of a change in the limit or volume that may allow it to
-%% deliver more messages via the limiter's channel.
+
+-record(credit, {count = 0, credit = 0, drain = false}).
%%----------------------------------------------------------------------------
%% API
@@ -71,7 +82,8 @@
start_link() -> gen_server2:start_link(?MODULE, [], []).
make_token() -> make_token(undefined).
-make_token(Pid) -> #token{pid = Pid, enabled = false}.
+make_token(Pid) -> #token{pid = Pid, enabled = false,
+ q_state = dict:new()}.
is_enabled(#token{enabled = Enabled}) -> Enabled.
@@ -88,15 +100,19 @@ limit(Limiter, PrefetchCount) ->
%% breaching a limit. Note that we don't use maybe_call here in order
%% to avoid always going through with_exit_handler/2, even when the
%% limiter is disabled.
-can_send(#token{pid = Pid, enabled = true}, QPid, AckRequired) ->
+can_ch_send(#token{pid = Pid, enabled = true}, QPid, AckRequired) ->
rabbit_misc:with_exit_handler(
fun () -> true end,
fun () ->
gen_server2:call(Pid, {can_send, QPid, AckRequired}, infinity)
end);
-can_send(_, _, _) ->
+can_ch_send(_, _, _) ->
true.
+can_cons_send(#token{q_state = QState} = Token, ChPid, CTag, Len) ->
+ {CanQ, NewQState} = can_send_q(CTag, Len, ChPid, QState),
+ {CanQ, Token#token{q_state = NewQState}}.
+
%% Let the limiter know that the channel has received some acks from a
%% consumer
ack(Limiter, Count) -> maybe_cast(Limiter, {ack, Count}).
@@ -119,6 +135,78 @@ unblock(Limiter) ->
is_blocked(Limiter) ->
maybe_call(Limiter, is_blocked, false).
+inform(Limiter = #token{q_state = Credits},
+ ChPid, Len, {basic_credit, CTag, Credit, Count, Drain}) ->
+ {Unblock, Credits2} =
+ update_credit(CTag, Len, ChPid, Credit, Count, Drain, Credits),
+ {Unblock, Limiter#token{q_state = Credits2}}.
+
+%%----------------------------------------------------------------------------
+%% Queue-local code
+%%----------------------------------------------------------------------------
+
+%% We want to do all the AMQP 1.0-ish link level credit calculations in the
+%% queue (to do them elsewhere introduces a ton of races). However, it's a big
+%% chunk of code that is conceptually very linked to the limiter concept. So
+%% we get the queue to hold a bit of state for us (#token.q_state), and
+%% maintain a fiction that the limiter is making the decisions...
+
+can_send_q(CTag, Len, ChPid, Credits) ->
+ case dict:find(CTag, Credits) of
+ {ok, #credit{credit = C} = Cred} ->
+ if C > 0 -> Credits2 = decr_credit(CTag, Len, ChPid, Cred, Credits),
+ {true, Credits2};
+ true -> {false, Credits}
+ end;
+ _ ->
+ {true, Credits}
+ end.
+
+decr_credit(CTag, Len, ChPid, Cred, Credits) ->
+ #credit{credit = Credit, count = Count, drain = Drain} = Cred,
+ {NewCredit, NewCount} =
+ case {Len, Drain} of
+ {1, true} -> %% Drain, so advance til credit = 0
+ NewCount0 = serial_add(Count, (Credit - 1)),
+ send_drained(ChPid, CTag, NewCount0),
+ {0, NewCount0}; %% Magic reduction to 0
+ {_, _} -> {Credit - 1, serial_add(Count, 1)}
+ end,
+ write_credit(CTag, NewCredit, NewCount, Drain, Credits).
+
+send_drained(ChPid, CTag, Count) ->
+ rabbit_channel:send_command(ChPid,
+ #'basic.credit_state'{consumer_tag = CTag,
+ credit = 0,
+ count = Count,
+ available = 0,
+ drain = true}).
+
+%% Update the credit state.
+%% TODO Edge case: if the queue has nothing in it, and drain is set,
+%% we want to send a basic.credit back.
+update_credit(CTag, Len, ChPid, Credit, Count0, Drain, Credits) ->
+ Count =
+ case dict:find(CTag, Credits) of
+ %% Use our count if we can, more accurate
+ {ok, #credit{ count = LocalCount }} -> LocalCount;
+ %% But if this is new, take it from the adapter
+ _ -> Count0
+ end,
+ rabbit_channel:send_command(ChPid, #'basic.credit_ok'{available = Len}),
+ NewCredits = write_credit(CTag, Credit, Count, Drain, Credits),
+ case Credit > 0 of
+ true -> {[CTag], NewCredits};
+ false -> {[], NewCredits}
+ end.
+
+%% TODO currently we leak when a single session creates and destroys
+%% lot of links.
+write_credit(CTag, Credit, Count, Drain, Credits) ->
+ dict:store(CTag, #credit{credit = Credit,
+ count = Count,
+ drain = Drain}, Credits).
+
%%----------------------------------------------------------------------------
%% gen_server callbacks
%%----------------------------------------------------------------------------
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index ce3e380248..c316db99b0 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -68,6 +68,7 @@
-export([base64url/1]).
-export([interval_operation/4]).
-export([get_parent/0]).
+-export([serial_add/2, serial_compare/2, serial_diff/2]).
%% Horrible macro to use in guards
-define(IS_BENIGN_EXIT(R),
@@ -82,6 +83,7 @@
-ifdef(use_specs).
-export_type([resource_name/0, thunk/1]).
+-export_type([serial_number/0]).
-type(ok_or_error() :: rabbit_types:ok_or_error(any())).
-type(thunk(T) :: fun(() -> T)).
@@ -94,6 +96,8 @@
fun ((atom(), [term()]) -> [{digraph:vertex(), digraph_label()}])).
-type(graph_edge_fun() ::
fun ((atom(), [term()]) -> [{digraph:vertex(), digraph:vertex()}])).
+-type(serial_number() :: non_neg_integer()).
+-type(serial_compare_result() :: 'equal' | 'less' | 'greater').
-spec(method_record_type/1 :: (rabbit_framing:amqp_method_record())
-> rabbit_framing:amqp_method_name()).
@@ -243,6 +247,12 @@
({atom(), atom(), any()}, float(), non_neg_integer(), non_neg_integer())
-> {any(), non_neg_integer()}).
-spec(get_parent/0 :: () -> pid()).
+-spec(serial_add/2 :: (serial_number(), non_neg_integer()) ->
+ serial_number()).
+-spec(serial_compare/2 :: (serial_number(), serial_number()) ->
+ serial_compare_result()).
+-spec(serial_diff/2 :: (serial_number(), serial_number()) ->
+ integer()).
-endif.
%%----------------------------------------------------------------------------
@@ -1080,3 +1090,44 @@ whereis_name(Name) ->
%% End copypasta from gen_server2.erl
%% -------------------------------------------------------------------------
+
+%% Serial arithmetic for unsigned ints.
+%% http://www.faqs.org/rfcs/rfc1982.html
+%% SERIAL_BITS = 32
+
+%% 2 ^ SERIAL_BITS
+-define(SERIAL_MAX, 16#100000000).
+%% 2 ^ (SERIAL_BITS - 1) - 1
+-define(SERIAL_MAX_ADDEND, 16#7fffffff).
+
+serial_add(S, N) when N =< ?SERIAL_MAX_ADDEND ->
+ (S + N) rem ?SERIAL_MAX;
+serial_add(S, N) ->
+ exit({out_of_bound_serial_addition, S, N}).
+
+serial_compare(A, B) ->
+ if A =:= B ->
+ equal;
+ (A < B andalso B - A < ?SERIAL_MAX_ADDEND) orelse
+ (A > B andalso A - B > ?SERIAL_MAX_ADDEND) ->
+ less;
+ (A < B andalso B - A > ?SERIAL_MAX_ADDEND) orelse
+ (A > B andalso B - A < ?SERIAL_MAX_ADDEND) ->
+ greater;
+ true -> exit({indeterminate_serial_comparison, A, B})
+ end.
+
+-define(SERIAL_DIFF_BOUND, 16#80000000).
+
+serial_diff(A, B) ->
+ Diff = A - B,
+ if Diff > (?SERIAL_DIFF_BOUND) ->
+ %% B is actually greater than A
+ - (?SERIAL_MAX - Diff);
+ Diff < - (?SERIAL_DIFF_BOUND) ->
+ ?SERIAL_MAX + Diff;
+ Diff < ?SERIAL_DIFF_BOUND andalso Diff > -?SERIAL_DIFF_BOUND ->
+ Diff;
+ true ->
+ exit({indeterminate_serial_diff, A, B})
+ end.
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 13e8feff08..f140bd237c 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -689,6 +689,15 @@ handle_input(handshake, <<"AMQP", 1, 1, 8, 0>>, State) ->
handle_input(handshake, <<"AMQP", 1, 1, 9, 1>>, State) ->
start_connection({8, 0, 0}, rabbit_framing_amqp_0_8, State);
+%% ... and finally, the 1.0 spec is crystal clear! Note that the
+%% FIXME TLS uses a different protocol number, and would go here.
+handle_input(handshake, <<"AMQP", 0, 1, 0, 0>>, State) ->
+ become_1_0(amqp, [0, 1, 0, 0], State);
+
+%% 3 stands for "SASL"
+handle_input(handshake, <<"AMQP", 3, 1, 0, 0>>, State) ->
+ become_1_0(sasl, [0, 3, 0, 0], State);
+
handle_input(handshake, <<"AMQP", A, B, C, D>>, #v1{sock = Sock}) ->
refuse_connection(Sock, {bad_version, A, B, C, D});
@@ -981,3 +990,28 @@ cert_info(F, #v1{sock = Sock}) ->
emit_stats(State) ->
rabbit_event:notify(connection_stats, infos(?STATISTICS_KEYS, State)),
rabbit_event:reset_stats_timer(State, #v1.stats_timer).
+
+%% 1.0 stub
+
+become_1_0(Mode, HandshakeBytes, State = #v1{sock = Sock}) ->
+ case code:is_loaded(rabbit_amqp1_0_reader) of
+ false -> refuse_connection(
+ Sock, list_to_tuple([bad_version | HandshakeBytes]));
+ _ -> apply0(rabbit_amqp1_0_reader, become,
+ [Mode, pack_for_1_0(State)])
+ end.
+
+%% Fool xref. Simply using apply(M, F, A) with constants is not enough.
+apply0(M, F, A) -> apply(M, F, A).
+
+pack_for_1_0(#v1{parent = Parent,
+ sock = Sock,
+ recv_len = RecvLen,
+ pending_recv = PendingRecv,
+ queue_collector = QueueCollector,
+ channel_sup_sup_pid = ChannelSupSupPid,
+ start_heartbeat_fun = SHF,
+ buf = Buf,
+ buf_len = BufLen}) ->
+ {Parent, Sock, RecvLen, PendingRecv, QueueCollector,
+ ChannelSupSupPid, SHF, Buf, BufLen}.
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 09ed3d0890..9ca7763d09 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -50,6 +50,7 @@ all_tests() ->
passed = test_table_codec(),
passed = test_content_framing(),
passed = test_content_transcoding(),
+ passed = test_serial_arithmetic(),
passed = test_topic_matching(),
passed = test_log_management(),
passed = test_app_management(),
@@ -558,6 +559,29 @@ sequence_with_content(Sequence) ->
rabbit_framing_amqp_0_9_1),
Sequence).
+test_serial_arithmetic() ->
+ 1 = rabbit_misc:serial_add(0, 1),
+ 16#7fffffff = rabbit_misc:serial_add(0, 16#7fffffff),
+ 0 = rabbit_misc:serial_add(16#ffffffff, 1),
+ %% Cannot add more than 2 ^ 31 - 1
+ case catch rabbit_misc:serial_add(200, 16#80000000) of
+ {'EXIT', {out_of_bound_serial_addition, _, _}} -> ok;
+ _ -> exit(fail_out_of_bound_serial_addition)
+ end,
+
+ 1 = rabbit_misc:serial_diff(1, 0),
+ 2 = rabbit_misc:serial_diff(1, 16#ffffffff),
+ -2 = rabbit_misc:serial_diff(16#ffffffff, 1),
+ case catch rabbit_misc:serial_diff(0, 16#80000000) of
+ {'EXIT', {indeterminate_serial_diff, _, _}} -> ok;
+ _ -> exit(fail_indeterminate_serial_difference)
+ end,
+ case catch rabbit_misc:serial_diff(16#ffffffff, 16#7fffffff) of
+ {'EXIT', {indeterminate_serial_diff, _, _}} -> ok;
+ _ -> exit(fail_indeterminate_serial_difference)
+ end,
+ passed.
+
test_topic_matching() ->
XName = #resource{virtual_host = <<"/">>,
kind = exchange,