diff options
| author | Simon MacMullen <simon@rabbitmq.com> | 2013-02-12 11:44:23 +0000 |
|---|---|---|
| committer | Simon MacMullen <simon@rabbitmq.com> | 2013-02-12 11:44:23 +0000 |
| commit | e2333d618ee4afce1b2a7a8fca869af8b5c4d928 (patch) | |
| tree | c7a721035555f6d71f7613aa7445e6d04bdf1edd /src | |
| parent | 0b2f9fe9775bb04bfdde3d6cabf0cfcc5df1fd9d (diff) | |
| parent | bde44f20e8bb44411eec6a1dcdfb20f4fb290aa4 (diff) | |
| download | rabbitmq-server-git-e2333d618ee4afce1b2a7a8fca869af8b5c4d928.tar.gz | |
Merge default
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue.erl | 20 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 118 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 49 | ||||
| -rw-r--r-- | src/rabbit_limiter.erl | 103 | ||||
| -rw-r--r-- | src/rabbit_misc.erl | 51 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 24 |
6 files changed, 312 insertions, 53 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index ae7fe5c5e1..4b1ba53832 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -26,9 +26,9 @@ -export([list/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]). -export([force_event_refresh/0, wake_up/1]). -export([consumers/1, consumers_all/1, consumer_info_keys/0]). --export([basic_get/3, basic_consume/7, basic_cancel/4]). +-export([basic_get/3, basic_consume/8, basic_cancel/4]). -export([notify_sent/2, notify_sent_queue_down/1, unblock/2, flush_all/2]). --export([notify_down_all/2, limit_all/3]). +-export([notify_down_all/2, limit_all/3, credit/5]). -export([on_node_down/1]). -export([update/2, store_queue/1, policy_changed/2]). -export([start_mirroring/1, stop_mirroring/1, sync_mirrors/1, @@ -146,11 +146,14 @@ -spec(notify_down_all/2 :: (qpids(), pid()) -> ok_or_errors()). -spec(limit_all/3 :: (qpids(), pid(), rabbit_limiter:token()) -> ok_or_errors()). +-spec(credit/5 :: (rabbit_types:amqqueue(), pid(), rabbit_types:ctag(), + non_neg_integer(), boolean()) -> 'ok'). -spec(basic_get/3 :: (rabbit_types:amqqueue(), pid(), boolean()) -> {'ok', non_neg_integer(), qmsg()} | 'empty'). --spec(basic_consume/7 :: +-spec(basic_consume/8 :: (rabbit_types:amqqueue(), boolean(), pid(), - rabbit_limiter:token(), rabbit_types:ctag(), boolean(), any()) + rabbit_limiter:token(), rabbit_types:ctag(), boolean(), + {non_neg_integer(), boolean()} | 'none', any()) -> rabbit_types:ok_or_error('exclusive_consume_unavailable')). -spec(basic_cancel/4 :: (rabbit_types:amqqueue(), pid(), rabbit_types:ctag(), any()) -> 'ok'). @@ -533,13 +536,16 @@ notify_down_all(QPids, ChPid) -> limit_all(QPids, ChPid, Limiter) -> delegate:cast(QPids, {limit, ChPid, Limiter}). +credit(#amqqueue{pid = QPid}, ChPid, CTag, Credit, Drain) -> + delegate:cast(QPid, {credit, ChPid, CTag, Credit, Drain}). + basic_get(#amqqueue{pid = QPid}, ChPid, NoAck) -> delegate:call(QPid, {basic_get, ChPid, NoAck}). basic_consume(#amqqueue{pid = QPid}, NoAck, ChPid, Limiter, - ConsumerTag, ExclusiveConsume, OkMsg) -> - delegate:call(QPid, {basic_consume, NoAck, ChPid, - Limiter, ConsumerTag, ExclusiveConsume, OkMsg}). + ConsumerTag, ExclusiveConsume, CreditArgs, OkMsg) -> + delegate:call(QPid, {basic_consume, NoAck, ChPid, Limiter, + ConsumerTag, ExclusiveConsume, CreditArgs, OkMsg}). basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) -> delegate:call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index fe3a609996..88d1329057 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -65,9 +65,18 @@ monitor_ref, acktags, consumer_count, + %% Queue of {ChPid, #consumer{}} for consumers which have + %% been blocked for any reason blocked_consumers, + %% List of consumer tags which have individually been + %% blocked by the limiter. + blocked_ctags, + %% The limiter itself limiter, + %% Has the limiter imposed a channel-wide block, either + %% because of qos or channel flow? is_limit_active, + %% Internal flow control for queue -> writer unsent_message_count}). %%---------------------------------------------------------------------------- @@ -366,6 +375,7 @@ ch_record(ChPid) -> acktags = queue:new(), consumer_count = 0, blocked_consumers = queue:new(), + blocked_ctags = [], is_limit_active = false, limiter = rabbit_limiter:make_token(), unsent_message_count = 0}, @@ -413,13 +423,6 @@ block_consumer(C = #cr{blocked_consumers = Blocked}, QEntry) -> is_ch_blocked(#cr{unsent_message_count = Count, is_limit_active = Limited}) -> Limited orelse Count >= ?UNSENT_MESSAGE_LIMIT. -ch_record_state_transition(OldCR, NewCR) -> - case {is_ch_blocked(OldCR), is_ch_blocked(NewCR)} of - {true, false} -> unblock; - {false, true} -> block; - {_, _} -> ok - end. - deliver_msgs_to_consumers(_DeliverFun, true, State) -> {true, State}; deliver_msgs_to_consumers(DeliverFun, false, @@ -434,23 +437,35 @@ deliver_msgs_to_consumers(DeliverFun, false, deliver_msgs_to_consumers(DeliverFun, Stop, State1) end. -deliver_msg_to_consumer(DeliverFun, E = {ChPid, Consumer}, State) -> +deliver_msg_to_consumer(DeliverFun, E = {ChPid, Consumer}, + State = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> C = ch_record(ChPid), case is_ch_blocked(C) of - true -> block_consumer(C, E), - {false, State}; - false -> case rabbit_limiter:can_send(C#cr.limiter, self(), - Consumer#consumer.ack_required) of - false -> block_consumer(C#cr{is_limit_active = true}, E), - {false, State}; - true -> AC1 = queue:in(E, State#q.active_consumers), - deliver_msg_to_consumer( - DeliverFun, Consumer, C, - State#q{active_consumers = AC1}) - end + true -> + block_consumer(C, E), + {false, State}; + false -> + #cr{limiter = Limiter, ch_pid = ChPid, blocked_ctags = BCTags} = C, + #consumer{tag = CTag} = Consumer, + case rabbit_limiter:can_send( + Limiter, self(), Consumer#consumer.ack_required, + ChPid, CTag, BQ:len(BQS)) of + consumer_blocked -> + block_consumer(C#cr{blocked_ctags = [CTag | BCTags]}, E), + {false, State}; + channel_blocked -> + block_consumer(C#cr{is_limit_active = true}, E), + {false, State}; + Limiter2 -> + AC1 = queue:in(E, State#q.active_consumers), + deliver_msg_to_consumer( + DeliverFun, Limiter2, Consumer, C, + State#q{active_consumers = AC1}) + end end. -deliver_msg_to_consumer(DeliverFun, +deliver_msg_to_consumer(DeliverFun, NewLimiter, #consumer{tag = ConsumerTag, ack_required = AckRequired}, C = #cr{ch_pid = ChPid, @@ -466,6 +481,7 @@ deliver_msg_to_consumer(DeliverFun, false -> ChAckTags end, update_ch_record(C#cr{acktags = ChAckTags1, + limiter = NewLimiter, unsent_message_count = Count + 1}), {Stop, State1}. @@ -605,16 +621,21 @@ possibly_unblock(State, ChPid, Update) -> not_found -> State; C -> - C1 = Update(C), - case ch_record_state_transition(C, C1) of - ok -> update_ch_record(C1), - State; - unblock -> #cr{blocked_consumers = Consumers} = C1, - update_ch_record( - C1#cr{blocked_consumers = queue:new()}), - AC1 = queue:join(State#q.active_consumers, - Consumers), - run_message_queue(State#q{active_consumers = AC1}) + C1 = #cr{blocked_ctags = BCTags} = Update(C), + IsBlocked = is_ch_blocked(C1), + {Blocked, Unblocked} = + lists:partition( + fun({_ChPid, #consumer{tag = CTag}}) -> + IsBlocked orelse lists:member(CTag, BCTags) + end, queue:to_list(C1#cr.blocked_consumers)), + case Unblocked of + [] -> update_ch_record(C1), + State; + _ -> update_ch_record( + C1#cr{blocked_consumers = queue:from_list(Blocked)}), + AC1 = queue:join(State#q.active_consumers, + queue:from_list(Unblocked)), + run_message_queue(State#q{active_consumers = AC1}) end end. @@ -1076,14 +1097,24 @@ handle_call({basic_get, ChPid, NoAck}, _From, end; handle_call({basic_consume, NoAck, ChPid, Limiter, - ConsumerTag, ExclusiveConsume, OkMsg}, - _From, State = #q{exclusive_consumer = Holder}) -> + ConsumerTag, ExclusiveConsume, CreditArgs, OkMsg}, + _From, State = #q{exclusive_consumer = Holder, + backing_queue = BQ, + backing_queue_state = BQS}) -> case check_exclusive_access(Holder, ExclusiveConsume, State) of in_use -> reply({error, exclusive_consume_unavailable}, State); ok -> C = ch_record(ChPid), - C1 = update_consumer_count(C#cr{limiter = Limiter}, +1), + Limiter2 = case CreditArgs of + none -> + Limiter; + {Credit, Drain} -> + rabbit_limiter:initial_credit( + Limiter, ChPid, ConsumerTag, Credit, Drain, + BQ:len(BQS)) + end, + C1 = update_consumer_count(C#cr{limiter = Limiter2}, +1), Consumer = #consumer{tag = ConsumerTag, ack_required = not NoAck}, ExclusiveConsumer = if ExclusiveConsume -> {ChPid, ConsumerTag}; @@ -1112,10 +1143,12 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, From, case lookup_ch(ChPid) of not_found -> reply(ok, State); - C = #cr{blocked_consumers = Blocked} -> + C = #cr{limiter = Limiter, blocked_consumers = Blocked} -> emit_consumer_deleted(ChPid, ConsumerTag, qname(State)), + Limiter1 = rabbit_limiter:forget_consumer(Limiter, ConsumerTag), Blocked1 = remove_consumer(ChPid, ConsumerTag, Blocked), - update_consumer_count(C#cr{blocked_consumers = Blocked1}, -1), + update_consumer_count(C#cr{limiter = Limiter1, + blocked_consumers = Blocked1}, -1), State1 = State#q{ exclusive_consumer = case Holder of {ChPid, ConsumerTag} -> none; @@ -1269,7 +1302,9 @@ handle_cast({limit, ChPid, Limiter}, State) -> false -> ok end, Limited = OldLimited andalso rabbit_limiter:is_enabled(Limiter), - C#cr{limiter = Limiter, is_limit_active = Limited} + C#cr{limiter = rabbit_limiter:copy_queue_state( + OldLimiter, Limiter), + is_limit_active = Limited} end)); handle_cast({flush, ChPid}, State) -> @@ -1302,6 +1337,17 @@ handle_cast(stop_mirroring, State = #q{backing_queue = BQ, noreply(State#q{backing_queue = BQ1, backing_queue_state = BQS1}); +handle_cast({credit, ChPid, CTag, Credit, Drain}, + State = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> + #cr{limiter = Lim, + blocked_ctags = BCTags} = ch_record(ChPid), + {Unblock, Lim2} = rabbit_limiter:credit( + Lim, ChPid, CTag, Credit, Drain, BQ:len(BQS)), + noreply(possibly_unblock( + State, ChPid, fun(C) -> C#cr{blocked_ctags = BCTags -- Unblock, + limiter = Lim2} end)); + handle_cast(wake_up, State) -> noreply(State). diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index e74211af4b..c1eb126ca4 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -21,7 +21,8 @@ -behaviour(gen_server2). -export([start_link/11, do/2, do/3, do_flow/3, flush/1, shutdown/1]). --export([send_command/2, deliver/4, flushed/2]). +-export([send_command/2, deliver/4, send_credit_reply/2, send_drained/3, + flushed/2]). -export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]). -export([refresh_config_local/0, ready_for_close/1]). -export([force_event_refresh/0]). @@ -94,6 +95,9 @@ -spec(deliver/4 :: (pid(), rabbit_types:ctag(), boolean(), rabbit_amqqueue:qmsg()) -> 'ok'). +-spec(send_credit_reply/2 :: (pid(), non_neg_integer()) -> 'ok'). +-spec(send_drained/3 :: (pid(), rabbit_types:ctag(), non_neg_integer()) + -> 'ok'). -spec(flushed/2 :: (pid(), pid()) -> 'ok'). -spec(list/0 :: () -> [pid()]). -spec(list_local/0 :: () -> [pid()]). @@ -138,6 +142,12 @@ send_command(Pid, Msg) -> deliver(Pid, ConsumerTag, AckRequired, Msg) -> gen_server2:cast(Pid, {deliver, ConsumerTag, AckRequired, Msg}). +send_credit_reply(Pid, Len) -> + gen_server2:cast(Pid, {send_credit_reply, Len}). + +send_drained(Pid, ConsumerTag, CreditDrained) -> + gen_server2:cast(Pid, {send_drained, ConsumerTag, CreditDrained}). + flushed(Pid, QPid) -> gen_server2:cast(Pid, {flushed, QPid}). @@ -315,6 +325,18 @@ handle_cast({deliver, ConsumerTag, AckRequired, Content), noreply(record_sent(ConsumerTag, AckRequired, Msg, State)); +handle_cast({send_credit_reply, Len}, State = #ch{writer_pid = WriterPid}) -> + ok = rabbit_writer:send_command( + WriterPid, #'basic.credit_ok'{available = Len}), + noreply(State); + +handle_cast({send_drained, ConsumerTag, CreditDrained}, + State = #ch{writer_pid = WriterPid}) -> + ok = rabbit_writer:send_command( + WriterPid, #'basic.credit_drained'{consumer_tag = ConsumerTag, + credit_drained = CreditDrained}), + noreply(State); + handle_cast(force_event_refresh, State) -> rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State)), noreply(State); @@ -697,7 +719,8 @@ handle_method(#'basic.consume'{queue = QueueNameBin, no_local = _, % FIXME: implement no_ack = NoAck, exclusive = ExclusiveConsume, - nowait = NoWait}, + nowait = NoWait, + arguments = Arguments}, _, State = #ch{conn_pid = ConnPid, limiter = Limiter, consumer_mapping = ConsumerMapping}) -> @@ -721,6 +744,7 @@ handle_method(#'basic.consume'{queue = QueueNameBin, {rabbit_amqqueue:basic_consume( Q, NoAck, self(), Limiter, ActualConsumerTag, ExclusiveConsume, + parse_credit_args(Arguments), ok_msg(NoWait, #'basic.consume_ok'{ consumer_tag = ActualConsumerTag})), Q} @@ -1089,6 +1113,17 @@ handle_method(#'channel.flow'{active = false}, _, ok = rabbit_amqqueue:flush_all(QPids, self()), {noreply, maybe_send_flow_ok(State1#ch{blocking = sets:from_list(QPids)})}; +handle_method(#'basic.credit'{consumer_tag = CTag, + credit = Credit, + drain = Drain}, _, + State = #ch{consumer_mapping = Consumers}) -> + case dict:find(CTag, Consumers) of + {ok, Q} -> ok = rabbit_amqqueue:credit( + Q, self(), CTag, Credit, Drain), + {noreply, State}; + error -> precondition_failed("unknown consumer tag '~s'", [CTag]) + end; + handle_method(_MethodRecord, _Content, _State) -> rabbit_misc:protocol_error( command_invalid, "unimplemented method", []). @@ -1155,6 +1190,16 @@ handle_consuming_queue_down(QPid, handle_delivering_queue_down(QPid, State = #ch{delivering_queues = DQ}) -> State#ch{delivering_queues = sets:del_element(QPid, DQ)}. +parse_credit_args(Arguments) -> + case rabbit_misc:table_lookup(Arguments, <<"x-credit">>) of + {table, T} -> case {rabbit_misc:table_lookup(T, <<"credit">>), + rabbit_misc:table_lookup(T, <<"drain">>)} of + {{long, Credit}, {boolean, Drain}} -> {Credit, Drain}; + _ -> none + end; + undefined -> none + end. + binding_action(Fun, ExchangeNameBin, DestinationType, DestinationNameBin, RoutingKey, Arguments, ReturnMethod, NoWait, State = #ch{virtual_host = VHostPath, diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index 8a7d14fe0b..46b465bc12 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -22,12 +22,15 @@ handle_info/2, prioritise_call/3]). -export([start_link/0, make_token/0, make_token/1, is_enabled/1, enable/2, disable/1]). --export([limit/2, can_send/3, ack/2, register/2, unregister/2]). +-export([limit/2, can_send/6, ack/2, register/2, unregister/2]). -export([get_limit/1, block/1, unblock/1, is_blocked/1]). +-export([initial_credit/6, credit/6, forget_consumer/2, copy_queue_state/2]). + +-import(rabbit_misc, [serial_add/2, serial_diff/2]). %%---------------------------------------------------------------------------- --record(token, {pid, enabled}). +-record(token, {pid, enabled, q_state}). -ifdef(use_specs). @@ -42,7 +45,9 @@ -spec(enable/2 :: (token(), non_neg_integer()) -> token()). -spec(disable/1 :: (token()) -> token()). -spec(limit/2 :: (token(), non_neg_integer()) -> 'ok' | {'disabled', token()}). --spec(can_send/3 :: (token(), pid(), boolean()) -> boolean()). +-spec(can_send/6 :: (token(), pid(), boolean(), pid(), rabbit_types:ctag(), + non_neg_integer()) + -> token() | 'consumer_blocked' | 'channel_blocked'). -spec(ack/2 :: (token(), non_neg_integer()) -> 'ok'). -spec(register/2 :: (token(), pid()) -> 'ok'). -spec(unregister/2 :: (token(), pid()) -> 'ok'). @@ -50,6 +55,14 @@ -spec(block/1 :: (token()) -> 'ok'). -spec(unblock/1 :: (token()) -> 'ok' | {'disabled', token()}). -spec(is_blocked/1 :: (token()) -> boolean()). +-spec(initial_credit/6 :: (token(), pid(), rabbit_types:ctag(), + non_neg_integer(), boolean(), non_neg_integer()) + -> token()). +-spec(credit/6 :: (token(), pid(), rabbit_types:ctag(), + non_neg_integer(), boolean(), non_neg_integer()) + -> {[rabbit_types:ctag()], token()}). +-spec(forget_consumer/2 :: (token(), rabbit_types:ctag()) -> token()). +-spec(copy_queue_state/2 :: (token(), token()) -> token()). -endif. @@ -64,6 +77,8 @@ %% notified of a change in the limit or volume that may allow it to %% deliver more messages via the limiter's channel. +-record(credit, {credit = 0, drain = false}). + %%---------------------------------------------------------------------------- %% API %%---------------------------------------------------------------------------- @@ -71,7 +86,8 @@ start_link() -> gen_server2:start_link(?MODULE, [], []). make_token() -> make_token(undefined). -make_token(Pid) -> #token{pid = Pid, enabled = false}. +make_token(Pid) -> #token{pid = Pid, enabled = false, + q_state = dict:new()}. is_enabled(#token{enabled = Enabled}) -> Enabled. @@ -88,14 +104,29 @@ limit(Limiter, PrefetchCount) -> %% breaching a limit. Note that we don't use maybe_call here in order %% to avoid always going through with_exit_handler/2, even when the %% limiter is disabled. -can_send(#token{pid = Pid, enabled = true}, QPid, AckRequired) -> +can_send(Token = #token{pid = Pid, enabled = Enabled, q_state = Credits}, + QPid, AckReq, ChPid, CTag, Len) -> + ConsAllows = case dict:find(CTag, Credits) of + {ok, #credit{credit = C}} when C > 0 -> true; + {ok, #credit{}} -> false; + error -> true + end, + case ConsAllows of + true -> case not Enabled orelse call_can_send(Pid, QPid, AckReq) of + true -> Credits2 = record_send_q( + CTag, Len, ChPid, Credits), + Token#token{q_state = Credits2}; + false -> channel_blocked + end; + false -> consumer_blocked + end. + +call_can_send(Pid, QPid, AckRequired) -> rabbit_misc:with_exit_handler( fun () -> true end, fun () -> gen_server2:call(Pid, {can_send, QPid, AckRequired}, infinity) - end); -can_send(_, _, _) -> - true. + end). %% Let the limiter know that the channel has received some acks from a %% consumer @@ -119,6 +150,62 @@ unblock(Limiter) -> is_blocked(Limiter) -> maybe_call(Limiter, is_blocked, false). +initial_credit(Limiter = #token{q_state = Credits}, + ChPid, CTag, Credit, Drain, Len) -> + {[], Credits2} = update_credit( + CTag, Len, ChPid, Credit, Drain, Credits), + Limiter#token{q_state = Credits2}. + +credit(Limiter = #token{q_state = Credits}, + ChPid, CTag, Credit, Drain, Len) -> + {Unblock, Credits2} = update_credit( + CTag, Len, ChPid, Credit, Drain, Credits), + rabbit_channel:send_credit_reply(ChPid, Len), + {Unblock, Limiter#token{q_state = Credits2}}. + +forget_consumer(Limiter = #token{q_state = Credits}, CTag) -> + Limiter#token{q_state = dict:erase(CTag, Credits)}. + +copy_queue_state(#token{q_state = Credits}, Token) -> + Token#token{q_state = Credits}. + +%%---------------------------------------------------------------------------- +%% Queue-local code +%%---------------------------------------------------------------------------- + +%% We want to do all the AMQP 1.0-ish link level credit calculations in the +%% queue (to do them elsewhere introduces a ton of races). However, it's a big +%% chunk of code that is conceptually very linked to the limiter concept. So +%% we get the queue to hold a bit of state for us (#token.q_state), and +%% maintain a fiction that the limiter is making the decisions... + +record_send_q(CTag, Len, ChPid, Credits) -> + case dict:find(CTag, Credits) of + {ok, #credit{credit = Credit, drain = Drain}} -> + NewCredit = maybe_drain(Len - 1, Drain, CTag, ChPid, Credit - 1), + write_credit(CTag, NewCredit, Drain, Credits); + error -> + Credits + end. + +update_credit(CTag, Len, ChPid, Credit, Drain, Credits) -> + NewCredit = maybe_drain(Len, Drain, CTag, ChPid, Credit), + NewCredits = write_credit(CTag, NewCredit, Drain, Credits), + case NewCredit > 0 of + true -> {[CTag], NewCredits}; + false -> {[], NewCredits} + end. + +write_credit(CTag, Credit, Drain, Credits) -> + dict:store(CTag, #credit{credit = Credit, drain = Drain}, Credits). + +maybe_drain(0, true, CTag, ChPid, Credit) -> + rabbit_channel:send_drained(ChPid, CTag, Credit), + 0; %% Magic reduction to 0 + +maybe_drain(_, _, _, _, Credit) -> + Credit. + %%---------------------------------------------------------------------------- %% gen_server callbacks %%---------------------------------------------------------------------------- diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index c36fb147c8..135f644343 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -69,6 +69,7 @@ -export([interval_operation/4]). -export([ensure_timer/4, stop_timer/2]). -export([get_parent/0]). +-export([serial_add/2, serial_compare/2, serial_diff/2]). %% Horrible macro to use in guards -define(IS_BENIGN_EXIT(R), @@ -83,6 +84,7 @@ -ifdef(use_specs). -export_type([resource_name/0, thunk/1]). +-export_type([serial_number/0]). -type(ok_or_error() :: rabbit_types:ok_or_error(any())). -type(thunk(T) :: fun(() -> T)). @@ -95,6 +97,8 @@ fun ((atom(), [term()]) -> [{digraph:vertex(), digraph_label()}])). -type(graph_edge_fun() :: fun ((atom(), [term()]) -> [{digraph:vertex(), digraph:vertex()}])). +-type(serial_number() :: non_neg_integer()). +-type(serial_compare_result() :: 'equal' | 'less' | 'greater'). -spec(method_record_type/1 :: (rabbit_framing:amqp_method_record()) -> rabbit_framing:amqp_method_name()). @@ -246,6 +250,12 @@ -spec(ensure_timer/4 :: (A, non_neg_integer(), non_neg_integer(), any()) -> A). -spec(stop_timer/2 :: (A, non_neg_integer()) -> A). -spec(get_parent/0 :: () -> pid()). +-spec(serial_add/2 :: (serial_number(), non_neg_integer()) -> + serial_number()). +-spec(serial_compare/2 :: (serial_number(), serial_number()) -> + serial_compare_result()). +-spec(serial_diff/2 :: (serial_number(), serial_number()) -> + integer()). -endif. %%---------------------------------------------------------------------------- @@ -1099,3 +1109,44 @@ whereis_name(Name) -> %% End copypasta from gen_server2.erl %% ------------------------------------------------------------------------- + +%% Serial arithmetic for unsigned ints. +%% http://www.faqs.org/rfcs/rfc1982.html +%% SERIAL_BITS = 32 + +%% 2 ^ SERIAL_BITS +-define(SERIAL_MAX, 16#100000000). +%% 2 ^ (SERIAL_BITS - 1) - 1 +-define(SERIAL_MAX_ADDEND, 16#7fffffff). + +serial_add(S, N) when N =< ?SERIAL_MAX_ADDEND -> + (S + N) rem ?SERIAL_MAX; +serial_add(S, N) -> + exit({out_of_bound_serial_addition, S, N}). + +serial_compare(A, B) -> + if A =:= B -> + equal; + (A < B andalso B - A < ?SERIAL_MAX_ADDEND) orelse + (A > B andalso A - B > ?SERIAL_MAX_ADDEND) -> + less; + (A < B andalso B - A > ?SERIAL_MAX_ADDEND) orelse + (A > B andalso B - A < ?SERIAL_MAX_ADDEND) -> + greater; + true -> exit({indeterminate_serial_comparison, A, B}) + end. + +-define(SERIAL_DIFF_BOUND, 16#80000000). + +serial_diff(A, B) -> + Diff = A - B, + if Diff > (?SERIAL_DIFF_BOUND) -> + %% B is actually greater than A + - (?SERIAL_MAX - Diff); + Diff < - (?SERIAL_DIFF_BOUND) -> + ?SERIAL_MAX + Diff; + Diff < ?SERIAL_DIFF_BOUND andalso Diff > -?SERIAL_DIFF_BOUND -> + Diff; + true -> + exit({indeterminate_serial_diff, A, B}) + end. diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index f5ea4fba7e..e2af7efdd3 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -50,6 +50,7 @@ all_tests() -> passed = test_table_codec(), passed = test_content_framing(), passed = test_content_transcoding(), + passed = test_serial_arithmetic(), passed = test_topic_matching(), passed = test_log_management(), passed = test_app_management(), @@ -559,6 +560,29 @@ sequence_with_content(Sequence) -> rabbit_framing_amqp_0_9_1), Sequence). +test_serial_arithmetic() -> + 1 = rabbit_misc:serial_add(0, 1), + 16#7fffffff = rabbit_misc:serial_add(0, 16#7fffffff), + 0 = rabbit_misc:serial_add(16#ffffffff, 1), + %% Cannot add more than 2 ^ 31 - 1 + case catch rabbit_misc:serial_add(200, 16#80000000) of + {'EXIT', {out_of_bound_serial_addition, _, _}} -> ok; + _ -> exit(fail_out_of_bound_serial_addition) + end, + + 1 = rabbit_misc:serial_diff(1, 0), + 2 = rabbit_misc:serial_diff(1, 16#ffffffff), + -2 = rabbit_misc:serial_diff(16#ffffffff, 1), + case catch rabbit_misc:serial_diff(0, 16#80000000) of + {'EXIT', {indeterminate_serial_diff, _, _}} -> ok; + _ -> exit(fail_indeterminate_serial_difference) + end, + case catch rabbit_misc:serial_diff(16#ffffffff, 16#7fffffff) of + {'EXIT', {indeterminate_serial_diff, _, _}} -> ok; + _ -> exit(fail_indeterminate_serial_difference) + end, + passed. + test_topic_matching() -> XName = #resource{virtual_host = <<"/">>, kind = exchange, |
