diff options
| author | Ben Hood <0x6e6562@gmail.com> | 2008-11-21 17:34:01 +0000 |
|---|---|---|
| committer | Ben Hood <0x6e6562@gmail.com> | 2008-11-21 17:34:01 +0000 |
| commit | abbadaa58c9a9025a7881d482017bfb429c79d92 (patch) | |
| tree | 9ff2fcb4ab1940a3e892edadae0e0f45bf7f81a0 /src | |
| parent | eaac22a45dc5fa2c64ed54c80e3fc6e5eaade901 (diff) | |
| download | rabbitmq-server-git-abbadaa58c9a9025a7881d482017bfb429c79d92.tar.gz | |
Got rid o the per-queue in-use capacity
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_limiter.erl | 51 |
2 files changed, 22 insertions, 31 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 6ef5f97071..c0f48ad1f0 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -666,7 +666,7 @@ handle_cast({ack, Txn, MsgIds, ChPid}, State) -> not_found -> noreply(State); C = #cr{unacked_messages = UAM, limiter_pid = LimiterPid} -> - rabbit_limiter:decrement_capacity(LimiterPid, self()), + rabbit_limiter:decrement_capacity(LimiterPid), {Acked, Remaining} = collect_messages(MsgIds, UAM), persist_acks(Txn, qname(State), Acked), case Txn of diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index fbce5ea4bf..3f194b318f 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -10,11 +10,12 @@ -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2]). -export([start_link/1]). --export([set_prefetch_count/2, can_send/2, decrement_capacity/2]). +-export([set_prefetch_count/2, can_send/2, decrement_capacity/1]). -record(lim, {prefetch_count = 0, ch_pid, - in_use = dict:new()}). + queues = sets:new(), + in_use = 0}). %--------------------------------------------------------------------------- % API @@ -35,8 +36,8 @@ can_send(LimiterPid, QPid) -> % Lets the limiter know that a queue has received an ack from a consumer % and hence can reduce the in-use-by-that queue capcity information -decrement_capacity(LimiterPid, QPid) -> - gen_server:cast(LimiterPid, {decrement_capacity, QPid}). +decrement_capacity(LimiterPid) -> + gen_server:cast(LimiterPid, decrement_capacity). %--------------------------------------------------------------------------- % gen_server callbacks @@ -47,10 +48,13 @@ init([ChPid]) -> % This queuries the limiter to ask if it is possible to send a message without % breaching a limit for this queue process -handle_call({can_send, QPid}, _From, State) -> +handle_call({can_send, QPid}, _From, State = #lim{in_use = InUse, + queues = Queues}) -> case limit_reached(State) of true -> {reply, false, State}; - false -> {reply, true, update_in_use_capacity(QPid, State)} + false -> + NewQueues = sets:add_element(QPid, Queues), + {reply, true, State#lim{in_use = InUse + 1, queues = NewQueues}} end; % When the new limit is larger than the existing limit, @@ -69,8 +73,8 @@ handle_call({prefetch_count, PrefetchCount}, _From, State) -> % This is an asynchronous ack from a queue that it has received an ack from % a queue. This allows the limiter to update the the in-use-by-that queue % capacity infromation. -handle_cast({decrement_capacity, QPid}, State) -> - NewState = decrement_in_use(QPid, State), +handle_cast(decrement_capacity, State) -> + NewState = decrement_in_use(State), ShouldNotify = limit_reached(State) and not(limit_reached(NewState)), if ShouldNotify -> notify_queues(State); @@ -92,34 +96,21 @@ code_change(_, State, _) -> %--------------------------------------------------------------------------- % Reduces the in-use-count of the queue by one -decrement_in_use(QPid, State = #lim{in_use = InUse}) -> - NewInUse = dict:update_counter(QPid, -1, InUse), - Count = dict:fetch(QPid, NewInUse), - if - Count < 1 -> - State#lim{in_use = dict:erase(QPid, NewInUse)}; - true -> - State#lim{in_use = NewInUse} - end. +decrement_in_use(State = #lim{in_use = 0}) -> + State#lim{in_use = 0}; -% Unblocks every queue that this limiter knows about -notify_queues(#lim{ch_pid = ChPid, in_use = InUse}) -> - dict:map(fun(Q,_) -> rabbit_amqqueue:unblock(Q, ChPid) end, InUse). +decrement_in_use(State = #lim{in_use = InUse}) -> + State#lim{in_use = InUse - 1}. -% Computes the current aggregrate capacity of all of the in-use queues -current_capacity(#lim{in_use = InUse}) -> - % TODO It *seems* expensive to compute this on the fly - dict:fold(fun(_, PerQ, Acc) -> PerQ + Acc end, 0, InUse). +% Unblocks every queue that this limiter knows about +notify_queues(#lim{ch_pid = ChPid, queues = Queues}) -> + sets:fold(fun(Q,_) -> rabbit_amqqueue:unblock(Q, ChPid) end, [], Queues). % A prefetch limit of zero means unlimited limit_reached(#lim{prefetch_count = 0}) -> false; % Works out whether the limit is breached for the current limiter state -limit_reached(State = #lim{prefetch_count = Limit}) -> - current_capacity(State) == Limit. - -% Increments the counter for the in-use-capacity of a particular queue -update_in_use_capacity(QPid, State = #lim{in_use = InUse}) -> - State#lim{in_use = dict:update_counter(QPid, 1, InUse)}. +limit_reached(#lim{prefetch_count = Limit, in_use = InUse}) -> + InUse == Limit. |
