diff options
| author | Ben Hood <0x6e6562@gmail.com> | 2008-11-20 17:15:52 +0000 |
|---|---|---|
| committer | Ben Hood <0x6e6562@gmail.com> | 2008-11-20 17:15:52 +0000 |
| commit | 86db69511aa3026a1f6391f98a7a11da6604cbde (patch) | |
| tree | deb7be90fcb1e35de42d01a4ed0c9952f8834db0 | |
| parent | 657c359e24ec8c300e2fd89a4bfc8201e6b4c3b4 (diff) | |
| download | rabbitmq-server-git-86db69511aa3026a1f6391f98a7a11da6604cbde.tar.gz | |
Refactored the internal structure of the limiter
| -rw-r--r-- | src/rabbit_limiter.erl | 112 |
1 files changed, 34 insertions, 78 deletions
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index 1973d3588b..adf4cd4beb 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -1,3 +1,5 @@ +%% TODO Decide what to do with the license statement now that Cohesive have +%% bailed. -module(rabbit_limiter). @@ -10,9 +12,8 @@ -export([start_link/1]). -export([can_send/2, decrement_capacity/2]). --record(lim, {prefetch_count = 1, +-record(lim, {prefetch_count = 0, ch_pid, - blocked = false, in_use = dict:new()}). %--------------------------------------------------------------------------- @@ -44,22 +45,22 @@ init([ChPid]) -> % This queuries the limiter to ask if it is possible to send a message without % breaching a limit for this queue process handle_call({can_send, QPid}, _From, State) -> - {CanSend, NewState} = maybe_can_send(QPid, State), - {reply, CanSend, NewState}. + case limit_reached(State) of + true -> {reply, false, State}; + false -> {reply, true, update_in_use_capacity(QPid, State)} + end. % This is an asynchronous ack from a queue that it has received an ack from % a queue. This allows the limiter to update the the in-use-by-that queue % capacity infromation. handle_cast({decrement_capacity, QPid}, State) -> - State1 = decrement_in_use(QPid, State), - State2 = maybe_notify_queues(State1), - {noreply, State2}. - -% When the prefetch count has not been set, -% e.g. when the channel has not yet been issued a basic.qos -handle_info({prefetch_count, PrefetchCount}, - State = #lim{prefetch_count = 0}) -> - {noreply, State#lim{prefetch_count = PrefetchCount}}; + NewState = decrement_in_use(QPid, State), + ShouldNotify = limit_reached(State) and not(limit_reached(State)), + if + ShouldNotify -> notify_queues(NewState); + true -> ok + end, + {noreply, NewState}. % When the new limit is larger than the existing limit, % notify all queues and forget about queues with an in-use @@ -86,78 +87,33 @@ code_change(_, State, _) -> % Reduces the in-use-count of the queue by one decrement_in_use(QPid, State = #lim{in_use = InUse}) -> - case dict:find(QPid, InUse) of - {ok, Capacity} -> - if - % Is there a lower bound on capacity? - % i.e. what is the zero mark, how much is unlimited? - Capacity > 0 -> - NewInUse = dict:store(QPid, Capacity - 1, InUse), - State#lim{in_use = NewInUse}; - true -> - % TODO How should this be handled? - rabbit_log:warning( - "Ignoring decrement for zero capacity: ~p~n", - [QPid]), - State - end; - error -> - % TODO How should this case be handled? - rabbit_log:warning("Ignoring decrement for unknown queue: ~p~n", - [QPid]), - State + NewInUse = dict:update_counter(QPid, -1, InUse), + Count = dict:fetch(QPid, NewInUse), + if + Count < 1 -> + State#lim{in_use = dict:erase(QPid, NewInUse)}; + true -> + State#lim{in_use = NewInUse} end. -% Works out whether any queues should be notified -% If any notification is required, it propagates a transition -% of the blocked state -maybe_notify_queues(State = #lim{ch_pid = ChPid, in_use = InUse}) -> - Capacity = current_capacity(State), - case should_notify(Capacity, State) of - true -> - dict:map(fun(Q,_) -> - rabbit_amqqueue:unblock(Q, ChPid) - end, InUse), - State#lim{blocked = false}; - false -> - State - end. +% Unblocks every queue that this limiter knows about +notify_queues(#lim{ch_pid = ChPid, in_use = InUse}) -> + dict:map(fun(Q,_) -> rabbit_amqqueue:unblock(Q, ChPid) end, InUse). % Computes the current aggregrate capacity of all of the in-use queues current_capacity(#lim{in_use = InUse}) -> - % TODO This *seems* expensive to compute this on the fly + % TODO It *seems* expensive to compute this on the fly dict:fold(fun(_, PerQ, Acc) -> PerQ + Acc end, 0, InUse). +% A prefetch limit of zero means unlimited +limit_reached(#lim{prefetch_count = 0}) -> + false; -% This is a very naive way of deciding wether to unblock or not, -% it *might* be better to wait for a time or volume threshold -% instead of broadcasting notifications -should_notify(Capacity, #lim{prefetch_count = Limit, blocked = true}) - when Capacity < Limit -> - true; +% Works out whether the limit is breached for the current limiter state +limit_reached(State = #lim{prefetch_count = Limit}) -> + current_capacity(State) == Limit. -should_notify(_,_) -> false. - -maybe_can_send(_, State = #lim{blocked = true}) -> - {false, State}; - -maybe_can_send(QPid, State = #lim{prefetch_count = Limit, - in_use = InUse, - blocked = false}) -> - Capacity = current_capacity(State), - if - Capacity < Limit -> - NewInUse = update_in_use_capacity(QPid, InUse), - { true, State#lim{in_use = NewInUse} }; - true -> - { false, State#lim{blocked = true}} - end. - -update_in_use_capacity(QPid, InUse) -> - case dict:find(QPid, InUse) of - {ok, Capacity} -> - dict:store(QPid, Capacity + 1, InUse); - error -> - dict:store(QPid, 0, InUse) - end. +% Increments the counter for the in-use-capacity of a particular queue +update_in_use_capacity(QPid, State = #lim{in_use = InUse}) -> + State#lim{in_use = dict:update_counter(QPid, 1, InUse)}. |
