diff options
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 5 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 1 | ||||
| -rw-r--r-- | src/rabbit_limiter.erl | 11 |
3 files changed, 16 insertions, 1 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index b4d0d52d53..2000a11cbd 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -81,6 +81,9 @@ init(Q) -> round_robin = queue:new()}, ?HIBERNATE_AFTER}. terminate(_Reason, State) -> + %% Inform all limiters that we're dying + [ rabbit_limiter:unregister_queue(LimiterPid, self()) + || #cr{limiter_pid = LimiterPid} <- all_ch_record()], %% FIXME: How do we cancel active subscriptions? QName = qname(State), lists:foreach(fun (Txn) -> ok = rollback_work(Txn, QName) end, @@ -665,7 +668,7 @@ handle_cast({ack, Txn, MsgIds, ChPid}, State) -> case lookup_ch(ChPid) of not_found -> noreply(State); - C = #cr{unacked_messages = UAM, limiter_pid = LimiterPid} -> + C = #cr{unacked_messages = UAM} -> {Acked, Remaining} = collect_messages(MsgIds, UAM), persist_acks(Txn, qname(State), Acked), case Txn of diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index f9f929598f..240ee3d3b2 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -293,6 +293,7 @@ handle_method(#'basic.ack'{delivery_tag = DeliveryTag, _ -> true end end, Acked), + % TODO Optimization: Probably don't need to send this if len = 0 rabbit_limiter:decrement_capacity(Limiter, queue:len(NotBasicGet)), Participants = ack(State#ch.proxy_pid, TxnKey, Acked), {noreply, case TxnKey of diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index 4e130ea0c2..adc2c721cf 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -11,6 +11,7 @@ handle_info/2]). -export([start_link/1]). -export([set_prefetch_count/2, can_send/2, decrement_capacity/2]). +-export([unregister_queue/2]). -record(lim, {prefetch_count = 0, ch_pid, @@ -38,6 +39,11 @@ can_send(LimiterPid, QPid) -> % and hence can reduce the in-use-by-that queue capcity information decrement_capacity(LimiterPid, Magnitude) -> gen_server:cast(LimiterPid, {decrement_capacity, Magnitude}). + +% This is called to tell the limiter that the queue is probably dead and +% it should be forgotten about +unregister_queue(LimiterPid, QPid) -> + gen_server:cast(LimiterPid, {unregister_queue, QPid}). %--------------------------------------------------------------------------- % gen_server callbacks @@ -68,6 +74,11 @@ handle_cast({prefetch_count, PrefetchCount}, queues = sets:new(), in_use = 0}}; +% Removes the queue process from the set of monitored queues +handle_cast({unregister_queue, QPid}, State= #lim{queues = Queues}) -> + NewState = decrement_in_use(1, State), + {noreply, NewState#lim{queues = sets:del_element(QPid, Queues)}}; + % Default setter of the prefetch count handle_cast({prefetch_count, PrefetchCount}, State) -> {noreply, State#lim{prefetch_count = PrefetchCount}}; |
