diff options
| author | Matthias Radestock <matthias@rabbitmq.com> | 2013-03-19 11:10:40 +0000 |
|---|---|---|
| committer | Matthias Radestock <matthias@rabbitmq.com> | 2013-03-19 11:10:40 +0000 |
| commit | 8614b6e32381b6b499483d689afd4db6c9bc3856 (patch) | |
| tree | c79a28c8b22a3ee719f4427e688a62788b53e75c /src | |
| parent | 7c25a669cc01dd7c51941ef0b48b8a4b4a3cfd8b (diff) | |
| download | rabbitmq-server-git-8614b6e32381b6b499483d689afd4db6c9bc3856.tar.gz | |
limiter API revision, part 1/2 - channel-side API
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 19 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 68 | ||||
| -rw-r--r-- | src/rabbit_channel_sup.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_limiter.erl | 207 |
5 files changed, 139 insertions, 161 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 82ac74fac5..bd5de23923 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -144,7 +144,7 @@ -spec(ack/3 :: (pid(), [msg_id()], pid()) -> 'ok'). -spec(reject/4 :: (pid(), [msg_id()], boolean(), pid()) -> 'ok'). -spec(notify_down_all/2 :: (qpids(), pid()) -> ok_or_errors()). --spec(limit_all/3 :: (qpids(), pid(), rabbit_limiter:token()) -> +-spec(limit_all/3 :: (qpids(), pid(), rabbit_limiter:lstate()) -> ok_or_errors()). -spec(basic_get/3 :: (rabbit_types:amqqueue(), pid(), boolean()) -> {'ok', non_neg_integer(), qmsg()} | 'empty'). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 18b641d4f7..0ddc9eba11 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -372,7 +372,7 @@ ch_record(ChPid) -> consumer_count = 0, blocked_consumers = queue:new(), is_limit_active = false, - limiter = rabbit_limiter:make_token(), + limiter = undefined, unsent_message_count = 0}, put(Key, C), C; @@ -395,18 +395,17 @@ store_ch_record(C = #cr{ch_pid = ChPid}) -> erase_ch_record(#cr{ch_pid = ChPid, limiter = Limiter, monitor_ref = MonitorRef}) -> - ok = rabbit_limiter:unregister(Limiter, self()), + ok = rabbit_limiter:unregister(Limiter), erlang:demonitor(MonitorRef), erase({ch, ChPid}), ok. update_consumer_count(C = #cr{consumer_count = 0, limiter = Limiter}, +1) -> - ok = rabbit_limiter:register(Limiter, self()), + ok = rabbit_limiter:register(Limiter), update_ch_record(C#cr{consumer_count = 1}); update_consumer_count(C = #cr{consumer_count = 1, limiter = Limiter}, -1) -> - ok = rabbit_limiter:unregister(Limiter, self()), - update_ch_record(C#cr{consumer_count = 0, - limiter = rabbit_limiter:make_token()}); + ok = rabbit_limiter:unregister(Limiter), + update_ch_record(C#cr{consumer_count = 0, limiter = undefined}); update_consumer_count(C = #cr{consumer_count = Count}, Delta) -> update_ch_record(C#cr{consumer_count = Count + Delta}). @@ -444,7 +443,7 @@ deliver_msg_to_consumer(DeliverFun, E = {ChPid, Consumer}, State) -> case is_ch_blocked(C) of true -> block_consumer(C, E), {false, State}; - false -> case rabbit_limiter:can_send(C#cr.limiter, self(), + false -> case rabbit_limiter:can_send(C#cr.limiter, Consumer#consumer.ack_required) of false -> block_consumer(C#cr{is_limit_active = true}, E), {false, State}; @@ -1308,11 +1307,11 @@ handle_cast({limit, ChPid, Limiter}, State) -> limiter = OldLimiter, is_limit_active = OldLimited}) -> case (ConsumerCount =/= 0 andalso - not rabbit_limiter:is_enabled(OldLimiter)) of - true -> ok = rabbit_limiter:register(Limiter, self()); + not rabbit_limiter:is_active(OldLimiter)) of + true -> ok = rabbit_limiter:register(Limiter); false -> ok end, - Limited = OldLimited andalso rabbit_limiter:is_enabled(Limiter), + Limited = OldLimited andalso rabbit_limiter:is_active(Limiter), C#cr{limiter = Limiter, is_limit_active = Limited} end)); diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 792a06c908..cda5747ad4 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -82,7 +82,7 @@ (channel_number(), pid(), pid(), pid(), string(), rabbit_types:protocol(), rabbit_types:user(), rabbit_types:vhost(), rabbit_framing:amqp_table(), - pid(), rabbit_limiter:token()) -> rabbit_types:ok_pid_or_error()). + pid(), pid()) -> rabbit_types:ok_pid_or_error()). -spec(do/2 :: (pid(), rabbit_framing:amqp_method_record()) -> 'ok'). -spec(do/3 :: (pid(), rabbit_framing:amqp_method_record(), rabbit_types:maybe(rabbit_types:content())) -> 'ok'). @@ -180,7 +180,7 @@ force_event_refresh() -> %%--------------------------------------------------------------------------- init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, - Capabilities, CollectorPid, Limiter]) -> + Capabilities, CollectorPid, LimiterPid]) -> process_flag(trap_exit, true), ok = pg_local:join(rabbit_channels, self()), State = #ch{state = starting, @@ -190,7 +190,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, writer_pid = WriterPid, conn_pid = ConnPid, conn_name = ConnName, - limiter = Limiter, + limiter = rabbit_limiter:new(LimiterPid), tx = none, next_tag = 1, unacked_message_q = queue:new(), @@ -804,18 +804,10 @@ handle_method(#'basic.qos'{prefetch_size = Size}, _, _State) when Size /= 0 -> "prefetch_size!=0 (~w)", [Size]); handle_method(#'basic.qos'{prefetch_count = PrefetchCount}, _, - State = #ch{limiter = Limiter}) -> - Limiter1 = case {rabbit_limiter:is_enabled(Limiter), PrefetchCount} of - {false, 0} -> Limiter; - {false, _} -> enable_limiter(State); - {_, _} -> Limiter - end, - Limiter3 = case rabbit_limiter:limit(Limiter1, PrefetchCount) of - ok -> Limiter1; - {disabled, Limiter2} -> ok = limit_queues(Limiter2, State), - Limiter2 - end, - {reply, #'basic.qos_ok'{}, State#ch{limiter = Limiter3}}; + State = #ch{limiter = Limiter, unacked_message_q = UAMQ}) -> + Limiter1 = rabbit_limiter:limit(Limiter, PrefetchCount, queue:len(UAMQ)), + {reply, #'basic.qos_ok'{}, + maybe_limit_queues(Limiter, Limiter1, State#ch{limiter = Limiter1})}; handle_method(#'basic.recover_async'{requeue = true}, _, State = #ch{unacked_message_q = UAMQ, @@ -1078,25 +1070,23 @@ handle_method(#'confirm.select'{nowait = NoWait}, _, State) -> handle_method(#'channel.flow'{active = true}, _, State = #ch{limiter = Limiter}) -> - Limiter2 = case rabbit_limiter:unblock(Limiter) of - ok -> Limiter; - {disabled, Limiter1} -> ok = limit_queues(Limiter1, State), - Limiter1 - end, - {reply, #'channel.flow_ok'{active = true}, State#ch{limiter = Limiter2}}; + Limiter1 = rabbit_limiter:unblock(Limiter), + {reply, #'channel.flow_ok'{active = true}, + maybe_limit_queues(Limiter, Limiter1, State#ch{limiter = Limiter1})}; handle_method(#'channel.flow'{active = false}, _, State = #ch{consumer_mapping = Consumers, limiter = Limiter}) -> - Limiter1 = case rabbit_limiter:is_enabled(Limiter) of - true -> Limiter; - false -> enable_limiter(State) - end, - State1 = State#ch{limiter = Limiter1}, - ok = rabbit_limiter:block(Limiter1), - QPids = consumer_queues(Consumers), - ok = rabbit_amqqueue:flush_all(QPids, self()), - {noreply, maybe_send_flow_ok(State1#ch{blocking = sets:from_list(QPids)})}; + case rabbit_limiter:is_blocked(Limiter) of + true -> {noreply, maybe_send_flow_ok(State)}; + false -> Limiter1 = rabbit_limiter:block(Limiter), + State1 = maybe_limit_queues(Limiter, Limiter1, + State#ch{limiter = Limiter1}), + QPids = consumer_queues(Consumers), + ok = rabbit_amqqueue:flush_all(QPids, self()), + {noreply, maybe_send_flow_ok( + State1#ch{blocking = sets:from_list(QPids)})} + end; handle_method(_MethodRecord, _Content, _State) -> rabbit_misc:protocol_error( @@ -1332,14 +1322,14 @@ foreach_per_queue(F, UAL) -> end, gb_trees:empty(), UAL), rabbit_misc:gb_trees_foreach(F, T). -enable_limiter(State = #ch{unacked_message_q = UAMQ, - limiter = Limiter}) -> - Limiter1 = rabbit_limiter:enable(Limiter, queue:len(UAMQ)), - ok = limit_queues(Limiter1, State), - Limiter1. - -limit_queues(Limiter, #ch{consumer_mapping = Consumers}) -> - rabbit_amqqueue:limit_all(consumer_queues(Consumers), self(), Limiter). +maybe_limit_queues(OldLimiter, NewLimiter, State) -> + case ((not rabbit_limiter:is_active(OldLimiter)) andalso + rabbit_limiter:is_active(NewLimiter)) of + true -> Queues = consumer_queues(State#ch.consumer_mapping), + rabbit_amqqueue:limit_all(Queues, self(), NewLimiter); + false -> ok + end, + State. consumer_queues(Consumers) -> lists:usort([QPid || @@ -1350,7 +1340,7 @@ consumer_queues(Consumers) -> %% messages sent in a response to a basic.get (identified by their %% 'none' consumer tag) notify_limiter(Limiter, Acked) -> - case rabbit_limiter:is_enabled(Limiter) of + case rabbit_limiter:is_limited(Limiter) of false -> ok; true -> case lists:foldl(fun ({_, none, _}, Acc) -> Acc; ({_, _, _}, Acc) -> Acc + 1 diff --git a/src/rabbit_channel_sup.erl b/src/rabbit_channel_sup.erl index 8ea44a8179..a0c7624b4f 100644 --- a/src/rabbit_channel_sup.erl +++ b/src/rabbit_channel_sup.erl @@ -58,7 +58,7 @@ start_link({tcp, Sock, Channel, FrameMax, ReaderPid, ConnName, Protocol, User, {channel, {rabbit_channel, start_link, [Channel, ReaderPid, WriterPid, ReaderPid, ConnName, Protocol, User, VHost, Capabilities, Collector, - rabbit_limiter:make_token(LimiterPid)]}, + LimiterPid]}, intrinsic, ?MAX_WAIT, worker, [rabbit_channel]}), {ok, AState} = rabbit_command_assembler:init(Protocol), {ok, SupPid, {ChannelPid, AState}}; @@ -72,7 +72,7 @@ start_link({direct, Channel, ClientChannelPid, ConnPid, ConnName, Protocol, {channel, {rabbit_channel, start_link, [Channel, ClientChannelPid, ClientChannelPid, ConnPid, ConnName, Protocol, User, VHost, Capabilities, Collector, - rabbit_limiter:make_token(LimiterPid)]}, + LimiterPid]}, intrinsic, ?MAX_WAIT, worker, [rabbit_channel]}), {ok, SupPid, {ChannelPid, none}}. diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index 8a7d14fe0b..ae6563286e 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -18,38 +18,43 @@ -behaviour(gen_server2). +-export([start_link/0]). +-export([new/1, limit/3, unlimit/1, block/1, unblock/1, + is_limited/1, is_blocked/1, is_active/1, get_limit/1, ack/2]). +-export([can_send/2, register/1, unregister/1]). + -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2, prioritise_call/3]). --export([start_link/0, make_token/0, make_token/1, is_enabled/1, enable/2, - disable/1]). --export([limit/2, can_send/3, ack/2, register/2, unregister/2]). --export([get_limit/1, block/1, unblock/1, is_blocked/1]). %%---------------------------------------------------------------------------- --record(token, {pid, enabled}). +-record(lstate, {pid, limited, blocked}). -ifdef(use_specs). --export_type([token/0]). +-export_type([lstate/0]). --opaque(token() :: #token{}). +-opaque(lstate() :: #lstate {pid :: pid(), + limited :: boolean(), + blocked :: boolean()}). -spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()). --spec(make_token/0 :: () -> token()). --spec(make_token/1 :: ('undefined' | pid()) -> token()). --spec(is_enabled/1 :: (token()) -> boolean()). --spec(enable/2 :: (token(), non_neg_integer()) -> token()). --spec(disable/1 :: (token()) -> token()). --spec(limit/2 :: (token(), non_neg_integer()) -> 'ok' | {'disabled', token()}). --spec(can_send/3 :: (token(), pid(), boolean()) -> boolean()). --spec(ack/2 :: (token(), non_neg_integer()) -> 'ok'). --spec(register/2 :: (token(), pid()) -> 'ok'). --spec(unregister/2 :: (token(), pid()) -> 'ok'). --spec(get_limit/1 :: (token()) -> non_neg_integer()). --spec(block/1 :: (token()) -> 'ok'). --spec(unblock/1 :: (token()) -> 'ok' | {'disabled', token()}). --spec(is_blocked/1 :: (token()) -> boolean()). +-spec(new/1 :: (pid()) -> lstate()). + +-spec(limit/3 :: (lstate(), non_neg_integer(), non_neg_integer()) -> + lstate()). +-spec(unlimit/1 :: (lstate()) -> lstate()). +-spec(block/1 :: (lstate()) -> lstate()). +-spec(unblock/1 :: (lstate()) -> lstate()). +-spec(is_limited/1 :: (lstate()) -> boolean()). +-spec(is_blocked/1 :: (lstate()) -> boolean()). +-spec(is_active/1 :: (lstate()) -> boolean()). +-spec(get_limit/1 :: (lstate()) -> non_neg_integer()). +-spec(ack/2 :: (lstate(), non_neg_integer()) -> 'ok'). + +-spec(can_send/2 :: (lstate(), boolean()) -> boolean()). +-spec(register/1 :: (lstate()) -> 'ok'). +-spec(unregister/1 :: (lstate()) -> 'ok'). -endif. @@ -70,65 +75,95 @@ start_link() -> gen_server2:start_link(?MODULE, [], []). -make_token() -> make_token(undefined). -make_token(Pid) -> #token{pid = Pid, enabled = false}. +new(Pid) -> + %% this a 'call' to ensure that it is invoked at most once. + ok = gen_server:call(Pid, {new, self()}), + #lstate{pid = Pid, limited = false, blocked = false}. -is_enabled(#token{enabled = Enabled}) -> Enabled. +limit(L, PrefetchCount, UnackedCount) when PrefetchCount > 0 -> + ok = gen_server:call(L#lstate.pid, {limit, PrefetchCount, UnackedCount}), + L#lstate{limited = true}. -enable(#token{pid = Pid} = Token, Volume) -> - gen_server2:call(Pid, {enable, Token, self(), Volume}, infinity). +unlimit(L) -> + ok = gen_server:call(L#lstate.pid, unlimit), + L#lstate{limited = false}. -disable(#token{pid = Pid} = Token) -> - gen_server2:call(Pid, {disable, Token}, infinity). +block(L) -> + ok = gen_server:call(L#lstate.pid, block), + L#lstate{blocked = true}. -limit(Limiter, PrefetchCount) -> - maybe_call(Limiter, {limit, PrefetchCount, Limiter}, ok). +unblock(L) -> + ok = gen_server:call(L#lstate.pid, unblock), + L#lstate{blocked = false}. -%% Ask the limiter whether the queue can deliver a message without -%% breaching a limit. Note that we don't use maybe_call here in order -%% to avoid always going through with_exit_handler/2, even when the -%% limiter is disabled. -can_send(#token{pid = Pid, enabled = true}, QPid, AckRequired) -> - rabbit_misc:with_exit_handler( - fun () -> true end, - fun () -> - gen_server2:call(Pid, {can_send, QPid, AckRequired}, infinity) - end); -can_send(_, _, _) -> - true. +is_limited(#lstate{limited = Limited}) -> Limited. -%% Let the limiter know that the channel has received some acks from a -%% consumer -ack(Limiter, Count) -> maybe_cast(Limiter, {ack, Count}). +is_blocked(#lstate{blocked = Blocked}) -> Blocked. -register(Limiter, QPid) -> maybe_cast(Limiter, {register, QPid}). +is_active(L) -> is_limited(L) orelse is_blocked(L). -unregister(Limiter, QPid) -> maybe_cast(Limiter, {unregister, QPid}). +get_limit(#lstate{limited = false}) -> 0; +get_limit(L) -> gen_server:call(L#lstate.pid, get_limit). -get_limit(Limiter) -> - rabbit_misc:with_exit_handler( - fun () -> 0 end, - fun () -> maybe_call(Limiter, get_limit, 0) end). +ack(#lstate{limited = false}, _AckCount) -> ok; +ack(L, AckCount) -> gen_server:cast(L#lstate.pid, {ack, AckCount}). -block(Limiter) -> - maybe_call(Limiter, block, ok). +%% Ask the limiter whether the queue can deliver a message without +%% breaching a limit. +can_send(L, AckRequired) -> + case is_active(L) of + false -> true; + true -> rabbit_misc:with_exit_handler( + fun () -> true end, + fun () -> Msg = {can_send, self(), AckRequired}, + gen_server2:call(L#lstate.pid, Msg, infinity) + end) + end. -unblock(Limiter) -> - maybe_call(Limiter, {unblock, Limiter}, ok). +register(L) -> + case is_active(L) of + false -> ok; + true -> gen_server:cast(L#lstate.pid, {register, self()}) + end. -is_blocked(Limiter) -> - maybe_call(Limiter, is_blocked, false). +unregister(L) -> + case is_active(L) of + false -> ok; + true -> gen_server:cast(L#lstate.pid, {unregister, self()}) + end. %%---------------------------------------------------------------------------- %% gen_server callbacks %%---------------------------------------------------------------------------- -init([]) -> - {ok, #lim{}}. +init([]) -> {ok, #lim{}}. prioritise_call(get_limit, _From, _State) -> 9; prioritise_call(_Msg, _From, _State) -> 0. +handle_call({new, ChPid}, _From, State = #lim{ch_pid = undefined}) -> + {reply, ok, State#lim{ch_pid = ChPid}}; + +handle_call({limit, PrefetchCount, UnackedCount}, _From, State) -> + %% assertion + true = State#lim.prefetch_count == 0 orelse + State#lim.volume == UnackedCount, + {reply, ok, maybe_notify(State, State#lim{prefetch_count = PrefetchCount, + volume = UnackedCount})}; + +handle_call(unlimit, _From, State) -> + {reply, ok, maybe_notify(State, State#lim{prefetch_count = 0, + volume = 0})}; + +handle_call(block, _From, State) -> + {reply, ok, State#lim{blocked = true}}; + +handle_call(unblock, _From, State) -> + {reply, ok, maybe_notify(State, State#lim{blocked = false})}; + +handle_call(get_limit, _From, State = #lim{prefetch_count = PrefetchCount}) -> + {reply, PrefetchCount, State}; + handle_call({can_send, QPid, _AckRequired}, _From, State = #lim{blocked = true}) -> {reply, false, limit_queue(QPid, State)}; @@ -139,45 +174,13 @@ handle_call({can_send, QPid, AckRequired}, _From, false -> {reply, true, State#lim{volume = if AckRequired -> Volume + 1; true -> Volume end}} - end; - -handle_call(get_limit, _From, State = #lim{prefetch_count = PrefetchCount}) -> - {reply, PrefetchCount, State}; - -handle_call({limit, PrefetchCount, Token}, _From, State) -> - case maybe_notify(State, State#lim{prefetch_count = PrefetchCount}) of - {cont, State1} -> - {reply, ok, State1}; - {stop, State1} -> - {reply, {disabled, Token#token{enabled = false}}, State1} - end; - -handle_call(block, _From, State) -> - {reply, ok, State#lim{blocked = true}}; - -handle_call({unblock, Token}, _From, State) -> - case maybe_notify(State, State#lim{blocked = false}) of - {cont, State1} -> - {reply, ok, State1}; - {stop, State1} -> - {reply, {disabled, Token#token{enabled = false}}, State1} - end; - -handle_call(is_blocked, _From, State) -> - {reply, blocked(State), State}; - -handle_call({enable, Token, Channel, Volume}, _From, State) -> - {reply, Token#token{enabled = true}, - State#lim{ch_pid = Channel, volume = Volume}}; -handle_call({disable, Token}, _From, State) -> - {reply, Token#token{enabled = false}, State}. + end. handle_cast({ack, Count}, State = #lim{volume = Volume}) -> NewVolume = if Volume == 0 -> 0; true -> Volume - Count end, - {cont, State1} = maybe_notify(State, State#lim{volume = NewVolume}), - {noreply, State1}; + {noreply, maybe_notify(State, State#lim{volume = NewVolume})}; handle_cast({register, QPid}, State) -> {noreply, remember_queue(QPid, State)}; @@ -201,24 +204,10 @@ code_change(_, State, _) -> maybe_notify(OldState, NewState) -> case (limit_reached(OldState) orelse blocked(OldState)) andalso not (limit_reached(NewState) orelse blocked(NewState)) of - true -> NewState1 = notify_queues(NewState), - {case NewState1#lim.prefetch_count of - 0 -> stop; - _ -> cont - end, NewState1}; - false -> {cont, NewState} + true -> notify_queues(NewState); + false -> NewState end. -maybe_call(#token{pid = Pid, enabled = true}, Call, _Default) -> - gen_server2:call(Pid, Call, infinity); -maybe_call(_, _Call, Default) -> - Default. - -maybe_cast(#token{pid = Pid, enabled = true}, Cast) -> - gen_server2:cast(Pid, Cast); -maybe_cast(_, _Call) -> - ok. - limit_reached(#lim{prefetch_count = Limit, volume = Volume}) -> Limit =/= 0 andalso Volume >= Limit. |
