summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue_process.erl2
-rw-r--r--src/rabbit_limiter.erl51
2 files changed, 22 insertions, 31 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 6ef5f97071..c0f48ad1f0 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -666,7 +666,7 @@ handle_cast({ack, Txn, MsgIds, ChPid}, State) ->
not_found ->
noreply(State);
C = #cr{unacked_messages = UAM, limiter_pid = LimiterPid} ->
- rabbit_limiter:decrement_capacity(LimiterPid, self()),
+ rabbit_limiter:decrement_capacity(LimiterPid),
{Acked, Remaining} = collect_messages(MsgIds, UAM),
persist_acks(Txn, qname(State), Acked),
case Txn of
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
index fbce5ea4bf..3f194b318f 100644
--- a/src/rabbit_limiter.erl
+++ b/src/rabbit_limiter.erl
@@ -10,11 +10,12 @@
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
handle_info/2]).
-export([start_link/1]).
--export([set_prefetch_count/2, can_send/2, decrement_capacity/2]).
+-export([set_prefetch_count/2, can_send/2, decrement_capacity/1]).
-record(lim, {prefetch_count = 0,
ch_pid,
- in_use = dict:new()}).
+ queues = sets:new(),
+ in_use = 0}).
%---------------------------------------------------------------------------
% API
@@ -35,8 +36,8 @@ can_send(LimiterPid, QPid) ->
% Lets the limiter know that a queue has received an ack from a consumer
% and hence can reduce the in-use-by-that queue capcity information
-decrement_capacity(LimiterPid, QPid) ->
- gen_server:cast(LimiterPid, {decrement_capacity, QPid}).
+decrement_capacity(LimiterPid) ->
+ gen_server:cast(LimiterPid, decrement_capacity).
%---------------------------------------------------------------------------
% gen_server callbacks
@@ -47,10 +48,13 @@ init([ChPid]) ->
% This queuries the limiter to ask if it is possible to send a message without
% breaching a limit for this queue process
-handle_call({can_send, QPid}, _From, State) ->
+handle_call({can_send, QPid}, _From, State = #lim{in_use = InUse,
+ queues = Queues}) ->
case limit_reached(State) of
true -> {reply, false, State};
- false -> {reply, true, update_in_use_capacity(QPid, State)}
+ false ->
+ NewQueues = sets:add_element(QPid, Queues),
+ {reply, true, State#lim{in_use = InUse + 1, queues = NewQueues}}
end;
% When the new limit is larger than the existing limit,
@@ -69,8 +73,8 @@ handle_call({prefetch_count, PrefetchCount}, _From, State) ->
% This is an asynchronous ack from a queue that it has received an ack from
% a queue. This allows the limiter to update the the in-use-by-that queue
% capacity infromation.
-handle_cast({decrement_capacity, QPid}, State) ->
- NewState = decrement_in_use(QPid, State),
+handle_cast(decrement_capacity, State) ->
+ NewState = decrement_in_use(State),
ShouldNotify = limit_reached(State) and not(limit_reached(NewState)),
if
ShouldNotify -> notify_queues(State);
@@ -92,34 +96,21 @@ code_change(_, State, _) ->
%---------------------------------------------------------------------------
% Reduces the in-use-count of the queue by one
-decrement_in_use(QPid, State = #lim{in_use = InUse}) ->
- NewInUse = dict:update_counter(QPid, -1, InUse),
- Count = dict:fetch(QPid, NewInUse),
- if
- Count < 1 ->
- State#lim{in_use = dict:erase(QPid, NewInUse)};
- true ->
- State#lim{in_use = NewInUse}
- end.
+decrement_in_use(State = #lim{in_use = 0}) ->
+ State#lim{in_use = 0};
-% Unblocks every queue that this limiter knows about
-notify_queues(#lim{ch_pid = ChPid, in_use = InUse}) ->
- dict:map(fun(Q,_) -> rabbit_amqqueue:unblock(Q, ChPid) end, InUse).
+decrement_in_use(State = #lim{in_use = InUse}) ->
+ State#lim{in_use = InUse - 1}.
-% Computes the current aggregrate capacity of all of the in-use queues
-current_capacity(#lim{in_use = InUse}) ->
- % TODO It *seems* expensive to compute this on the fly
- dict:fold(fun(_, PerQ, Acc) -> PerQ + Acc end, 0, InUse).
+% Unblocks every queue that this limiter knows about
+notify_queues(#lim{ch_pid = ChPid, queues = Queues}) ->
+ sets:fold(fun(Q,_) -> rabbit_amqqueue:unblock(Q, ChPid) end, [], Queues).
% A prefetch limit of zero means unlimited
limit_reached(#lim{prefetch_count = 0}) ->
false;
% Works out whether the limit is breached for the current limiter state
-limit_reached(State = #lim{prefetch_count = Limit}) ->
- current_capacity(State) == Limit.
-
-% Increments the counter for the in-use-capacity of a particular queue
-update_in_use_capacity(QPid, State = #lim{in_use = InUse}) ->
- State#lim{in_use = dict:update_counter(QPid, 1, InUse)}.
+limit_reached(#lim{prefetch_count = Limit, in_use = InUse}) ->
+ InUse == Limit.