diff options
| -rw-r--r-- | src/rabbit_limiter.erl | 23 |
1 files changed, 6 insertions, 17 deletions
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index 0d938580f9..9f23724e52 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -95,20 +95,13 @@ handle_call({can_send, QPid}, _From, State = #lim{in_use = InUse}) -> end. % When the new limit is larger than the existing limit, -% notify all queues and forget about queues with an in-use -% capcity of zero +% notify all queues and forget about all queues handle_cast({prefetch_count, PrefetchCount}, State = #lim{prefetch_count = CurrentLimit}) when PrefetchCount > CurrentLimit -> notify_queues(State), NewState = demonitor_all(State), - {noreply, NewState#lim{prefetch_count = PrefetchCount, - in_use = 0}}; - -% Removes the queue process from the set of monitored queues -handle_cast({unregister_queue, QPid}, State = #lim{}) -> - NewState = decrement_in_use(1, State), - {noreply, demonitor_queue(QPid, NewState)}; + {noreply, NewState#lim{prefetch_count = PrefetchCount, in_use = 0}}; % Default setter of the prefetch count handle_cast({prefetch_count, PrefetchCount}, State) -> @@ -129,8 +122,10 @@ handle_cast({decrement_capacity, Magnitude}, State = #lim{in_use = InUse}) -> {noreply, NewState} end. -handle_info(_, State) -> - {noreply, State}. +%% This is received when a queue dies +handle_info({'DOWN', _MonitorRef, _Type, QPid, _Info}, + State = #lim{queues = Queues}) -> + {noreply, State#lim{queues = dict:erase(QPid, Queues)}}. terminate(_, _) -> ok. @@ -150,12 +145,6 @@ monitor_queue(QPid, State = #lim{queues = Queues}) -> true -> State end. -% Stops monitoring a particular queue -demonitor_queue(QPid, State = #lim{queues = Queues}) -> - MonitorRef = dict:fetch(QPid, Queues), - true = erlang:demonitor(MonitorRef), - State#lim{queues = dict:erase(QPid, Queues)}. - % Stops monitoring all queues demonitor_all(State = #lim{queues = Queues}) -> dict:map(fun(_, Ref) -> true = erlang:demonitor(Ref) end, Queues), |
