summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_amqqueue_process.erl5
-rw-r--r--src/rabbit_channel.erl1
-rw-r--r--src/rabbit_limiter.erl11
3 files changed, 16 insertions, 1 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index b4d0d52d53..2000a11cbd 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -81,6 +81,9 @@ init(Q) ->
round_robin = queue:new()}, ?HIBERNATE_AFTER}.
terminate(_Reason, State) ->
+ %% Inform all limiters that we're dying
+ [ rabbit_limiter:unregister_queue(LimiterPid, self())
+ || #cr{limiter_pid = LimiterPid} <- all_ch_record()],
%% FIXME: How do we cancel active subscriptions?
QName = qname(State),
lists:foreach(fun (Txn) -> ok = rollback_work(Txn, QName) end,
@@ -665,7 +668,7 @@ handle_cast({ack, Txn, MsgIds, ChPid}, State) ->
case lookup_ch(ChPid) of
not_found ->
noreply(State);
- C = #cr{unacked_messages = UAM, limiter_pid = LimiterPid} ->
+ C = #cr{unacked_messages = UAM} ->
{Acked, Remaining} = collect_messages(MsgIds, UAM),
persist_acks(Txn, qname(State), Acked),
case Txn of
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index f9f929598f..240ee3d3b2 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -293,6 +293,7 @@ handle_method(#'basic.ack'{delivery_tag = DeliveryTag,
_ -> true
end
end, Acked),
+ % TODO Optimization: Probably don't need to send this if len = 0
rabbit_limiter:decrement_capacity(Limiter, queue:len(NotBasicGet)),
Participants = ack(State#ch.proxy_pid, TxnKey, Acked),
{noreply, case TxnKey of
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
index 4e130ea0c2..adc2c721cf 100644
--- a/src/rabbit_limiter.erl
+++ b/src/rabbit_limiter.erl
@@ -11,6 +11,7 @@
handle_info/2]).
-export([start_link/1]).
-export([set_prefetch_count/2, can_send/2, decrement_capacity/2]).
+-export([unregister_queue/2]).
-record(lim, {prefetch_count = 0,
ch_pid,
@@ -38,6 +39,11 @@ can_send(LimiterPid, QPid) ->
% and hence can reduce the in-use-by-that queue capcity information
decrement_capacity(LimiterPid, Magnitude) ->
gen_server:cast(LimiterPid, {decrement_capacity, Magnitude}).
+
+% This is called to tell the limiter that the queue is probably dead and
+% it should be forgotten about
+unregister_queue(LimiterPid, QPid) ->
+ gen_server:cast(LimiterPid, {unregister_queue, QPid}).
%---------------------------------------------------------------------------
% gen_server callbacks
@@ -68,6 +74,11 @@ handle_cast({prefetch_count, PrefetchCount},
queues = sets:new(),
in_use = 0}};
+% Removes the queue process from the set of monitored queues
+handle_cast({unregister_queue, QPid}, State= #lim{queues = Queues}) ->
+ NewState = decrement_in_use(1, State),
+ {noreply, NewState#lim{queues = sets:del_element(QPid, Queues)}};
+
% Default setter of the prefetch count
handle_cast({prefetch_count, PrefetchCount}, State) ->
{noreply, State#lim{prefetch_count = PrefetchCount}};