summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBen Hood <0x6e6562@gmail.com>2008-11-20 17:15:52 +0000
committerBen Hood <0x6e6562@gmail.com>2008-11-20 17:15:52 +0000
commit86db69511aa3026a1f6391f98a7a11da6604cbde (patch)
treedeb7be90fcb1e35de42d01a4ed0c9952f8834db0
parent657c359e24ec8c300e2fd89a4bfc8201e6b4c3b4 (diff)
downloadrabbitmq-server-git-86db69511aa3026a1f6391f98a7a11da6604cbde.tar.gz
Refactored the internal structure of the limiter
-rw-r--r--src/rabbit_limiter.erl112
1 files changed, 34 insertions, 78 deletions
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
index 1973d3588b..adf4cd4beb 100644
--- a/src/rabbit_limiter.erl
+++ b/src/rabbit_limiter.erl
@@ -1,3 +1,5 @@
+%% TODO Decide what to do with the license statement now that Cohesive have
+%% bailed.
-module(rabbit_limiter).
@@ -10,9 +12,8 @@
-export([start_link/1]).
-export([can_send/2, decrement_capacity/2]).
--record(lim, {prefetch_count = 1,
+-record(lim, {prefetch_count = 0,
ch_pid,
- blocked = false,
in_use = dict:new()}).
%---------------------------------------------------------------------------
@@ -44,22 +45,22 @@ init([ChPid]) ->
% This queuries the limiter to ask if it is possible to send a message without
% breaching a limit for this queue process
handle_call({can_send, QPid}, _From, State) ->
- {CanSend, NewState} = maybe_can_send(QPid, State),
- {reply, CanSend, NewState}.
+ case limit_reached(State) of
+ true -> {reply, false, State};
+ false -> {reply, true, update_in_use_capacity(QPid, State)}
+ end.
% This is an asynchronous ack from a queue that it has received an ack from
% a queue. This allows the limiter to update the the in-use-by-that queue
% capacity infromation.
handle_cast({decrement_capacity, QPid}, State) ->
- State1 = decrement_in_use(QPid, State),
- State2 = maybe_notify_queues(State1),
- {noreply, State2}.
-
-% When the prefetch count has not been set,
-% e.g. when the channel has not yet been issued a basic.qos
-handle_info({prefetch_count, PrefetchCount},
- State = #lim{prefetch_count = 0}) ->
- {noreply, State#lim{prefetch_count = PrefetchCount}};
+ NewState = decrement_in_use(QPid, State),
+ ShouldNotify = limit_reached(State) and not(limit_reached(State)),
+ if
+ ShouldNotify -> notify_queues(NewState);
+ true -> ok
+ end,
+ {noreply, NewState}.
% When the new limit is larger than the existing limit,
% notify all queues and forget about queues with an in-use
@@ -86,78 +87,33 @@ code_change(_, State, _) ->
% Reduces the in-use-count of the queue by one
decrement_in_use(QPid, State = #lim{in_use = InUse}) ->
- case dict:find(QPid, InUse) of
- {ok, Capacity} ->
- if
- % Is there a lower bound on capacity?
- % i.e. what is the zero mark, how much is unlimited?
- Capacity > 0 ->
- NewInUse = dict:store(QPid, Capacity - 1, InUse),
- State#lim{in_use = NewInUse};
- true ->
- % TODO How should this be handled?
- rabbit_log:warning(
- "Ignoring decrement for zero capacity: ~p~n",
- [QPid]),
- State
- end;
- error ->
- % TODO How should this case be handled?
- rabbit_log:warning("Ignoring decrement for unknown queue: ~p~n",
- [QPid]),
- State
+ NewInUse = dict:update_counter(QPid, -1, InUse),
+ Count = dict:fetch(QPid, NewInUse),
+ if
+ Count < 1 ->
+ State#lim{in_use = dict:erase(QPid, NewInUse)};
+ true ->
+ State#lim{in_use = NewInUse}
end.
-% Works out whether any queues should be notified
-% If any notification is required, it propagates a transition
-% of the blocked state
-maybe_notify_queues(State = #lim{ch_pid = ChPid, in_use = InUse}) ->
- Capacity = current_capacity(State),
- case should_notify(Capacity, State) of
- true ->
- dict:map(fun(Q,_) ->
- rabbit_amqqueue:unblock(Q, ChPid)
- end, InUse),
- State#lim{blocked = false};
- false ->
- State
- end.
+% Unblocks every queue that this limiter knows about
+notify_queues(#lim{ch_pid = ChPid, in_use = InUse}) ->
+ dict:map(fun(Q,_) -> rabbit_amqqueue:unblock(Q, ChPid) end, InUse).
% Computes the current aggregrate capacity of all of the in-use queues
current_capacity(#lim{in_use = InUse}) ->
- % TODO This *seems* expensive to compute this on the fly
+ % TODO It *seems* expensive to compute this on the fly
dict:fold(fun(_, PerQ, Acc) -> PerQ + Acc end, 0, InUse).
+% A prefetch limit of zero means unlimited
+limit_reached(#lim{prefetch_count = 0}) ->
+ false;
-% This is a very naive way of deciding wether to unblock or not,
-% it *might* be better to wait for a time or volume threshold
-% instead of broadcasting notifications
-should_notify(Capacity, #lim{prefetch_count = Limit, blocked = true})
- when Capacity < Limit ->
- true;
+% Works out whether the limit is breached for the current limiter state
+limit_reached(State = #lim{prefetch_count = Limit}) ->
+ current_capacity(State) == Limit.
-should_notify(_,_) -> false.
-
-maybe_can_send(_, State = #lim{blocked = true}) ->
- {false, State};
-
-maybe_can_send(QPid, State = #lim{prefetch_count = Limit,
- in_use = InUse,
- blocked = false}) ->
- Capacity = current_capacity(State),
- if
- Capacity < Limit ->
- NewInUse = update_in_use_capacity(QPid, InUse),
- { true, State#lim{in_use = NewInUse} };
- true ->
- { false, State#lim{blocked = true}}
- end.
-
-update_in_use_capacity(QPid, InUse) ->
- case dict:find(QPid, InUse) of
- {ok, Capacity} ->
- dict:store(QPid, Capacity + 1, InUse);
- error ->
- dict:store(QPid, 0, InUse)
- end.
+% Increments the counter for the in-use-capacity of a particular queue
+update_in_use_capacity(QPid, State = #lim{in_use = InUse}) ->
+ State#lim{in_use = dict:update_counter(QPid, 1, InUse)}.