diff options
| author | Matthias Radestock <matthias@rabbitmq.com> | 2013-03-20 16:08:48 +0000 |
|---|---|---|
| committer | Matthias Radestock <matthias@rabbitmq.com> | 2013-03-20 16:08:48 +0000 |
| commit | 0b1d68aa1eba5d6d2a8e8f8f579528b1b668aab3 (patch) | |
| tree | 0303b7fa33fc5dd97a6bfe8435a52f590f3e0f4a /src | |
| parent | bdb5239b384f266424c6f3e1dbad4f28f49fc801 (diff) | |
| download | rabbitmq-server-git-0b1d68aa1eba5d6d2a8e8f8f579528b1b668aab3.tar.gz | |
better names for the prefetch related part of the limiter API
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_channel.erl | 9 | ||||
| -rw-r--r-- | src/rabbit_limiter.erl | 88 |
2 files changed, 50 insertions, 47 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 67cabcfbfb..1787d688d5 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -810,12 +810,13 @@ handle_method(#'basic.qos'{prefetch_size = Size}, _, _State) when Size /= 0 -> handle_method(#'basic.qos'{prefetch_count = 0}, _, State = #ch{limiter = Limiter}) -> - Limiter1 = rabbit_limiter:unlimit(Limiter), + Limiter1 = rabbit_limiter:unlimit_prefetch(Limiter), {reply, #'basic.qos_ok'{}, State#ch{limiter = Limiter1}}; handle_method(#'basic.qos'{prefetch_count = PrefetchCount}, _, State = #ch{limiter = Limiter, unacked_message_q = UAMQ}) -> - Limiter1 = rabbit_limiter:limit(Limiter, PrefetchCount, queue:len(UAMQ)), + Limiter1 = rabbit_limiter:limit_prefetch(Limiter, + PrefetchCount, queue:len(UAMQ)), {reply, #'basic.qos_ok'{}, maybe_limit_queues(Limiter, Limiter1, State#ch{limiter = Limiter1})}; @@ -1363,7 +1364,7 @@ consumer_queues(Consumers) -> notify_limiter(Limiter, Acked) -> %% optimisation: avoid the potentially expensive 'foldl' in the %% common case. - case rabbit_limiter:is_limited(Limiter) of + case rabbit_limiter:is_prefetch_limited(Limiter) of false -> ok; true -> case lists:foldl(fun ({_, none, _}, Acc) -> Acc; ({_, _, _}, Acc) -> Acc + 1 @@ -1528,7 +1529,7 @@ i(messages_uncommitted, #ch{}) -> 0; i(acks_uncommitted, #ch{tx = {_Msgs, Acks}}) -> ack_len(Acks); i(acks_uncommitted, #ch{}) -> 0; i(prefetch_count, #ch{limiter = Limiter}) -> - rabbit_limiter:get_limit(Limiter); + rabbit_limiter:get_prefetch_limit(Limiter); i(client_flow_blocked, #ch{limiter = Limiter}) -> rabbit_limiter:is_blocked(Limiter); i(Item, _) -> diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index b914306bd4..430c2716c4 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -57,9 +57,10 @@ %% The interactions with the limiter are as follows: %% %% 1. Channels tell the limiter about basic.qos prefetch counts - -%% that's what the limit/3, unlimit/1, is_limited/1, get_limit/1 -%% API functions are about - and channel.flow blocking - that's -%% what block/1, unblock/1 and is_blocked/1 are for. +%% that's what the limit_prefetch/3, unlimit_prefetch/1, +%% is_prefetch_limited/1, get_prefetch_limit/1 API functions are +%% about - and channel.flow blocking - that's what block/1, +%% unblock/1 and is_blocked/1 are for. %% %% 2. Queues register with the limiter - this happens as part of %% activate/1. @@ -106,8 +107,9 @@ -export([start_link/0]). %% channel API --export([new/1, limit/3, unlimit/1, block/1, unblock/1, - is_limited/1, is_blocked/1, is_active/1, get_limit/1, ack/2, pid/1]). +-export([new/1, limit_prefetch/3, unlimit_prefetch/1, block/1, unblock/1, + is_prefetch_limited/1, is_blocked/1, is_active/1, + get_prefetch_limit/1, ack/2, pid/1]). %% queue API -export([client/1, activate/1, can_send/2, resume/1, deactivate/1, is_suspended/1]). @@ -117,31 +119,31 @@ %%---------------------------------------------------------------------------- --record(lstate, {pid, limited, blocked}). +-record(lstate, {pid, prefetch_limited, blocked}). -record(qstate, {pid, state}). -ifdef(use_specs). --type(lstate() :: #lstate{pid :: pid(), - limited :: boolean(), - blocked :: boolean()}). +-type(lstate() :: #lstate{pid :: pid(), + prefetch_limited :: boolean(), + blocked :: boolean()}). -type(qstate() :: #qstate{pid :: pid(), state :: 'dormant' | 'active' | 'suspended'}). -spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()). -spec(new/1 :: (pid()) -> lstate()). --spec(limit/3 :: (lstate(), non_neg_integer(), non_neg_integer()) -> - lstate()). --spec(unlimit/1 :: (lstate()) -> lstate()). --spec(block/1 :: (lstate()) -> lstate()). --spec(unblock/1 :: (lstate()) -> lstate()). --spec(is_limited/1 :: (lstate()) -> boolean()). --spec(is_blocked/1 :: (lstate()) -> boolean()). --spec(is_active/1 :: (lstate()) -> boolean()). --spec(get_limit/1 :: (lstate()) -> non_neg_integer()). --spec(ack/2 :: (lstate(), non_neg_integer()) -> 'ok'). --spec(pid/1 :: (lstate()) -> pid()). +-spec(limit_prefetch/3 :: (lstate(), non_neg_integer(), non_neg_integer()) + -> lstate()). +-spec(unlimit_prefetch/1 :: (lstate()) -> lstate()). +-spec(block/1 :: (lstate()) -> lstate()). +-spec(unblock/1 :: (lstate()) -> lstate()). +-spec(is_prefetch_limited/1 :: (lstate()) -> boolean()). +-spec(is_blocked/1 :: (lstate()) -> boolean()). +-spec(is_active/1 :: (lstate()) -> boolean()). +-spec(get_prefetch_limit/1 :: (lstate()) -> non_neg_integer()). +-spec(ack/2 :: (lstate(), non_neg_integer()) -> 'ok'). +-spec(pid/1 :: (lstate()) -> pid()). -spec(client/1 :: (pid()) -> qstate()). -spec(activate/1 :: (qstate()) -> qstate()). @@ -173,15 +175,16 @@ start_link() -> gen_server2:start_link(?MODULE, [], []). new(Pid) -> %% this a 'call' to ensure that it is invoked at most once. ok = gen_server:call(Pid, {new, self()}), - #lstate{pid = Pid, limited = false, blocked = false}. + #lstate{pid = Pid, prefetch_limited = false, blocked = false}. -limit(L, PrefetchCount, UnackedCount) when PrefetchCount > 0 -> - ok = gen_server:call(L#lstate.pid, {limit, PrefetchCount, UnackedCount}), - L#lstate{limited = true}. +limit_prefetch(L, PrefetchCount, UnackedCount) when PrefetchCount > 0 -> + ok = gen_server:call(L#lstate.pid, + {limit_prefetch, PrefetchCount, UnackedCount}), + L#lstate{prefetch_limited = true}. -unlimit(L) -> - ok = gen_server:call(L#lstate.pid, unlimit), - L#lstate{limited = false}. +unlimit_prefetch(L) -> + ok = gen_server:call(L#lstate.pid, unlimit_prefetch), + L#lstate{prefetch_limited = false}. block(L) -> ok = gen_server:call(L#lstate.pid, block), @@ -191,16 +194,16 @@ unblock(L) -> ok = gen_server:call(L#lstate.pid, unblock), L#lstate{blocked = false}. -is_limited(#lstate{limited = Limited}) -> Limited. +is_prefetch_limited(#lstate{prefetch_limited = Limited}) -> Limited. is_blocked(#lstate{blocked = Blocked}) -> Blocked. -is_active(L) -> is_limited(L) orelse is_blocked(L). +is_active(L) -> is_prefetch_limited(L) orelse is_blocked(L). -get_limit(#lstate{limited = false}) -> 0; -get_limit(L) -> gen_server:call(L#lstate.pid, get_limit). +get_prefetch_limit(#lstate{prefetch_limited = false}) -> 0; +get_prefetch_limit(L) -> gen_server:call(L#lstate.pid, get_prefetch_limit). -ack(#lstate{limited = false}, _AckCount) -> ok; +ack(#lstate{prefetch_limited = false}, _AckCount) -> ok; ack(L, AckCount) -> gen_server:cast(L#lstate.pid, {ack, AckCount}). pid(#lstate{pid = Pid}) -> Pid. @@ -212,8 +215,6 @@ activate(L = #qstate{state = dormant}) -> L#qstate{state = active}; activate(L) -> L. -%% Ask the limiter whether the queue can deliver a message without -%% breaching a limit. can_send(L = #qstate{state = active}, AckRequired) -> rabbit_misc:with_exit_handler( fun () -> {continue, L} end, @@ -241,20 +242,20 @@ is_suspended(#qstate{}) -> false. init([]) -> {ok, #lim{}}. -prioritise_call(get_limit, _From, _State) -> 9; -prioritise_call(_Msg, _From, _State) -> 0. +prioritise_call(get_prefetch_limit, _From, _State) -> 9; +prioritise_call(_Msg, _From, _State) -> 0. handle_call({new, ChPid}, _From, State = #lim{ch_pid = undefined}) -> {reply, ok, State#lim{ch_pid = ChPid}}; -handle_call({limit, PrefetchCount, UnackedCount}, _From, State) -> +handle_call({limit_prefetch, PrefetchCount, UnackedCount}, _From, State) -> %% assertion true = State#lim.prefetch_count == 0 orelse State#lim.volume == UnackedCount, {reply, ok, maybe_notify(State, State#lim{prefetch_count = PrefetchCount, volume = UnackedCount})}; -handle_call(unlimit, _From, State) -> +handle_call(unlimit_prefetch, _From, State) -> {reply, ok, maybe_notify(State, State#lim{prefetch_count = 0, volume = 0})}; @@ -264,7 +265,8 @@ handle_call(block, _From, State) -> handle_call(unblock, _From, State) -> {reply, ok, maybe_notify(State, State#lim{blocked = false})}; -handle_call(get_limit, _From, State = #lim{prefetch_count = PrefetchCount}) -> +handle_call(get_prefetch_limit, _From, + State = #lim{prefetch_count = PrefetchCount}) -> {reply, PrefetchCount, State}; handle_call({can_send, QPid, _AckRequired}, _From, @@ -272,7 +274,7 @@ handle_call({can_send, QPid, _AckRequired}, _From, {reply, false, limit_queue(QPid, State)}; handle_call({can_send, QPid, AckRequired}, _From, State = #lim{volume = Volume}) -> - case limit_reached(State) of + case prefetch_limit_reached(State) of true -> {reply, false, limit_queue(QPid, State)}; false -> {reply, true, State#lim{volume = if AckRequired -> Volume + 1; true -> Volume @@ -305,13 +307,13 @@ code_change(_, State, _) -> %%---------------------------------------------------------------------------- maybe_notify(OldState, NewState) -> - case (limit_reached(OldState) orelse blocked(OldState)) andalso - not (limit_reached(NewState) orelse blocked(NewState)) of + case (prefetch_limit_reached(OldState) orelse blocked(OldState)) andalso + not (prefetch_limit_reached(NewState) orelse blocked(NewState)) of true -> notify_queues(NewState); false -> NewState end. -limit_reached(#lim{prefetch_count = Limit, volume = Volume}) -> +prefetch_limit_reached(#lim{prefetch_count = Limit, volume = Volume}) -> Limit =/= 0 andalso Volume >= Limit. blocked(#lim{blocked = Blocked}) -> Blocked. |
