diff options
| author | Michael Bridgen <mikeb@rabbitmq.com> | 2011-02-11 13:54:29 +0000 |
|---|---|---|
| committer | Michael Bridgen <mikeb@rabbitmq.com> | 2011-02-11 13:54:29 +0000 |
| commit | 6398ee3c7c0908e8364b3af9e46860426580780e (patch) | |
| tree | 188a9ef230efe467189658fc71b50cca53f9e440 | |
| parent | fee276a98263d5dd0f458309fc2d16bed46cca44 (diff) | |
| download | rabbitmq-server-git-6398ee3c7c0908e8364b3af9e46860426580780e.tar.gz | |
Use serial number arithmetic for credit
| -rw-r--r-- | src/rabbit_limiter.erl | 15 | ||||
| -rw-r--r-- | src/rabbit_misc.erl | 51 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 26 |
3 files changed, 85 insertions, 7 deletions
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index a9ecf2e008..50cd2aaab2 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -25,6 +25,8 @@ -export([limit/2, can_send/5, ack/2, register/2, unregister/2]). -export([get_limit/1, block/1, unblock/1, set_credit/5, is_blocked/1]). +-import(rabbit_misc, [serial_add/2, serial_diff/2]). + %%---------------------------------------------------------------------------- -ifdef(use_specs). @@ -224,12 +226,13 @@ decr_credit(CTag, Len, State = #lim{ credits = Credits, {ok, #credit{ credit = Credit, count = Count, drain = Drain }} -> {NewCredit, NewCount} = case {Credit, Len, Drain} of - {1, _, _} -> {0, Count + 1}; %% Usual reduction to 0 + {1, _, _} -> {0, serial_add(Count, 1)}; {_, 1, true} -> - NewCount0 = Count + (Credit - 1), + %% Drain, so advance til credit = 0 + NewCount0 = serial_add(Count, (Credit - 1)), send_drained(ChPid, CTag, NewCount0), {0, NewCount0}; %% Magic reduction to 0 - {_, _, _} -> {Credit - 1, Count + 1} + {_, _, _} -> {Credit - 1, serial_add(Count, 1)} end, update_credit(CTag, NewCredit, NewCount, Drain, State); error -> @@ -255,9 +258,9 @@ reset_credit(CTag, Credit0, Count0, Drain, State = #lim{credits = Credits}) -> LocalCount; _ -> Count0 end, - %% Our credit may have been reduced while messages are - %% in flight, so we bottom out at 0. - Credit = erlang:max(0, Count0 + Credit0 - Count), + %% Our credit may have been reduced while messages are in flight, + %% so we bottom out at 0. + Credit = erlang:max(0, serial_diff(serial_add(Count0, Credit0), Count)), update_credit(CTag, Credit, Count, Drain, State). %% Store the credit diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 3a4fb024fe..aef8c22a89 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -56,12 +56,14 @@ -export([lock_file/1]). -export([const_ok/1, const/1]). -export([ntoa/1, ntoab/1]). +-export([serial_add/2, serial_compare/2, serial_diff/2]). %%---------------------------------------------------------------------------- -ifdef(use_specs). -export_type([resource_name/0, thunk/1, const/1]). +-export_type([serial_number/0]). -type(ok_or_error() :: rabbit_types:ok_or_error(any())). -type(thunk(T) :: fun(() -> T)). @@ -75,6 +77,8 @@ fun ((atom(), [term()]) -> [{digraph:vertex(), digraph_label()}])). -type(graph_edge_fun() :: fun ((atom(), [term()]) -> [{digraph:vertex(), digraph:vertex()}])). +-type(serial_number() :: non_neg_integer()). +-type(serial_compare_result() :: 'equal' | 'less' | 'greater'). -spec(method_record_type/1 :: (rabbit_framing:amqp_method_record()) -> rabbit_framing:amqp_method_name()). @@ -194,6 +198,12 @@ -spec(const/1 :: (A) -> const(A)). -spec(ntoa/1 :: (inet:ip_address()) -> string()). -spec(ntoab/1 :: (inet:ip_address()) -> string()). +-spec(serial_add/2 :: (serial_number(), non_neg_integer()) -> + serial_number()). +-spec(serial_compare/2 :: (serial_number(), serial_number()) -> + serial_compare_result()). +-spec(serial_diff/2 :: (serial_number(), serial_number()) -> + integer()). -endif. @@ -849,3 +859,44 @@ ntoab(IP) -> 0 -> Str; _ -> "[" ++ Str ++ "]" end. + +%% Serial arithmetic for unsigned ints. +%% http://www.faqs.org/rfcs/rfc1982.html +%% SERIAL_BITS = 32 + +%% 2 ^ SERIAL_BITS +-define(SERIAL_MAX, 16#100000000). +%% 2 ^ (SERIAL_BITS - 1) - 1 +-define(SERIAL_MAX_ADDEND, 16#7fffffff). + +serial_add(S, N) when N =< ?SERIAL_MAX_ADDEND -> + (S + N) rem ?SERIAL_MAX; +serial_add(S, N) -> + exit({out_of_bound_serial_addition, S, N}). + +serial_compare(A, B) -> + if A =:= B -> + equal; + (A < B andalso B - A < ?SERIAL_MAX_ADDEND) orelse + (A > B andalso A - B > ?SERIAL_MAX_ADDEND) -> + less; + (A < B andalso B - A > ?SERIAL_MAX_ADDEND) orelse + (A > B andalso B - A < ?SERIAL_MAX_ADDEND) -> + greater; + true -> exit({indeterminate_serial_comparison, A, B}) + end. + +-define(SERIAL_DIFF_BOUND, 16#80000000). + +serial_diff(A, B) -> + Diff = A - B, + if Diff > (?SERIAL_DIFF_BOUND) -> + %% B is actually greater than A + - (?SERIAL_MAX - Diff); + Diff < - (?SERIAL_DIFF_BOUND) -> + ?SERIAL_MAX + Diff; + Diff < ?SERIAL_DIFF_BOUND andalso Diff > -?SERIAL_DIFF_BOUND -> + Diff; + true -> + exit({indeterminate_serial_diff, A, B}) + end. diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 49b0950832..925649b864 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -18,7 +18,7 @@ -compile([export_all]). --export([all_tests/0, test_parsing/0]). +-export([all_tests/0, test_parsing/0, test_serial_arithmetic/0]). -include("rabbit.hrl"). -include("rabbit_framing.hrl"). @@ -46,6 +46,7 @@ all_tests() -> passed = test_parsing(), passed = test_content_framing(), passed = test_content_transcoding(), + passed = test_serial_arithmetic(), passed = test_topic_matching(), passed = test_log_management(), passed = test_app_management(), @@ -580,6 +581,29 @@ sequence_with_content(Sequence) -> rabbit_framing_amqp_0_9_1), Sequence). +test_serial_arithmetic() -> + 1 = rabbit_misc:serial_add(0, 1), + 16#7fffffff = rabbit_misc:serial_add(0, 16#7fffffff), + 0 = rabbit_misc:serial_add(16#ffffffff, 1), + %% Cannot add more than 2 ^ 31 - 1 + case catch rabbit_misc:serial_add(200, 16#80000000) of + {'EXIT', {out_of_bound_serial_addition, _, _}} -> ok; + _ -> exit(fail_out_of_bound_serial_addition) + end, + + 1 = rabbit_misc:serial_diff(1, 0), + 2 = rabbit_misc:serial_diff(1, 16#ffffffff), + -2 = rabbit_misc:serial_diff(16#ffffffff, 1), + case catch rabbit_misc:serial_diff(0, 16#80000000) of + {'EXIT', {indeterminate_serial_diff, _, _}} -> ok; + _ -> exit(fail_indeterminate_serial_difference) + end, + case catch rabbit_misc:serial_diff(16#ffffffff, 16#7fffffff) of + {'EXIT', {indeterminate_serial_diff, _, _}} -> ok; + _ -> exit(fail_indeterminate_serial_difference) + end, + passed. + test_topic_match(P, R) -> test_topic_match(P, R, true). |
