summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2013-01-07 18:07:56 +0000
committerSimon MacMullen <simon@rabbitmq.com>2013-01-07 18:07:56 +0000
commitafebcd8aaa467f0a4f8ade278d9b4c019bf96275 (patch)
tree628f953eb081a5e7fd6ef3035e69aff878f39984
parent1a663a1760ab67d00805ecb9394456586dfbde07 (diff)
parentd9f5cf966390120506e78265c18ac29dd9a50034 (diff)
downloadrabbitmq-server-git-afebcd8aaa467f0a4f8ade278d9b4c019bf96275.tar.gz
Merge in default.
-rw-r--r--src/rabbit_amqqueue_process.erl11
-rw-r--r--src/rabbit_channel.erl31
-rw-r--r--src/rabbit_limiter.erl142
-rw-r--r--src/rabbit_misc.erl51
-rw-r--r--src/rabbit_tests.erl24
5 files changed, 228 insertions, 31 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 4341c3d6d8..08a1ca70b0 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -423,13 +423,17 @@ deliver_msgs_to_consumers(DeliverFun, false,
deliver_msgs_to_consumers(DeliverFun, Stop, State1)
end.
-deliver_msg_to_consumer(DeliverFun, E = {ChPid, Consumer}, State) ->
+deliver_msg_to_consumer(DeliverFun, E = {ChPid, Consumer},
+ State = #q{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
C = ch_record(ChPid),
case is_ch_blocked(C) of
true -> block_consumer(C, E),
{false, State};
false -> case rabbit_limiter:can_send(C#cr.limiter, self(),
- Consumer#consumer.ack_required) of
+ Consumer#consumer.ack_required,
+ Consumer#consumer.tag,
+ BQ:len(BQS)) of
false -> block_consumer(C#cr{is_limit_active = true}, E),
{false, State};
true -> AC1 = queue:in(E, State#q.active_consumers),
@@ -1245,7 +1249,8 @@ handle_cast({limit, ChPid, Limiter}, State) ->
true -> ok = rabbit_limiter:register(Limiter, self());
false -> ok
end,
- Limited = OldLimited andalso rabbit_limiter:is_enabled(Limiter),
+ Limited = OldLimited andalso rabbit_limiter:is_enabled(Limiter)
+ andalso rabbit_limiter:is_blocked(Limiter),
C#cr{limiter = Limiter, is_limit_active = Limited}
end));
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 88e3dfc583..6c2c37f9a8 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -1079,6 +1079,37 @@ handle_method(#'channel.flow'{active = false}, _,
{noreply, State2}
end;
+handle_method(#'basic.credit'{consumer_tag = CTag,
+ credit = Credit,
+ count = Count,
+ drain = Drain}, _,
+ State = #ch{limiter = Limiter,
+ consumer_mapping = Consumers}) ->
+ %% We get Available first because it's likely that as soon as we set
+ %% the credit msgs will get consumed and it'll be out of date. Why do we
+ %% want that? Because at least then it's consistent with the credit value
+ %% we return. And Available is always going to be racy.
+ Available = case dict:find(CTag, Consumers) of
+ {ok, {Q, _}} -> case rabbit_amqqueue:stat(Q) of
+ {ok, Len, _} -> Len;
+ _ -> -1
+ end;
+ error -> -1 %% TODO these -1s smell very iffy!
+ end,
+ Limiter1 = case rabbit_limiter:is_enabled(Limiter) of
+ true -> Limiter;
+ false -> enable_limiter(State)
+ end,
+ Limiter3 =
+ case rabbit_limiter:set_credit(
+ Limiter1, CTag, Credit, Count, Drain) of
+ ok -> Limiter1;
+ {disabled, Limiter2} -> ok = limit_queues(Limiter2, State),
+ Limiter2
+ end,
+ State1 = State#ch{limiter = Limiter3},
+ return_ok(State1, false, #'basic.credit_ok'{available = Available});
+
handle_method(_MethodRecord, _Content, _State) ->
rabbit_misc:protocol_error(
command_invalid, "unimplemented method", []).
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
index 2b15498ed9..5f1bc07ca0 100644
--- a/src/rabbit_limiter.erl
+++ b/src/rabbit_limiter.erl
@@ -15,15 +15,20 @@
%%
-module(rabbit_limiter).
+-include("rabbit_framing.hrl").
-behaviour(gen_server2).
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
handle_info/2, prioritise_call/3]).
+
-export([start_link/0, make_token/0, make_token/1, is_enabled/1, enable/2,
disable/1]).
--export([limit/2, can_send/3, ack/2, register/2, unregister/2]).
+-export([limit/2, can_send/5, ack/2, register/2, unregister/2]).
-export([get_limit/1, block/1, unblock/1, is_blocked/1]).
+-export([set_credit/5]).
+
+-import(rabbit_misc, [serial_add/2, serial_diff/2]).
%%----------------------------------------------------------------------------
@@ -42,7 +47,8 @@
-spec(enable/2 :: (token(), non_neg_integer()) -> token()).
-spec(disable/1 :: (token()) -> token()).
-spec(limit/2 :: (token(), non_neg_integer()) -> 'ok' | {'disabled', token()}).
--spec(can_send/3 :: (token(), pid(), boolean()) -> boolean()).
+-spec(can_send/5 :: (token(), pid(), boolean(),
+ rabbit_types:ctag(), non_neg_integer()) -> boolean()).
-spec(ack/2 :: (token(), non_neg_integer()) -> 'ok').
-spec(register/2 :: (token(), pid()) -> 'ok').
-spec(unregister/2 :: (token(), pid()) -> 'ok').
@@ -50,6 +56,9 @@
-spec(block/1 :: (token()) -> 'ok').
-spec(unblock/1 :: (token()) -> 'ok' | {'disabled', token()}).
-spec(is_blocked/1 :: (token()) -> boolean()).
+-spec(set_credit/5 :: (token(), rabbit_types:ctag(),
+ non_neg_integer(),
+ non_neg_integer(), boolean()) -> 'ok').
-endif.
@@ -58,11 +67,11 @@
-record(lim, {prefetch_count = 0,
ch_pid,
blocked = false,
+ credits = dict:new(),
queues = orddict:new(), % QPid -> {MonitorRef, Notify}
volume = 0}).
-%% 'Notify' is a boolean that indicates whether a queue should be
-%% notified of a change in the limit or volume that may allow it to
-%% deliver more messages via the limiter's channel.
+
+-record(credit, {count = 0, credit = 0, drain = false}).
%%----------------------------------------------------------------------------
%% API
@@ -88,18 +97,19 @@ limit(Limiter, PrefetchCount) ->
%% breaching a limit. Note that we don't use maybe_call here in order
%% to avoid always going through with_exit_handler/2, even when the
%% limiter is disabled.
-can_send(#token{pid = Pid, enabled = true}, QPid, AckRequired) ->
+can_send(#token{pid = Pid, enabled = true}, QPid, AckRequired, CTag, Len) ->
rabbit_misc:with_exit_handler(
fun () -> true end,
fun () ->
- gen_server2:call(Pid, {can_send, QPid, AckRequired}, infinity)
+ gen_server2:call(Pid, {can_send, QPid, AckRequired, CTag, Len},
+ infinity)
end);
-can_send(_, _, _) ->
+can_send(_, _, _, _, _) ->
true.
%% Let the limiter know that the channel has received some acks from a
%% consumer
-ack(Limiter, Count) -> maybe_cast(Limiter, {ack, Count}).
+ack(Limiter, CTag) -> maybe_cast(Limiter, {ack, CTag}).
register(Limiter, QPid) -> maybe_cast(Limiter, {register, QPid}).
@@ -116,6 +126,9 @@ block(Limiter) ->
unblock(Limiter) ->
maybe_call(Limiter, {unblock, Limiter}, ok).
+set_credit(Limiter, CTag, Credit, Count, Drain) ->
+ maybe_call(Limiter, {set_credit, CTag, Credit, Count, Drain, Limiter}, ok).
+
is_blocked(Limiter) ->
maybe_call(Limiter, is_blocked, false).
@@ -129,23 +142,26 @@ init([]) ->
prioritise_call(get_limit, _From, _State) -> 9;
prioritise_call(_Msg, _From, _State) -> 0.
-handle_call({can_send, QPid, _AckRequired}, _From,
+handle_call({can_send, QPid, _AckRequired, _CTag, _Len}, _From,
State = #lim{blocked = true}) ->
{reply, false, limit_queue(QPid, State)};
-handle_call({can_send, QPid, AckRequired}, _From,
+handle_call({can_send, QPid, AckRequired, CTag, Len}, _From,
State = #lim{volume = Volume}) ->
- case limit_reached(State) of
+ case limit_reached(CTag, State) of
true -> {reply, false, limit_queue(QPid, State)};
- false -> {reply, true, State#lim{volume = if AckRequired -> Volume + 1;
- true -> Volume
- end}}
+ false -> {reply, true,
+ decr_credit(CTag, Len,
+ State#lim{volume = if AckRequired -> Volume + 1;
+ true -> Volume
+ end})}
end;
handle_call(get_limit, _From, State = #lim{prefetch_count = PrefetchCount}) ->
{reply, PrefetchCount, State};
handle_call({limit, PrefetchCount, Token}, _From, State) ->
- case maybe_notify(State, State#lim{prefetch_count = PrefetchCount}) of
+ case maybe_notify(irrelevant,
+ State, State#lim{prefetch_count = PrefetchCount}) of
{cont, State1} ->
{reply, ok, State1};
{stop, State1} ->
@@ -155,8 +171,17 @@ handle_call({limit, PrefetchCount, Token}, _From, State) ->
handle_call(block, _From, State) ->
{reply, ok, State#lim{blocked = true}};
+handle_call({set_credit, CTag, Credit, Count, Drain, Token}, _From, State) ->
+ case maybe_notify(CTag, State,
+ reset_credit(CTag, Credit, Count, Drain, State)) of
+ {cont, State1} ->
+ {reply, ok, State1};
+ {stop, State1} ->
+ {reply, {disabled, Token#token{enabled = false}}, State1}
+ end;
+
handle_call({unblock, Token}, _From, State) ->
- case maybe_notify(State, State#lim{blocked = false}) of
+ case maybe_notify(irrelevant, State, State#lim{blocked = false}) of
{cont, State1} ->
{reply, ok, State1};
{stop, State1} ->
@@ -172,11 +197,11 @@ handle_call({enable, Token, Channel, Volume}, _From, State) ->
handle_call({disable, Token}, _From, State) ->
{reply, Token#token{enabled = false}, State}.
-handle_cast({ack, Count}, State = #lim{volume = Volume}) ->
+handle_cast({ack, CTag}, State = #lim{volume = Volume}) ->
NewVolume = if Volume == 0 -> 0;
- true -> Volume - Count
+ true -> Volume - 1
end,
- {cont, State1} = maybe_notify(State, State#lim{volume = NewVolume}),
+ {cont, State1} = maybe_notify(CTag, State, State#lim{volume = NewVolume}),
{noreply, State1};
handle_cast({register, QPid}, State) ->
@@ -198,13 +223,14 @@ code_change(_, State, _) ->
%% Internal plumbing
%%----------------------------------------------------------------------------
-maybe_notify(OldState, NewState) ->
- case (limit_reached(OldState) orelse blocked(OldState)) andalso
- not (limit_reached(NewState) orelse blocked(NewState)) of
+maybe_notify(CTag, OldState, NewState) ->
+ case (limit_reached(CTag, OldState) orelse blocked(OldState)) andalso
+ not (limit_reached(CTag, NewState) orelse blocked(NewState)) of
true -> NewState1 = notify_queues(NewState),
- {case NewState1#lim.prefetch_count of
- 0 -> stop;
- _ -> cont
+ {case {NewState1#lim.prefetch_count,
+ dict:size(NewState1#lim.credits)} of
+ {0, 0} -> stop;
+ _ -> cont
end, NewState1};
false -> {cont, NewState}
end.
@@ -219,8 +245,67 @@ maybe_cast(#token{pid = Pid, enabled = true}, Cast) ->
maybe_cast(_, _Call) ->
ok.
-limit_reached(#lim{prefetch_count = Limit, volume = Volume}) ->
- Limit =/= 0 andalso Volume >= Limit.
+limit_reached(irrelevant, _) ->
+ false;
+limit_reached(CTag, #lim{prefetch_count = Limit, volume = Volume,
+ credits = Credits}) ->
+ case dict:find(CTag, Credits) of
+ {ok, #credit{ credit = 0 }} -> true;
+ _ -> false
+ end orelse (Limit =/= 0 andalso Volume >= Limit).
+
+decr_credit(CTag, Len, State = #lim{ credits = Credits,
+ ch_pid = ChPid } ) ->
+ case dict:find(CTag, Credits) of
+ {ok, #credit{ credit = Credit, count = Count, drain = Drain }} ->
+ {NewCredit, NewCount} =
+ case {Credit, Len, Drain} of
+ {1, _, _} -> {0, serial_add(Count, 1)};
+ {_, 1, true} ->
+ %% Drain, so advance til credit = 0
+ NewCount0 = serial_add(Count, (Credit - 1)),
+ send_drained(ChPid, CTag, NewCount0),
+ {0, NewCount0}; %% Magic reduction to 0
+ {_, _, _} -> {Credit - 1, serial_add(Count, 1)}
+ end,
+ update_credit(CTag, NewCredit, NewCount, Drain, State);
+ error ->
+ State
+ end.
+
+send_drained(ChPid, CTag, Count) ->
+ rabbit_channel:send_command(ChPid,
+ #'basic.credit_state'{consumer_tag = CTag,
+ credit = 0,
+ count = Count,
+ available = 0,
+ drain = true}).
+
+%% Assert the credit state. The count may not match ours, in which
+%% case we must rebase the credit.
+%% TODO Edge case: if the queue has nothing in it, and drain is set,
+%% we want to send a basic.credit back.
+reset_credit(CTag, Credit0, Count0, Drain, State = #lim{credits = Credits}) ->
+ Count =
+ case dict:find(CTag, Credits) of
+ {ok, #credit{ count = LocalCount }} ->
+ LocalCount;
+ _ -> Count0
+ end,
+ %% Our credit may have been reduced while messages are in flight,
+ %% so we bottom out at 0.
+ Credit = erlang:max(0, serial_diff(serial_add(Count0, Credit0), Count)),
+ update_credit(CTag, Credit, Count, Drain, State).
+
+%% Store the credit
+update_credit(CTag, -1, _Count, _Drain, State = #lim{credits = Credits}) ->
+ State#lim{credits = dict:erase(CTag, Credits)};
+
+update_credit(CTag, Credit, Count, Drain, State = #lim{credits = Credits}) ->
+ State#lim{credits = dict:store(CTag,
+ #credit{credit = Credit,
+ count = Count,
+ drain = Drain}, Credits)}.
blocked(#lim{blocked = Blocked}) -> Blocked.
@@ -262,3 +347,4 @@ notify_queues(State = #lim{ch_pid = ChPid, queues = Queues}) ->
ok
end,
State#lim{queues = NewQueues}.
+
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index ce3e380248..c316db99b0 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -68,6 +68,7 @@
-export([base64url/1]).
-export([interval_operation/4]).
-export([get_parent/0]).
+-export([serial_add/2, serial_compare/2, serial_diff/2]).
%% Horrible macro to use in guards
-define(IS_BENIGN_EXIT(R),
@@ -82,6 +83,7 @@
-ifdef(use_specs).
-export_type([resource_name/0, thunk/1]).
+-export_type([serial_number/0]).
-type(ok_or_error() :: rabbit_types:ok_or_error(any())).
-type(thunk(T) :: fun(() -> T)).
@@ -94,6 +96,8 @@
fun ((atom(), [term()]) -> [{digraph:vertex(), digraph_label()}])).
-type(graph_edge_fun() ::
fun ((atom(), [term()]) -> [{digraph:vertex(), digraph:vertex()}])).
+-type(serial_number() :: non_neg_integer()).
+-type(serial_compare_result() :: 'equal' | 'less' | 'greater').
-spec(method_record_type/1 :: (rabbit_framing:amqp_method_record())
-> rabbit_framing:amqp_method_name()).
@@ -243,6 +247,12 @@
({atom(), atom(), any()}, float(), non_neg_integer(), non_neg_integer())
-> {any(), non_neg_integer()}).
-spec(get_parent/0 :: () -> pid()).
+-spec(serial_add/2 :: (serial_number(), non_neg_integer()) ->
+ serial_number()).
+-spec(serial_compare/2 :: (serial_number(), serial_number()) ->
+ serial_compare_result()).
+-spec(serial_diff/2 :: (serial_number(), serial_number()) ->
+ integer()).
-endif.
%%----------------------------------------------------------------------------
@@ -1080,3 +1090,44 @@ whereis_name(Name) ->
%% End copypasta from gen_server2.erl
%% -------------------------------------------------------------------------
+
+%% Serial arithmetic for unsigned ints.
+%% http://www.faqs.org/rfcs/rfc1982.html
+%% SERIAL_BITS = 32
+
+%% 2 ^ SERIAL_BITS
+-define(SERIAL_MAX, 16#100000000).
+%% 2 ^ (SERIAL_BITS - 1) - 1
+-define(SERIAL_MAX_ADDEND, 16#7fffffff).
+
+serial_add(S, N) when N =< ?SERIAL_MAX_ADDEND ->
+ (S + N) rem ?SERIAL_MAX;
+serial_add(S, N) ->
+ exit({out_of_bound_serial_addition, S, N}).
+
+serial_compare(A, B) ->
+ if A =:= B ->
+ equal;
+ (A < B andalso B - A < ?SERIAL_MAX_ADDEND) orelse
+ (A > B andalso A - B > ?SERIAL_MAX_ADDEND) ->
+ less;
+ (A < B andalso B - A > ?SERIAL_MAX_ADDEND) orelse
+ (A > B andalso B - A < ?SERIAL_MAX_ADDEND) ->
+ greater;
+ true -> exit({indeterminate_serial_comparison, A, B})
+ end.
+
+-define(SERIAL_DIFF_BOUND, 16#80000000).
+
+serial_diff(A, B) ->
+ Diff = A - B,
+ if Diff > (?SERIAL_DIFF_BOUND) ->
+ %% B is actually greater than A
+ - (?SERIAL_MAX - Diff);
+ Diff < - (?SERIAL_DIFF_BOUND) ->
+ ?SERIAL_MAX + Diff;
+ Diff < ?SERIAL_DIFF_BOUND andalso Diff > -?SERIAL_DIFF_BOUND ->
+ Diff;
+ true ->
+ exit({indeterminate_serial_diff, A, B})
+ end.
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 09ed3d0890..9ca7763d09 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -50,6 +50,7 @@ all_tests() ->
passed = test_table_codec(),
passed = test_content_framing(),
passed = test_content_transcoding(),
+ passed = test_serial_arithmetic(),
passed = test_topic_matching(),
passed = test_log_management(),
passed = test_app_management(),
@@ -558,6 +559,29 @@ sequence_with_content(Sequence) ->
rabbit_framing_amqp_0_9_1),
Sequence).
+test_serial_arithmetic() ->
+ 1 = rabbit_misc:serial_add(0, 1),
+ 16#7fffffff = rabbit_misc:serial_add(0, 16#7fffffff),
+ 0 = rabbit_misc:serial_add(16#ffffffff, 1),
+ %% Cannot add more than 2 ^ 31 - 1
+ case catch rabbit_misc:serial_add(200, 16#80000000) of
+ {'EXIT', {out_of_bound_serial_addition, _, _}} -> ok;
+ _ -> exit(fail_out_of_bound_serial_addition)
+ end,
+
+ 1 = rabbit_misc:serial_diff(1, 0),
+ 2 = rabbit_misc:serial_diff(1, 16#ffffffff),
+ -2 = rabbit_misc:serial_diff(16#ffffffff, 1),
+ case catch rabbit_misc:serial_diff(0, 16#80000000) of
+ {'EXIT', {indeterminate_serial_diff, _, _}} -> ok;
+ _ -> exit(fail_indeterminate_serial_difference)
+ end,
+ case catch rabbit_misc:serial_diff(16#ffffffff, 16#7fffffff) of
+ {'EXIT', {indeterminate_serial_diff, _, _}} -> ok;
+ _ -> exit(fail_indeterminate_serial_difference)
+ end,
+ passed.
+
test_topic_matching() ->
XName = #resource{virtual_host = <<"/">>,
kind = exchange,