summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2013-03-20 16:08:48 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2013-03-20 16:08:48 +0000
commit0b1d68aa1eba5d6d2a8e8f8f579528b1b668aab3 (patch)
tree0303b7fa33fc5dd97a6bfe8435a52f590f3e0f4a /src
parentbdb5239b384f266424c6f3e1dbad4f28f49fc801 (diff)
downloadrabbitmq-server-git-0b1d68aa1eba5d6d2a8e8f8f579528b1b668aab3.tar.gz
better names for the prefetch related part of the limiter API
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_channel.erl9
-rw-r--r--src/rabbit_limiter.erl88
2 files changed, 50 insertions, 47 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 67cabcfbfb..1787d688d5 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -810,12 +810,13 @@ handle_method(#'basic.qos'{prefetch_size = Size}, _, _State) when Size /= 0 ->
handle_method(#'basic.qos'{prefetch_count = 0}, _,
State = #ch{limiter = Limiter}) ->
- Limiter1 = rabbit_limiter:unlimit(Limiter),
+ Limiter1 = rabbit_limiter:unlimit_prefetch(Limiter),
{reply, #'basic.qos_ok'{}, State#ch{limiter = Limiter1}};
handle_method(#'basic.qos'{prefetch_count = PrefetchCount}, _,
State = #ch{limiter = Limiter, unacked_message_q = UAMQ}) ->
- Limiter1 = rabbit_limiter:limit(Limiter, PrefetchCount, queue:len(UAMQ)),
+ Limiter1 = rabbit_limiter:limit_prefetch(Limiter,
+ PrefetchCount, queue:len(UAMQ)),
{reply, #'basic.qos_ok'{},
maybe_limit_queues(Limiter, Limiter1, State#ch{limiter = Limiter1})};
@@ -1363,7 +1364,7 @@ consumer_queues(Consumers) ->
notify_limiter(Limiter, Acked) ->
%% optimisation: avoid the potentially expensive 'foldl' in the
%% common case.
- case rabbit_limiter:is_limited(Limiter) of
+ case rabbit_limiter:is_prefetch_limited(Limiter) of
false -> ok;
true -> case lists:foldl(fun ({_, none, _}, Acc) -> Acc;
({_, _, _}, Acc) -> Acc + 1
@@ -1528,7 +1529,7 @@ i(messages_uncommitted, #ch{}) -> 0;
i(acks_uncommitted, #ch{tx = {_Msgs, Acks}}) -> ack_len(Acks);
i(acks_uncommitted, #ch{}) -> 0;
i(prefetch_count, #ch{limiter = Limiter}) ->
- rabbit_limiter:get_limit(Limiter);
+ rabbit_limiter:get_prefetch_limit(Limiter);
i(client_flow_blocked, #ch{limiter = Limiter}) ->
rabbit_limiter:is_blocked(Limiter);
i(Item, _) ->
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
index b914306bd4..430c2716c4 100644
--- a/src/rabbit_limiter.erl
+++ b/src/rabbit_limiter.erl
@@ -57,9 +57,10 @@
%% The interactions with the limiter are as follows:
%%
%% 1. Channels tell the limiter about basic.qos prefetch counts -
-%% that's what the limit/3, unlimit/1, is_limited/1, get_limit/1
-%% API functions are about - and channel.flow blocking - that's
-%% what block/1, unblock/1 and is_blocked/1 are for.
+%% that's what the limit_prefetch/3, unlimit_prefetch/1,
+%% is_prefetch_limited/1, get_prefetch_limit/1 API functions are
+%% about - and channel.flow blocking - that's what block/1,
+%% unblock/1 and is_blocked/1 are for.
%%
%% 2. Queues register with the limiter - this happens as part of
%% activate/1.
@@ -106,8 +107,9 @@
-export([start_link/0]).
%% channel API
--export([new/1, limit/3, unlimit/1, block/1, unblock/1,
- is_limited/1, is_blocked/1, is_active/1, get_limit/1, ack/2, pid/1]).
+-export([new/1, limit_prefetch/3, unlimit_prefetch/1, block/1, unblock/1,
+ is_prefetch_limited/1, is_blocked/1, is_active/1,
+ get_prefetch_limit/1, ack/2, pid/1]).
%% queue API
-export([client/1, activate/1, can_send/2, resume/1, deactivate/1,
is_suspended/1]).
@@ -117,31 +119,31 @@
%%----------------------------------------------------------------------------
--record(lstate, {pid, limited, blocked}).
+-record(lstate, {pid, prefetch_limited, blocked}).
-record(qstate, {pid, state}).
-ifdef(use_specs).
--type(lstate() :: #lstate{pid :: pid(),
- limited :: boolean(),
- blocked :: boolean()}).
+-type(lstate() :: #lstate{pid :: pid(),
+ prefetch_limited :: boolean(),
+ blocked :: boolean()}).
-type(qstate() :: #qstate{pid :: pid(),
state :: 'dormant' | 'active' | 'suspended'}).
-spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()).
-spec(new/1 :: (pid()) -> lstate()).
--spec(limit/3 :: (lstate(), non_neg_integer(), non_neg_integer()) ->
- lstate()).
--spec(unlimit/1 :: (lstate()) -> lstate()).
--spec(block/1 :: (lstate()) -> lstate()).
--spec(unblock/1 :: (lstate()) -> lstate()).
--spec(is_limited/1 :: (lstate()) -> boolean()).
--spec(is_blocked/1 :: (lstate()) -> boolean()).
--spec(is_active/1 :: (lstate()) -> boolean()).
--spec(get_limit/1 :: (lstate()) -> non_neg_integer()).
--spec(ack/2 :: (lstate(), non_neg_integer()) -> 'ok').
--spec(pid/1 :: (lstate()) -> pid()).
+-spec(limit_prefetch/3 :: (lstate(), non_neg_integer(), non_neg_integer())
+ -> lstate()).
+-spec(unlimit_prefetch/1 :: (lstate()) -> lstate()).
+-spec(block/1 :: (lstate()) -> lstate()).
+-spec(unblock/1 :: (lstate()) -> lstate()).
+-spec(is_prefetch_limited/1 :: (lstate()) -> boolean()).
+-spec(is_blocked/1 :: (lstate()) -> boolean()).
+-spec(is_active/1 :: (lstate()) -> boolean()).
+-spec(get_prefetch_limit/1 :: (lstate()) -> non_neg_integer()).
+-spec(ack/2 :: (lstate(), non_neg_integer()) -> 'ok').
+-spec(pid/1 :: (lstate()) -> pid()).
-spec(client/1 :: (pid()) -> qstate()).
-spec(activate/1 :: (qstate()) -> qstate()).
@@ -173,15 +175,16 @@ start_link() -> gen_server2:start_link(?MODULE, [], []).
new(Pid) ->
%% this a 'call' to ensure that it is invoked at most once.
ok = gen_server:call(Pid, {new, self()}),
- #lstate{pid = Pid, limited = false, blocked = false}.
+ #lstate{pid = Pid, prefetch_limited = false, blocked = false}.
-limit(L, PrefetchCount, UnackedCount) when PrefetchCount > 0 ->
- ok = gen_server:call(L#lstate.pid, {limit, PrefetchCount, UnackedCount}),
- L#lstate{limited = true}.
+limit_prefetch(L, PrefetchCount, UnackedCount) when PrefetchCount > 0 ->
+ ok = gen_server:call(L#lstate.pid,
+ {limit_prefetch, PrefetchCount, UnackedCount}),
+ L#lstate{prefetch_limited = true}.
-unlimit(L) ->
- ok = gen_server:call(L#lstate.pid, unlimit),
- L#lstate{limited = false}.
+unlimit_prefetch(L) ->
+ ok = gen_server:call(L#lstate.pid, unlimit_prefetch),
+ L#lstate{prefetch_limited = false}.
block(L) ->
ok = gen_server:call(L#lstate.pid, block),
@@ -191,16 +194,16 @@ unblock(L) ->
ok = gen_server:call(L#lstate.pid, unblock),
L#lstate{blocked = false}.
-is_limited(#lstate{limited = Limited}) -> Limited.
+is_prefetch_limited(#lstate{prefetch_limited = Limited}) -> Limited.
is_blocked(#lstate{blocked = Blocked}) -> Blocked.
-is_active(L) -> is_limited(L) orelse is_blocked(L).
+is_active(L) -> is_prefetch_limited(L) orelse is_blocked(L).
-get_limit(#lstate{limited = false}) -> 0;
-get_limit(L) -> gen_server:call(L#lstate.pid, get_limit).
+get_prefetch_limit(#lstate{prefetch_limited = false}) -> 0;
+get_prefetch_limit(L) -> gen_server:call(L#lstate.pid, get_prefetch_limit).
-ack(#lstate{limited = false}, _AckCount) -> ok;
+ack(#lstate{prefetch_limited = false}, _AckCount) -> ok;
ack(L, AckCount) -> gen_server:cast(L#lstate.pid, {ack, AckCount}).
pid(#lstate{pid = Pid}) -> Pid.
@@ -212,8 +215,6 @@ activate(L = #qstate{state = dormant}) ->
L#qstate{state = active};
activate(L) -> L.
-%% Ask the limiter whether the queue can deliver a message without
-%% breaching a limit.
can_send(L = #qstate{state = active}, AckRequired) ->
rabbit_misc:with_exit_handler(
fun () -> {continue, L} end,
@@ -241,20 +242,20 @@ is_suspended(#qstate{}) -> false.
init([]) -> {ok, #lim{}}.
-prioritise_call(get_limit, _From, _State) -> 9;
-prioritise_call(_Msg, _From, _State) -> 0.
+prioritise_call(get_prefetch_limit, _From, _State) -> 9;
+prioritise_call(_Msg, _From, _State) -> 0.
handle_call({new, ChPid}, _From, State = #lim{ch_pid = undefined}) ->
{reply, ok, State#lim{ch_pid = ChPid}};
-handle_call({limit, PrefetchCount, UnackedCount}, _From, State) ->
+handle_call({limit_prefetch, PrefetchCount, UnackedCount}, _From, State) ->
%% assertion
true = State#lim.prefetch_count == 0 orelse
State#lim.volume == UnackedCount,
{reply, ok, maybe_notify(State, State#lim{prefetch_count = PrefetchCount,
volume = UnackedCount})};
-handle_call(unlimit, _From, State) ->
+handle_call(unlimit_prefetch, _From, State) ->
{reply, ok, maybe_notify(State, State#lim{prefetch_count = 0,
volume = 0})};
@@ -264,7 +265,8 @@ handle_call(block, _From, State) ->
handle_call(unblock, _From, State) ->
{reply, ok, maybe_notify(State, State#lim{blocked = false})};
-handle_call(get_limit, _From, State = #lim{prefetch_count = PrefetchCount}) ->
+handle_call(get_prefetch_limit, _From,
+ State = #lim{prefetch_count = PrefetchCount}) ->
{reply, PrefetchCount, State};
handle_call({can_send, QPid, _AckRequired}, _From,
@@ -272,7 +274,7 @@ handle_call({can_send, QPid, _AckRequired}, _From,
{reply, false, limit_queue(QPid, State)};
handle_call({can_send, QPid, AckRequired}, _From,
State = #lim{volume = Volume}) ->
- case limit_reached(State) of
+ case prefetch_limit_reached(State) of
true -> {reply, false, limit_queue(QPid, State)};
false -> {reply, true, State#lim{volume = if AckRequired -> Volume + 1;
true -> Volume
@@ -305,13 +307,13 @@ code_change(_, State, _) ->
%%----------------------------------------------------------------------------
maybe_notify(OldState, NewState) ->
- case (limit_reached(OldState) orelse blocked(OldState)) andalso
- not (limit_reached(NewState) orelse blocked(NewState)) of
+ case (prefetch_limit_reached(OldState) orelse blocked(OldState)) andalso
+ not (prefetch_limit_reached(NewState) orelse blocked(NewState)) of
true -> notify_queues(NewState);
false -> NewState
end.
-limit_reached(#lim{prefetch_count = Limit, volume = Volume}) ->
+prefetch_limit_reached(#lim{prefetch_count = Limit, volume = Volume}) ->
Limit =/= 0 andalso Volume >= Limit.
blocked(#lim{blocked = Blocked}) -> Blocked.