summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMichael Bridgen <mikeb@rabbitmq.com>2011-02-11 13:54:29 +0000
committerMichael Bridgen <mikeb@rabbitmq.com>2011-02-11 13:54:29 +0000
commit6398ee3c7c0908e8364b3af9e46860426580780e (patch)
tree188a9ef230efe467189658fc71b50cca53f9e440 /src
parentfee276a98263d5dd0f458309fc2d16bed46cca44 (diff)
downloadrabbitmq-server-git-6398ee3c7c0908e8364b3af9e46860426580780e.tar.gz
Use serial number arithmetic for credit
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_limiter.erl15
-rw-r--r--src/rabbit_misc.erl51
-rw-r--r--src/rabbit_tests.erl26
3 files changed, 85 insertions, 7 deletions
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
index a9ecf2e008..50cd2aaab2 100644
--- a/src/rabbit_limiter.erl
+++ b/src/rabbit_limiter.erl
@@ -25,6 +25,8 @@
-export([limit/2, can_send/5, ack/2, register/2, unregister/2]).
-export([get_limit/1, block/1, unblock/1, set_credit/5, is_blocked/1]).
+-import(rabbit_misc, [serial_add/2, serial_diff/2]).
+
%%----------------------------------------------------------------------------
-ifdef(use_specs).
@@ -224,12 +226,13 @@ decr_credit(CTag, Len, State = #lim{ credits = Credits,
{ok, #credit{ credit = Credit, count = Count, drain = Drain }} ->
{NewCredit, NewCount} =
case {Credit, Len, Drain} of
- {1, _, _} -> {0, Count + 1}; %% Usual reduction to 0
+ {1, _, _} -> {0, serial_add(Count, 1)};
{_, 1, true} ->
- NewCount0 = Count + (Credit - 1),
+ %% Drain, so advance til credit = 0
+ NewCount0 = serial_add(Count, (Credit - 1)),
send_drained(ChPid, CTag, NewCount0),
{0, NewCount0}; %% Magic reduction to 0
- {_, _, _} -> {Credit - 1, Count + 1}
+ {_, _, _} -> {Credit - 1, serial_add(Count, 1)}
end,
update_credit(CTag, NewCredit, NewCount, Drain, State);
error ->
@@ -255,9 +258,9 @@ reset_credit(CTag, Credit0, Count0, Drain, State = #lim{credits = Credits}) ->
LocalCount;
_ -> Count0
end,
- %% Our credit may have been reduced while messages are
- %% in flight, so we bottom out at 0.
- Credit = erlang:max(0, Count0 + Credit0 - Count),
+ %% Our credit may have been reduced while messages are in flight,
+ %% so we bottom out at 0.
+ Credit = erlang:max(0, serial_diff(serial_add(Count0, Credit0), Count)),
update_credit(CTag, Credit, Count, Drain, State).
%% Store the credit
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 3a4fb024fe..aef8c22a89 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -56,12 +56,14 @@
-export([lock_file/1]).
-export([const_ok/1, const/1]).
-export([ntoa/1, ntoab/1]).
+-export([serial_add/2, serial_compare/2, serial_diff/2]).
%%----------------------------------------------------------------------------
-ifdef(use_specs).
-export_type([resource_name/0, thunk/1, const/1]).
+-export_type([serial_number/0]).
-type(ok_or_error() :: rabbit_types:ok_or_error(any())).
-type(thunk(T) :: fun(() -> T)).
@@ -75,6 +77,8 @@
fun ((atom(), [term()]) -> [{digraph:vertex(), digraph_label()}])).
-type(graph_edge_fun() ::
fun ((atom(), [term()]) -> [{digraph:vertex(), digraph:vertex()}])).
+-type(serial_number() :: non_neg_integer()).
+-type(serial_compare_result() :: 'equal' | 'less' | 'greater').
-spec(method_record_type/1 :: (rabbit_framing:amqp_method_record())
-> rabbit_framing:amqp_method_name()).
@@ -194,6 +198,12 @@
-spec(const/1 :: (A) -> const(A)).
-spec(ntoa/1 :: (inet:ip_address()) -> string()).
-spec(ntoab/1 :: (inet:ip_address()) -> string()).
+-spec(serial_add/2 :: (serial_number(), non_neg_integer()) ->
+ serial_number()).
+-spec(serial_compare/2 :: (serial_number(), serial_number()) ->
+ serial_compare_result()).
+-spec(serial_diff/2 :: (serial_number(), serial_number()) ->
+ integer()).
-endif.
@@ -849,3 +859,44 @@ ntoab(IP) ->
0 -> Str;
_ -> "[" ++ Str ++ "]"
end.
+
+%% Serial arithmetic for unsigned ints.
+%% http://www.faqs.org/rfcs/rfc1982.html
+%% SERIAL_BITS = 32
+
+%% 2 ^ SERIAL_BITS
+-define(SERIAL_MAX, 16#100000000).
+%% 2 ^ (SERIAL_BITS - 1) - 1
+-define(SERIAL_MAX_ADDEND, 16#7fffffff).
+
+serial_add(S, N) when N =< ?SERIAL_MAX_ADDEND ->
+ (S + N) rem ?SERIAL_MAX;
+serial_add(S, N) ->
+ exit({out_of_bound_serial_addition, S, N}).
+
+serial_compare(A, B) ->
+ if A =:= B ->
+ equal;
+ (A < B andalso B - A < ?SERIAL_MAX_ADDEND) orelse
+ (A > B andalso A - B > ?SERIAL_MAX_ADDEND) ->
+ less;
+ (A < B andalso B - A > ?SERIAL_MAX_ADDEND) orelse
+ (A > B andalso B - A < ?SERIAL_MAX_ADDEND) ->
+ greater;
+ true -> exit({indeterminate_serial_comparison, A, B})
+ end.
+
+-define(SERIAL_DIFF_BOUND, 16#80000000).
+
+serial_diff(A, B) ->
+ Diff = A - B,
+ if Diff > (?SERIAL_DIFF_BOUND) ->
+ %% B is actually greater than A
+ - (?SERIAL_MAX - Diff);
+ Diff < - (?SERIAL_DIFF_BOUND) ->
+ ?SERIAL_MAX + Diff;
+ Diff < ?SERIAL_DIFF_BOUND andalso Diff > -?SERIAL_DIFF_BOUND ->
+ Diff;
+ true ->
+ exit({indeterminate_serial_diff, A, B})
+ end.
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 49b0950832..925649b864 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -18,7 +18,7 @@
-compile([export_all]).
--export([all_tests/0, test_parsing/0]).
+-export([all_tests/0, test_parsing/0, test_serial_arithmetic/0]).
-include("rabbit.hrl").
-include("rabbit_framing.hrl").
@@ -46,6 +46,7 @@ all_tests() ->
passed = test_parsing(),
passed = test_content_framing(),
passed = test_content_transcoding(),
+ passed = test_serial_arithmetic(),
passed = test_topic_matching(),
passed = test_log_management(),
passed = test_app_management(),
@@ -580,6 +581,29 @@ sequence_with_content(Sequence) ->
rabbit_framing_amqp_0_9_1),
Sequence).
+test_serial_arithmetic() ->
+ 1 = rabbit_misc:serial_add(0, 1),
+ 16#7fffffff = rabbit_misc:serial_add(0, 16#7fffffff),
+ 0 = rabbit_misc:serial_add(16#ffffffff, 1),
+ %% Cannot add more than 2 ^ 31 - 1
+ case catch rabbit_misc:serial_add(200, 16#80000000) of
+ {'EXIT', {out_of_bound_serial_addition, _, _}} -> ok;
+ _ -> exit(fail_out_of_bound_serial_addition)
+ end,
+
+ 1 = rabbit_misc:serial_diff(1, 0),
+ 2 = rabbit_misc:serial_diff(1, 16#ffffffff),
+ -2 = rabbit_misc:serial_diff(16#ffffffff, 1),
+ case catch rabbit_misc:serial_diff(0, 16#80000000) of
+ {'EXIT', {indeterminate_serial_diff, _, _}} -> ok;
+ _ -> exit(fail_indeterminate_serial_difference)
+ end,
+ case catch rabbit_misc:serial_diff(16#ffffffff, 16#7fffffff) of
+ {'EXIT', {indeterminate_serial_diff, _, _}} -> ok;
+ _ -> exit(fail_indeterminate_serial_difference)
+ end,
+ passed.
+
test_topic_match(P, R) ->
test_topic_match(P, R, true).