diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue.erl | 5 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 110 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 14 | ||||
| -rw-r--r-- | src/rabbit_connection_sup.erl | 9 | ||||
| -rw-r--r-- | src/rabbit_limiter.erl | 108 | ||||
| -rw-r--r-- | src/rabbit_misc.erl | 51 | ||||
| -rw-r--r-- | src/rabbit_reader.erl | 34 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 24 |
8 files changed, 308 insertions, 47 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 2477b891fc..1715e848b1 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -33,6 +33,7 @@ -export([update/2, store_queue/1, policy_changed/2]). -export([start_mirroring/1, stop_mirroring/1, sync_mirrors/1, cancel_sync_mirrors/1]). +-export([inform_limiter/3]). %% internal -export([internal_declare/2, internal_delete/1, run_backing_queue/3, @@ -177,6 +178,7 @@ -spec(sync_mirrors/1 :: (pid()) -> 'ok' | rabbit_types:error('pending_acks' | 'not_mirrored')). -spec(cancel_sync_mirrors/1 :: (pid()) -> 'ok' | {'ok', 'not_syncing'}). +-spec(inform_limiter/3 :: (pid(), pid(), any()) -> 'ok'). -endif. @@ -607,6 +609,9 @@ stop_mirroring(QPid) -> ok = delegate:cast(QPid, stop_mirroring). sync_mirrors(QPid) -> delegate:call(QPid, sync_mirrors). cancel_sync_mirrors(QPid) -> delegate:call(QPid, cancel_sync_mirrors). +inform_limiter(ChPid, QPid, Msg) -> + delegate:cast(QPid, {inform_limiter, ChPid, Msg}). + on_node_down(Node) -> rabbit_misc:execute_mnesia_tx_with_tail( fun () -> QsDels = diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index e76bf6ea47..1329209175 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -65,9 +65,18 @@ monitor_ref, acktags, consumer_count, + %% Queue of {ChPid, #consumer{}} for consumers which have + %% been blocked for any reason blocked_consumers, + %% List of consumer tags which have individually been + %% blocked by the limiter. + blocked_ctags, + %% The limiter itself limiter, + %% Has the limiter imposed a channel-wide block, either + %% because of qos or channel flow? is_limit_active, + %% Internal flow control for queue -> writer unsent_message_count}). %%---------------------------------------------------------------------------- @@ -358,6 +367,7 @@ ch_record(ChPid) -> acktags = queue:new(), consumer_count = 0, blocked_consumers = queue:new(), + blocked_ctags = [], is_limit_active = false, limiter = rabbit_limiter:make_token(), unsent_message_count = 0}, @@ -405,13 +415,6 @@ block_consumer(C = #cr{blocked_consumers = Blocked}, QEntry) -> is_ch_blocked(#cr{unsent_message_count = Count, is_limit_active = Limited}) -> Limited orelse Count >= ?UNSENT_MESSAGE_LIMIT. -ch_record_state_transition(OldCR, NewCR) -> - case {is_ch_blocked(OldCR), is_ch_blocked(NewCR)} of - {true, false} -> unblock; - {false, true} -> block; - {_, _} -> ok - end. - deliver_msgs_to_consumers(_DeliverFun, true, State) -> {true, State}; deliver_msgs_to_consumers(DeliverFun, false, @@ -426,20 +429,38 @@ deliver_msgs_to_consumers(DeliverFun, false, deliver_msgs_to_consumers(DeliverFun, Stop, State1) end. -deliver_msg_to_consumer(DeliverFun, E = {ChPid, Consumer}, State) -> +deliver_msg_to_consumer(DeliverFun, + E = {ChPid, + Consumer = #consumer{tag = CTag, + ack_required = AckReq}}, + State = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> C = ch_record(ChPid), case is_ch_blocked(C) of - true -> block_consumer(C, E), - {false, State}; - false -> case rabbit_limiter:can_send(C#cr.limiter, self(), - Consumer#consumer.ack_required) of - false -> block_consumer(C#cr{is_limit_active = true}, E), - {false, State}; - true -> AC1 = queue:in(E, State#q.active_consumers), - deliver_msg_to_consumer( - DeliverFun, Consumer, C, - State#q{active_consumers = AC1}) - end + true -> + block_consumer(C, E), + {false, State}; + false -> + #cr{limiter = Limiter, ch_pid = ChPid, blocked_ctags = BCTags} = C, + case rabbit_limiter:can_cons_send( + Limiter, ChPid, CTag, BQ:len(BQS)) of + {false, Lim2} -> + %% TODO unify with first case? + block_consumer(C#cr{limiter = Lim2, + blocked_ctags = [CTag | BCTags]}, E), + {false, State}; + {true, Lim2} -> + case rabbit_limiter:can_ch_send(Limiter, self(), AckReq) of + false -> + block_consumer(C#cr{is_limit_active = true}, E), + {false, State}; + true -> + AC1 = queue:in(E, State#q.active_consumers), + deliver_msg_to_consumer( + DeliverFun, Consumer, C#cr{limiter = Lim2}, + State#q{active_consumers = AC1}) + end + end end. deliver_msg_to_consumer(DeliverFun, @@ -597,16 +618,20 @@ possibly_unblock(State, ChPid, Update) -> not_found -> State; C -> - C1 = Update(C), - case ch_record_state_transition(C, C1) of - ok -> update_ch_record(C1), - State; - unblock -> #cr{blocked_consumers = Consumers} = C1, - update_ch_record( - C1#cr{blocked_consumers = queue:new()}), - AC1 = queue:join(State#q.active_consumers, - Consumers), - run_message_queue(State#q{active_consumers = AC1}) + C1 = #cr{blocked_ctags = BCTags1} = Update(C), + {Blocked, Unblocked} = + lists:partition( + fun({_ChPid, #consumer{tag = CTag}}) -> + is_ch_blocked(C1) orelse lists:member(CTag, BCTags1) + end, queue:to_list(C1#cr.blocked_consumers)), + case Unblocked of + [] -> update_ch_record(C1), + State; + _ -> update_ch_record( + C1#cr{blocked_consumers = queue:from_list(Blocked)}), + AC1 = queue:join(State#q.active_consumers, + queue:from_list(Unblocked)), + run_message_queue(State#q{active_consumers = AC1}) end end. @@ -658,8 +683,6 @@ check_exclusive_access(none, true, State) -> consumer_count() -> consumer_count(fun (_) -> false end). -active_consumer_count() -> consumer_count(fun is_ch_blocked/1). - consumer_count(Exclude) -> lists:sum([Count || C = #cr{consumer_count = Count} <- all_ch_record(), not Exclude(C)]). @@ -932,8 +955,8 @@ i(messages, State) -> messages_unacknowledged]]); i(consumers, _) -> consumer_count(); -i(active_consumers, _) -> - active_consumer_count(); +i(active_consumers, #q{active_consumers = ActiveConsumers}) -> + queue:len(ActiveConsumers); i(memory, _) -> {memory, M} = process_info(self(), memory), M; @@ -1149,9 +1172,10 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, From, end; handle_call(stat, _From, State) -> - State1 = #q{backing_queue = BQ, backing_queue_state = BQS} = + State1 = #q{active_consumers = AC, + backing_queue = BQ, backing_queue_state = BQS} = drop_expired_msgs(ensure_expiry_timer(State)), - reply({ok, BQ:len(BQS), active_consumer_count()}, State1); + reply({ok, BQ:len(BQS), queue:len(AC)}, State1); handle_call({delete, IfUnused, IfEmpty}, From, State = #q{backing_queue_state = BQS, backing_queue = BQ}) -> @@ -1291,7 +1315,9 @@ handle_cast({limit, ChPid, Limiter}, State) -> false -> ok end, Limited = OldLimited andalso rabbit_limiter:is_enabled(Limiter), - C#cr{limiter = Limiter, is_limit_active = Limited} + C#cr{limiter = rabbit_limiter:copy_queue_state( + OldLimiter, Limiter), + is_limit_active = Limited} end)); handle_cast({flush, ChPid}, State) -> @@ -1324,6 +1350,18 @@ handle_cast(stop_mirroring, State = #q{backing_queue = BQ, noreply(State#q{backing_queue = BQ1, backing_queue_state = BQS1}); +handle_cast({inform_limiter, ChPid, Msg}, + State = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> + #cr{limiter = Limiter, + blocked_ctags = BCTags} = ch_record(ChPid), + {Unblock, Limiter2} = + rabbit_limiter:inform(Limiter, ChPid, BQ:len(BQS), Msg), + noreply(possibly_unblock( + State, ChPid, + fun(C) -> C#cr{blocked_ctags = BCTags -- Unblock, + limiter = Limiter2} end)); + handle_cast(wake_up, State) -> noreply(State). diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 88e3dfc583..c3a5b16df9 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -1079,6 +1079,20 @@ handle_method(#'channel.flow'{active = false}, _, {noreply, State2} end; +handle_method(#'basic.credit'{consumer_tag = CTag, + credit = Credit, + count = Count, + drain = Drain}, _, + State = #ch{consumer_mapping = Consumers}) -> + case dict:find(CTag, Consumers) of + {ok, Q} -> ok = rabbit_amqqueue:inform_limiter( + self(), Q#amqqueue.pid, + {basic_credit, CTag, Credit, Count, Drain}), + {noreply, State}; + error -> rabbit_misc:protocol_error( + not_allowed, "unknown consumer tag '~s'", [CTag]) + end; + handle_method(_MethodRecord, _Content, _State) -> rabbit_misc:protocol_error( command_invalid, "unimplemented method", []). diff --git a/src/rabbit_connection_sup.erl b/src/rabbit_connection_sup.erl index 12a532b6fc..f4f3c72feb 100644 --- a/src/rabbit_connection_sup.erl +++ b/src/rabbit_connection_sup.erl @@ -42,10 +42,17 @@ start_link() -> SupPid, {collector, {rabbit_queue_collector, start_link, []}, intrinsic, ?MAX_WAIT, worker, [rabbit_queue_collector]}), + %% Note that rabbit_amqp1_0_session_sup_sup despite the name can + %% mimic rabbit_channel_sup_sup when we handle a 0-9-1 connection + %% and the 1.0 plugin is loaded. + ChannelSupSupModule = case code:is_loaded(rabbit_amqp1_0_session_sup_sup) of + false -> rabbit_channel_sup_sup; + _ -> rabbit_amqp1_0_session_sup_sup + end, {ok, ChannelSupSupPid} = supervisor2:start_child( SupPid, - {channel_sup_sup, {rabbit_channel_sup_sup, start_link, []}, + {channel_sup_sup, {ChannelSupSupModule, start_link, []}, intrinsic, infinity, supervisor, [rabbit_channel_sup_sup]}), {ok, ReaderPid} = supervisor2:start_child( diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index 2b15498ed9..9da1bc6f7c 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -15,19 +15,25 @@ %% -module(rabbit_limiter). +-include("rabbit_framing.hrl"). -behaviour(gen_server2). -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2, prioritise_call/3]). + -export([start_link/0, make_token/0, make_token/1, is_enabled/1, enable/2, disable/1]). --export([limit/2, can_send/3, ack/2, register/2, unregister/2]). +-export([limit/2, can_ch_send/3, can_cons_send/4, + ack/2, register/2, unregister/2]). -export([get_limit/1, block/1, unblock/1, is_blocked/1]). +-export([inform/4]). + +-import(rabbit_misc, [serial_add/2, serial_diff/2]). %%---------------------------------------------------------------------------- --record(token, {pid, enabled}). +-record(token, {pid, enabled, q_state}). -ifdef(use_specs). @@ -42,7 +48,10 @@ -spec(enable/2 :: (token(), non_neg_integer()) -> token()). -spec(disable/1 :: (token()) -> token()). -spec(limit/2 :: (token(), non_neg_integer()) -> 'ok' | {'disabled', token()}). --spec(can_send/3 :: (token(), pid(), boolean()) -> boolean()). +-spec(can_ch_send/3 :: (token(), pid(), boolean()) -> boolean()). +%% TODO +%% -spec(can_send/5 :: (token(), pid(), boolean(), +%% rabbit_types:ctag(), non_neg_integer()) -> boolean()). -spec(ack/2 :: (token(), non_neg_integer()) -> 'ok'). -spec(register/2 :: (token(), pid()) -> 'ok'). -spec(unregister/2 :: (token(), pid()) -> 'ok'). @@ -50,7 +59,10 @@ -spec(block/1 :: (token()) -> 'ok'). -spec(unblock/1 :: (token()) -> 'ok' | {'disabled', token()}). -spec(is_blocked/1 :: (token()) -> boolean()). - +%% -spec(set_credit/5 :: (token(), rabbit_types:ctag(), +%% non_neg_integer(), +%% non_neg_integer(), boolean()) -> 'ok'). +%%-spec(inform/4 :: (token(), pid(), non_neg_integer(), any()) -> token()). -endif. %%---------------------------------------------------------------------------- @@ -60,9 +72,8 @@ blocked = false, queues = orddict:new(), % QPid -> {MonitorRef, Notify} volume = 0}). -%% 'Notify' is a boolean that indicates whether a queue should be -%% notified of a change in the limit or volume that may allow it to -%% deliver more messages via the limiter's channel. + +-record(credit, {count = 0, credit = 0, drain = false}). %%---------------------------------------------------------------------------- %% API @@ -71,7 +82,8 @@ start_link() -> gen_server2:start_link(?MODULE, [], []). make_token() -> make_token(undefined). -make_token(Pid) -> #token{pid = Pid, enabled = false}. +make_token(Pid) -> #token{pid = Pid, enabled = false, + q_state = dict:new()}. is_enabled(#token{enabled = Enabled}) -> Enabled. @@ -88,15 +100,19 @@ limit(Limiter, PrefetchCount) -> %% breaching a limit. Note that we don't use maybe_call here in order %% to avoid always going through with_exit_handler/2, even when the %% limiter is disabled. -can_send(#token{pid = Pid, enabled = true}, QPid, AckRequired) -> +can_ch_send(#token{pid = Pid, enabled = true}, QPid, AckRequired) -> rabbit_misc:with_exit_handler( fun () -> true end, fun () -> gen_server2:call(Pid, {can_send, QPid, AckRequired}, infinity) end); -can_send(_, _, _) -> +can_ch_send(_, _, _) -> true. +can_cons_send(#token{q_state = QState} = Token, ChPid, CTag, Len) -> + {CanQ, NewQState} = can_send_q(CTag, Len, ChPid, QState), + {CanQ, Token#token{q_state = NewQState}}. + %% Let the limiter know that the channel has received some acks from a %% consumer ack(Limiter, Count) -> maybe_cast(Limiter, {ack, Count}). @@ -119,6 +135,78 @@ unblock(Limiter) -> is_blocked(Limiter) -> maybe_call(Limiter, is_blocked, false). +inform(Limiter = #token{q_state = Credits}, + ChPid, Len, {basic_credit, CTag, Credit, Count, Drain}) -> + {Unblock, Credits2} = + update_credit(CTag, Len, ChPid, Credit, Count, Drain, Credits), + {Unblock, Limiter#token{q_state = Credits2}}. + +%%---------------------------------------------------------------------------- +%% Queue-local code +%%---------------------------------------------------------------------------- + +%% We want to do all the AMQP 1.0-ish link level credit calculations in the +%% queue (to do them elsewhere introduces a ton of races). However, it's a big +%% chunk of code that is conceptually very linked to the limiter concept. So +%% we get the queue to hold a bit of state for us (#token.q_state), and +%% maintain a fiction that the limiter is making the decisions... + +can_send_q(CTag, Len, ChPid, Credits) -> + case dict:find(CTag, Credits) of + {ok, #credit{credit = C} = Cred} -> + if C > 0 -> Credits2 = decr_credit(CTag, Len, ChPid, Cred, Credits), + {true, Credits2}; + true -> {false, Credits} + end; + _ -> + {true, Credits} + end. + +decr_credit(CTag, Len, ChPid, Cred, Credits) -> + #credit{credit = Credit, count = Count, drain = Drain} = Cred, + {NewCredit, NewCount} = + case {Len, Drain} of + {1, true} -> %% Drain, so advance til credit = 0 + NewCount0 = serial_add(Count, (Credit - 1)), + send_drained(ChPid, CTag, NewCount0), + {0, NewCount0}; %% Magic reduction to 0 + {_, _} -> {Credit - 1, serial_add(Count, 1)} + end, + write_credit(CTag, NewCredit, NewCount, Drain, Credits). + +send_drained(ChPid, CTag, Count) -> + rabbit_channel:send_command(ChPid, + #'basic.credit_state'{consumer_tag = CTag, + credit = 0, + count = Count, + available = 0, + drain = true}). + +%% Update the credit state. +%% TODO Edge case: if the queue has nothing in it, and drain is set, +%% we want to send a basic.credit back. +update_credit(CTag, Len, ChPid, Credit, Count0, Drain, Credits) -> + Count = + case dict:find(CTag, Credits) of + %% Use our count if we can, more accurate + {ok, #credit{ count = LocalCount }} -> LocalCount; + %% But if this is new, take it from the adapter + _ -> Count0 + end, + rabbit_channel:send_command(ChPid, #'basic.credit_ok'{available = Len}), + NewCredits = write_credit(CTag, Credit, Count, Drain, Credits), + case Credit > 0 of + true -> {[CTag], NewCredits}; + false -> {[], NewCredits} + end. + +%% TODO currently we leak when a single session creates and destroys +%% lot of links. +write_credit(CTag, Credit, Count, Drain, Credits) -> + dict:store(CTag, #credit{credit = Credit, + count = Count, + drain = Drain}, Credits). + %%---------------------------------------------------------------------------- %% gen_server callbacks %%---------------------------------------------------------------------------- diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index ce3e380248..c316db99b0 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -68,6 +68,7 @@ -export([base64url/1]). -export([interval_operation/4]). -export([get_parent/0]). +-export([serial_add/2, serial_compare/2, serial_diff/2]). %% Horrible macro to use in guards -define(IS_BENIGN_EXIT(R), @@ -82,6 +83,7 @@ -ifdef(use_specs). -export_type([resource_name/0, thunk/1]). +-export_type([serial_number/0]). -type(ok_or_error() :: rabbit_types:ok_or_error(any())). -type(thunk(T) :: fun(() -> T)). @@ -94,6 +96,8 @@ fun ((atom(), [term()]) -> [{digraph:vertex(), digraph_label()}])). -type(graph_edge_fun() :: fun ((atom(), [term()]) -> [{digraph:vertex(), digraph:vertex()}])). +-type(serial_number() :: non_neg_integer()). +-type(serial_compare_result() :: 'equal' | 'less' | 'greater'). -spec(method_record_type/1 :: (rabbit_framing:amqp_method_record()) -> rabbit_framing:amqp_method_name()). @@ -243,6 +247,12 @@ ({atom(), atom(), any()}, float(), non_neg_integer(), non_neg_integer()) -> {any(), non_neg_integer()}). -spec(get_parent/0 :: () -> pid()). +-spec(serial_add/2 :: (serial_number(), non_neg_integer()) -> + serial_number()). +-spec(serial_compare/2 :: (serial_number(), serial_number()) -> + serial_compare_result()). +-spec(serial_diff/2 :: (serial_number(), serial_number()) -> + integer()). -endif. %%---------------------------------------------------------------------------- @@ -1080,3 +1090,44 @@ whereis_name(Name) -> %% End copypasta from gen_server2.erl %% ------------------------------------------------------------------------- + +%% Serial arithmetic for unsigned ints. +%% http://www.faqs.org/rfcs/rfc1982.html +%% SERIAL_BITS = 32 + +%% 2 ^ SERIAL_BITS +-define(SERIAL_MAX, 16#100000000). +%% 2 ^ (SERIAL_BITS - 1) - 1 +-define(SERIAL_MAX_ADDEND, 16#7fffffff). + +serial_add(S, N) when N =< ?SERIAL_MAX_ADDEND -> + (S + N) rem ?SERIAL_MAX; +serial_add(S, N) -> + exit({out_of_bound_serial_addition, S, N}). + +serial_compare(A, B) -> + if A =:= B -> + equal; + (A < B andalso B - A < ?SERIAL_MAX_ADDEND) orelse + (A > B andalso A - B > ?SERIAL_MAX_ADDEND) -> + less; + (A < B andalso B - A > ?SERIAL_MAX_ADDEND) orelse + (A > B andalso B - A < ?SERIAL_MAX_ADDEND) -> + greater; + true -> exit({indeterminate_serial_comparison, A, B}) + end. + +-define(SERIAL_DIFF_BOUND, 16#80000000). + +serial_diff(A, B) -> + Diff = A - B, + if Diff > (?SERIAL_DIFF_BOUND) -> + %% B is actually greater than A + - (?SERIAL_MAX - Diff); + Diff < - (?SERIAL_DIFF_BOUND) -> + ?SERIAL_MAX + Diff; + Diff < ?SERIAL_DIFF_BOUND andalso Diff > -?SERIAL_DIFF_BOUND -> + Diff; + true -> + exit({indeterminate_serial_diff, A, B}) + end. diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 13e8feff08..f140bd237c 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -689,6 +689,15 @@ handle_input(handshake, <<"AMQP", 1, 1, 8, 0>>, State) -> handle_input(handshake, <<"AMQP", 1, 1, 9, 1>>, State) -> start_connection({8, 0, 0}, rabbit_framing_amqp_0_8, State); +%% ... and finally, the 1.0 spec is crystal clear! Note that the +%% FIXME TLS uses a different protocol number, and would go here. +handle_input(handshake, <<"AMQP", 0, 1, 0, 0>>, State) -> + become_1_0(amqp, [0, 1, 0, 0], State); + +%% 3 stands for "SASL" +handle_input(handshake, <<"AMQP", 3, 1, 0, 0>>, State) -> + become_1_0(sasl, [0, 3, 0, 0], State); + handle_input(handshake, <<"AMQP", A, B, C, D>>, #v1{sock = Sock}) -> refuse_connection(Sock, {bad_version, A, B, C, D}); @@ -981,3 +990,28 @@ cert_info(F, #v1{sock = Sock}) -> emit_stats(State) -> rabbit_event:notify(connection_stats, infos(?STATISTICS_KEYS, State)), rabbit_event:reset_stats_timer(State, #v1.stats_timer). + +%% 1.0 stub + +become_1_0(Mode, HandshakeBytes, State = #v1{sock = Sock}) -> + case code:is_loaded(rabbit_amqp1_0_reader) of + false -> refuse_connection( + Sock, list_to_tuple([bad_version | HandshakeBytes])); + _ -> apply0(rabbit_amqp1_0_reader, become, + [Mode, pack_for_1_0(State)]) + end. + +%% Fool xref. Simply using apply(M, F, A) with constants is not enough. +apply0(M, F, A) -> apply(M, F, A). + +pack_for_1_0(#v1{parent = Parent, + sock = Sock, + recv_len = RecvLen, + pending_recv = PendingRecv, + queue_collector = QueueCollector, + channel_sup_sup_pid = ChannelSupSupPid, + start_heartbeat_fun = SHF, + buf = Buf, + buf_len = BufLen}) -> + {Parent, Sock, RecvLen, PendingRecv, QueueCollector, + ChannelSupSupPid, SHF, Buf, BufLen}. diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 09ed3d0890..9ca7763d09 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -50,6 +50,7 @@ all_tests() -> passed = test_table_codec(), passed = test_content_framing(), passed = test_content_transcoding(), + passed = test_serial_arithmetic(), passed = test_topic_matching(), passed = test_log_management(), passed = test_app_management(), @@ -558,6 +559,29 @@ sequence_with_content(Sequence) -> rabbit_framing_amqp_0_9_1), Sequence). +test_serial_arithmetic() -> + 1 = rabbit_misc:serial_add(0, 1), + 16#7fffffff = rabbit_misc:serial_add(0, 16#7fffffff), + 0 = rabbit_misc:serial_add(16#ffffffff, 1), + %% Cannot add more than 2 ^ 31 - 1 + case catch rabbit_misc:serial_add(200, 16#80000000) of + {'EXIT', {out_of_bound_serial_addition, _, _}} -> ok; + _ -> exit(fail_out_of_bound_serial_addition) + end, + + 1 = rabbit_misc:serial_diff(1, 0), + 2 = rabbit_misc:serial_diff(1, 16#ffffffff), + -2 = rabbit_misc:serial_diff(16#ffffffff, 1), + case catch rabbit_misc:serial_diff(0, 16#80000000) of + {'EXIT', {indeterminate_serial_diff, _, _}} -> ok; + _ -> exit(fail_indeterminate_serial_difference) + end, + case catch rabbit_misc:serial_diff(16#ffffffff, 16#7fffffff) of + {'EXIT', {indeterminate_serial_diff, _, _}} -> ok; + _ -> exit(fail_indeterminate_serial_difference) + end, + passed. + test_topic_matching() -> XName = #resource{virtual_host = <<"/">>, kind = exchange, |
