diff options
| author | Simon MacMullen <simon@rabbitmq.com> | 2013-01-07 18:07:56 +0000 |
|---|---|---|
| committer | Simon MacMullen <simon@rabbitmq.com> | 2013-01-07 18:07:56 +0000 |
| commit | afebcd8aaa467f0a4f8ade278d9b4c019bf96275 (patch) | |
| tree | 628f953eb081a5e7fd6ef3035e69aff878f39984 | |
| parent | 1a663a1760ab67d00805ecb9394456586dfbde07 (diff) | |
| parent | d9f5cf966390120506e78265c18ac29dd9a50034 (diff) | |
| download | rabbitmq-server-git-afebcd8aaa467f0a4f8ade278d9b4c019bf96275.tar.gz | |
Merge in default.
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 11 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 31 | ||||
| -rw-r--r-- | src/rabbit_limiter.erl | 142 | ||||
| -rw-r--r-- | src/rabbit_misc.erl | 51 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 24 |
5 files changed, 228 insertions, 31 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 4341c3d6d8..08a1ca70b0 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -423,13 +423,17 @@ deliver_msgs_to_consumers(DeliverFun, false, deliver_msgs_to_consumers(DeliverFun, Stop, State1) end. -deliver_msg_to_consumer(DeliverFun, E = {ChPid, Consumer}, State) -> +deliver_msg_to_consumer(DeliverFun, E = {ChPid, Consumer}, + State = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> C = ch_record(ChPid), case is_ch_blocked(C) of true -> block_consumer(C, E), {false, State}; false -> case rabbit_limiter:can_send(C#cr.limiter, self(), - Consumer#consumer.ack_required) of + Consumer#consumer.ack_required, + Consumer#consumer.tag, + BQ:len(BQS)) of false -> block_consumer(C#cr{is_limit_active = true}, E), {false, State}; true -> AC1 = queue:in(E, State#q.active_consumers), @@ -1245,7 +1249,8 @@ handle_cast({limit, ChPid, Limiter}, State) -> true -> ok = rabbit_limiter:register(Limiter, self()); false -> ok end, - Limited = OldLimited andalso rabbit_limiter:is_enabled(Limiter), + Limited = OldLimited andalso rabbit_limiter:is_enabled(Limiter) + andalso rabbit_limiter:is_blocked(Limiter), C#cr{limiter = Limiter, is_limit_active = Limited} end)); diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 88e3dfc583..6c2c37f9a8 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -1079,6 +1079,37 @@ handle_method(#'channel.flow'{active = false}, _, {noreply, State2} end; +handle_method(#'basic.credit'{consumer_tag = CTag, + credit = Credit, + count = Count, + drain = Drain}, _, + State = #ch{limiter = Limiter, + consumer_mapping = Consumers}) -> + %% We get Available first because it's likely that as soon as we set + %% the credit msgs will get consumed and it'll be out of date. Why do we + %% want that? Because at least then it's consistent with the credit value + %% we return. And Available is always going to be racy. + Available = case dict:find(CTag, Consumers) of + {ok, {Q, _}} -> case rabbit_amqqueue:stat(Q) of + {ok, Len, _} -> Len; + _ -> -1 + end; + error -> -1 %% TODO these -1s smell very iffy! + end, + Limiter1 = case rabbit_limiter:is_enabled(Limiter) of + true -> Limiter; + false -> enable_limiter(State) + end, + Limiter3 = + case rabbit_limiter:set_credit( + Limiter1, CTag, Credit, Count, Drain) of + ok -> Limiter1; + {disabled, Limiter2} -> ok = limit_queues(Limiter2, State), + Limiter2 + end, + State1 = State#ch{limiter = Limiter3}, + return_ok(State1, false, #'basic.credit_ok'{available = Available}); + handle_method(_MethodRecord, _Content, _State) -> rabbit_misc:protocol_error( command_invalid, "unimplemented method", []). diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index 2b15498ed9..5f1bc07ca0 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -15,15 +15,20 @@ %% -module(rabbit_limiter). +-include("rabbit_framing.hrl"). -behaviour(gen_server2). -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2, prioritise_call/3]). + -export([start_link/0, make_token/0, make_token/1, is_enabled/1, enable/2, disable/1]). --export([limit/2, can_send/3, ack/2, register/2, unregister/2]). +-export([limit/2, can_send/5, ack/2, register/2, unregister/2]). -export([get_limit/1, block/1, unblock/1, is_blocked/1]). +-export([set_credit/5]). + +-import(rabbit_misc, [serial_add/2, serial_diff/2]). %%---------------------------------------------------------------------------- @@ -42,7 +47,8 @@ -spec(enable/2 :: (token(), non_neg_integer()) -> token()). -spec(disable/1 :: (token()) -> token()). -spec(limit/2 :: (token(), non_neg_integer()) -> 'ok' | {'disabled', token()}). --spec(can_send/3 :: (token(), pid(), boolean()) -> boolean()). +-spec(can_send/5 :: (token(), pid(), boolean(), + rabbit_types:ctag(), non_neg_integer()) -> boolean()). -spec(ack/2 :: (token(), non_neg_integer()) -> 'ok'). -spec(register/2 :: (token(), pid()) -> 'ok'). -spec(unregister/2 :: (token(), pid()) -> 'ok'). @@ -50,6 +56,9 @@ -spec(block/1 :: (token()) -> 'ok'). -spec(unblock/1 :: (token()) -> 'ok' | {'disabled', token()}). -spec(is_blocked/1 :: (token()) -> boolean()). +-spec(set_credit/5 :: (token(), rabbit_types:ctag(), + non_neg_integer(), + non_neg_integer(), boolean()) -> 'ok'). -endif. @@ -58,11 +67,11 @@ -record(lim, {prefetch_count = 0, ch_pid, blocked = false, + credits = dict:new(), queues = orddict:new(), % QPid -> {MonitorRef, Notify} volume = 0}). -%% 'Notify' is a boolean that indicates whether a queue should be -%% notified of a change in the limit or volume that may allow it to -%% deliver more messages via the limiter's channel. + +-record(credit, {count = 0, credit = 0, drain = false}). %%---------------------------------------------------------------------------- %% API @@ -88,18 +97,19 @@ limit(Limiter, PrefetchCount) -> %% breaching a limit. Note that we don't use maybe_call here in order %% to avoid always going through with_exit_handler/2, even when the %% limiter is disabled. -can_send(#token{pid = Pid, enabled = true}, QPid, AckRequired) -> +can_send(#token{pid = Pid, enabled = true}, QPid, AckRequired, CTag, Len) -> rabbit_misc:with_exit_handler( fun () -> true end, fun () -> - gen_server2:call(Pid, {can_send, QPid, AckRequired}, infinity) + gen_server2:call(Pid, {can_send, QPid, AckRequired, CTag, Len}, + infinity) end); -can_send(_, _, _) -> +can_send(_, _, _, _, _) -> true. %% Let the limiter know that the channel has received some acks from a %% consumer -ack(Limiter, Count) -> maybe_cast(Limiter, {ack, Count}). +ack(Limiter, CTag) -> maybe_cast(Limiter, {ack, CTag}). register(Limiter, QPid) -> maybe_cast(Limiter, {register, QPid}). @@ -116,6 +126,9 @@ block(Limiter) -> unblock(Limiter) -> maybe_call(Limiter, {unblock, Limiter}, ok). +set_credit(Limiter, CTag, Credit, Count, Drain) -> + maybe_call(Limiter, {set_credit, CTag, Credit, Count, Drain, Limiter}, ok). + is_blocked(Limiter) -> maybe_call(Limiter, is_blocked, false). @@ -129,23 +142,26 @@ init([]) -> prioritise_call(get_limit, _From, _State) -> 9; prioritise_call(_Msg, _From, _State) -> 0. -handle_call({can_send, QPid, _AckRequired}, _From, +handle_call({can_send, QPid, _AckRequired, _CTag, _Len}, _From, State = #lim{blocked = true}) -> {reply, false, limit_queue(QPid, State)}; -handle_call({can_send, QPid, AckRequired}, _From, +handle_call({can_send, QPid, AckRequired, CTag, Len}, _From, State = #lim{volume = Volume}) -> - case limit_reached(State) of + case limit_reached(CTag, State) of true -> {reply, false, limit_queue(QPid, State)}; - false -> {reply, true, State#lim{volume = if AckRequired -> Volume + 1; - true -> Volume - end}} + false -> {reply, true, + decr_credit(CTag, Len, + State#lim{volume = if AckRequired -> Volume + 1; + true -> Volume + end})} end; handle_call(get_limit, _From, State = #lim{prefetch_count = PrefetchCount}) -> {reply, PrefetchCount, State}; handle_call({limit, PrefetchCount, Token}, _From, State) -> - case maybe_notify(State, State#lim{prefetch_count = PrefetchCount}) of + case maybe_notify(irrelevant, + State, State#lim{prefetch_count = PrefetchCount}) of {cont, State1} -> {reply, ok, State1}; {stop, State1} -> @@ -155,8 +171,17 @@ handle_call({limit, PrefetchCount, Token}, _From, State) -> handle_call(block, _From, State) -> {reply, ok, State#lim{blocked = true}}; +handle_call({set_credit, CTag, Credit, Count, Drain, Token}, _From, State) -> + case maybe_notify(CTag, State, + reset_credit(CTag, Credit, Count, Drain, State)) of + {cont, State1} -> + {reply, ok, State1}; + {stop, State1} -> + {reply, {disabled, Token#token{enabled = false}}, State1} + end; + handle_call({unblock, Token}, _From, State) -> - case maybe_notify(State, State#lim{blocked = false}) of + case maybe_notify(irrelevant, State, State#lim{blocked = false}) of {cont, State1} -> {reply, ok, State1}; {stop, State1} -> @@ -172,11 +197,11 @@ handle_call({enable, Token, Channel, Volume}, _From, State) -> handle_call({disable, Token}, _From, State) -> {reply, Token#token{enabled = false}, State}. -handle_cast({ack, Count}, State = #lim{volume = Volume}) -> +handle_cast({ack, CTag}, State = #lim{volume = Volume}) -> NewVolume = if Volume == 0 -> 0; - true -> Volume - Count + true -> Volume - 1 end, - {cont, State1} = maybe_notify(State, State#lim{volume = NewVolume}), + {cont, State1} = maybe_notify(CTag, State, State#lim{volume = NewVolume}), {noreply, State1}; handle_cast({register, QPid}, State) -> @@ -198,13 +223,14 @@ code_change(_, State, _) -> %% Internal plumbing %%---------------------------------------------------------------------------- -maybe_notify(OldState, NewState) -> - case (limit_reached(OldState) orelse blocked(OldState)) andalso - not (limit_reached(NewState) orelse blocked(NewState)) of +maybe_notify(CTag, OldState, NewState) -> + case (limit_reached(CTag, OldState) orelse blocked(OldState)) andalso + not (limit_reached(CTag, NewState) orelse blocked(NewState)) of true -> NewState1 = notify_queues(NewState), - {case NewState1#lim.prefetch_count of - 0 -> stop; - _ -> cont + {case {NewState1#lim.prefetch_count, + dict:size(NewState1#lim.credits)} of + {0, 0} -> stop; + _ -> cont end, NewState1}; false -> {cont, NewState} end. @@ -219,8 +245,67 @@ maybe_cast(#token{pid = Pid, enabled = true}, Cast) -> maybe_cast(_, _Call) -> ok. -limit_reached(#lim{prefetch_count = Limit, volume = Volume}) -> - Limit =/= 0 andalso Volume >= Limit. +limit_reached(irrelevant, _) -> + false; +limit_reached(CTag, #lim{prefetch_count = Limit, volume = Volume, + credits = Credits}) -> + case dict:find(CTag, Credits) of + {ok, #credit{ credit = 0 }} -> true; + _ -> false + end orelse (Limit =/= 0 andalso Volume >= Limit). + +decr_credit(CTag, Len, State = #lim{ credits = Credits, + ch_pid = ChPid } ) -> + case dict:find(CTag, Credits) of + {ok, #credit{ credit = Credit, count = Count, drain = Drain }} -> + {NewCredit, NewCount} = + case {Credit, Len, Drain} of + {1, _, _} -> {0, serial_add(Count, 1)}; + {_, 1, true} -> + %% Drain, so advance til credit = 0 + NewCount0 = serial_add(Count, (Credit - 1)), + send_drained(ChPid, CTag, NewCount0), + {0, NewCount0}; %% Magic reduction to 0 + {_, _, _} -> {Credit - 1, serial_add(Count, 1)} + end, + update_credit(CTag, NewCredit, NewCount, Drain, State); + error -> + State + end. + +send_drained(ChPid, CTag, Count) -> + rabbit_channel:send_command(ChPid, + #'basic.credit_state'{consumer_tag = CTag, + credit = 0, + count = Count, + available = 0, + drain = true}). + +%% Assert the credit state. The count may not match ours, in which +%% case we must rebase the credit. +%% TODO Edge case: if the queue has nothing in it, and drain is set, +%% we want to send a basic.credit back. +reset_credit(CTag, Credit0, Count0, Drain, State = #lim{credits = Credits}) -> + Count = + case dict:find(CTag, Credits) of + {ok, #credit{ count = LocalCount }} -> + LocalCount; + _ -> Count0 + end, + %% Our credit may have been reduced while messages are in flight, + %% so we bottom out at 0. + Credit = erlang:max(0, serial_diff(serial_add(Count0, Credit0), Count)), + update_credit(CTag, Credit, Count, Drain, State). + +%% Store the credit +update_credit(CTag, -1, _Count, _Drain, State = #lim{credits = Credits}) -> + State#lim{credits = dict:erase(CTag, Credits)}; + +update_credit(CTag, Credit, Count, Drain, State = #lim{credits = Credits}) -> + State#lim{credits = dict:store(CTag, + #credit{credit = Credit, + count = Count, + drain = Drain}, Credits)}. blocked(#lim{blocked = Blocked}) -> Blocked. @@ -262,3 +347,4 @@ notify_queues(State = #lim{ch_pid = ChPid, queues = Queues}) -> ok end, State#lim{queues = NewQueues}. + diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index ce3e380248..c316db99b0 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -68,6 +68,7 @@ -export([base64url/1]). -export([interval_operation/4]). -export([get_parent/0]). +-export([serial_add/2, serial_compare/2, serial_diff/2]). %% Horrible macro to use in guards -define(IS_BENIGN_EXIT(R), @@ -82,6 +83,7 @@ -ifdef(use_specs). -export_type([resource_name/0, thunk/1]). +-export_type([serial_number/0]). -type(ok_or_error() :: rabbit_types:ok_or_error(any())). -type(thunk(T) :: fun(() -> T)). @@ -94,6 +96,8 @@ fun ((atom(), [term()]) -> [{digraph:vertex(), digraph_label()}])). -type(graph_edge_fun() :: fun ((atom(), [term()]) -> [{digraph:vertex(), digraph:vertex()}])). +-type(serial_number() :: non_neg_integer()). +-type(serial_compare_result() :: 'equal' | 'less' | 'greater'). -spec(method_record_type/1 :: (rabbit_framing:amqp_method_record()) -> rabbit_framing:amqp_method_name()). @@ -243,6 +247,12 @@ ({atom(), atom(), any()}, float(), non_neg_integer(), non_neg_integer()) -> {any(), non_neg_integer()}). -spec(get_parent/0 :: () -> pid()). +-spec(serial_add/2 :: (serial_number(), non_neg_integer()) -> + serial_number()). +-spec(serial_compare/2 :: (serial_number(), serial_number()) -> + serial_compare_result()). +-spec(serial_diff/2 :: (serial_number(), serial_number()) -> + integer()). -endif. %%---------------------------------------------------------------------------- @@ -1080,3 +1090,44 @@ whereis_name(Name) -> %% End copypasta from gen_server2.erl %% ------------------------------------------------------------------------- + +%% Serial arithmetic for unsigned ints. +%% http://www.faqs.org/rfcs/rfc1982.html +%% SERIAL_BITS = 32 + +%% 2 ^ SERIAL_BITS +-define(SERIAL_MAX, 16#100000000). +%% 2 ^ (SERIAL_BITS - 1) - 1 +-define(SERIAL_MAX_ADDEND, 16#7fffffff). + +serial_add(S, N) when N =< ?SERIAL_MAX_ADDEND -> + (S + N) rem ?SERIAL_MAX; +serial_add(S, N) -> + exit({out_of_bound_serial_addition, S, N}). + +serial_compare(A, B) -> + if A =:= B -> + equal; + (A < B andalso B - A < ?SERIAL_MAX_ADDEND) orelse + (A > B andalso A - B > ?SERIAL_MAX_ADDEND) -> + less; + (A < B andalso B - A > ?SERIAL_MAX_ADDEND) orelse + (A > B andalso B - A < ?SERIAL_MAX_ADDEND) -> + greater; + true -> exit({indeterminate_serial_comparison, A, B}) + end. + +-define(SERIAL_DIFF_BOUND, 16#80000000). + +serial_diff(A, B) -> + Diff = A - B, + if Diff > (?SERIAL_DIFF_BOUND) -> + %% B is actually greater than A + - (?SERIAL_MAX - Diff); + Diff < - (?SERIAL_DIFF_BOUND) -> + ?SERIAL_MAX + Diff; + Diff < ?SERIAL_DIFF_BOUND andalso Diff > -?SERIAL_DIFF_BOUND -> + Diff; + true -> + exit({indeterminate_serial_diff, A, B}) + end. diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 09ed3d0890..9ca7763d09 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -50,6 +50,7 @@ all_tests() -> passed = test_table_codec(), passed = test_content_framing(), passed = test_content_transcoding(), + passed = test_serial_arithmetic(), passed = test_topic_matching(), passed = test_log_management(), passed = test_app_management(), @@ -558,6 +559,29 @@ sequence_with_content(Sequence) -> rabbit_framing_amqp_0_9_1), Sequence). +test_serial_arithmetic() -> + 1 = rabbit_misc:serial_add(0, 1), + 16#7fffffff = rabbit_misc:serial_add(0, 16#7fffffff), + 0 = rabbit_misc:serial_add(16#ffffffff, 1), + %% Cannot add more than 2 ^ 31 - 1 + case catch rabbit_misc:serial_add(200, 16#80000000) of + {'EXIT', {out_of_bound_serial_addition, _, _}} -> ok; + _ -> exit(fail_out_of_bound_serial_addition) + end, + + 1 = rabbit_misc:serial_diff(1, 0), + 2 = rabbit_misc:serial_diff(1, 16#ffffffff), + -2 = rabbit_misc:serial_diff(16#ffffffff, 1), + case catch rabbit_misc:serial_diff(0, 16#80000000) of + {'EXIT', {indeterminate_serial_diff, _, _}} -> ok; + _ -> exit(fail_indeterminate_serial_difference) + end, + case catch rabbit_misc:serial_diff(16#ffffffff, 16#7fffffff) of + {'EXIT', {indeterminate_serial_diff, _, _}} -> ok; + _ -> exit(fail_indeterminate_serial_difference) + end, + passed. + test_topic_matching() -> XName = #resource{virtual_host = <<"/">>, kind = exchange, |
