diff options
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_memory_monitor.erl | 51 |
2 files changed, 35 insertions, 20 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index b1982d30a5..945cd8bd5d 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -92,7 +92,7 @@ transactions, memory ]). - + %%---------------------------------------------------------------------------- start_link(Q) -> @@ -121,8 +121,10 @@ init(Q = #amqqueue { name = QName }) -> {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. terminate(shutdown, #q{variable_queue_state = VQS}) -> + ok = rabbit_memory_monitor:deregister(self()), _VQS = rabbit_variable_queue:terminate(VQS); terminate(_Reason, State = #q{variable_queue_state = VQS}) -> + ok = rabbit_memory_monitor:deregister(self()), %% FIXME: How do we cancel active subscriptions? %% Ensure that any persisted tx messages are removed. %% TODO: wait for all in flight tx_commits to complete diff --git a/src/rabbit_memory_monitor.erl b/src/rabbit_memory_monitor.erl index 7237b82559..3f2c02f4a4 100644 --- a/src/rabbit_memory_monitor.erl +++ b/src/rabbit_memory_monitor.erl @@ -39,7 +39,8 @@ -behaviour(gen_server2). --export([start_link/0, update/0, register/2, report_queue_duration/2]). +-export([start_link/0, update/0, register/2, deregister/1, + report_queue_duration/2, stop/0]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -49,7 +50,6 @@ queue_duration_sum, %% sum of all queue_durations queue_duration_count, %% number of elements in sum memory_limit, %% how much memory we intend to use - memory_ratio, %% limit / used desired_duration, %% the desired queue duration callbacks %% a dict of qpid -> {M,F,A}s }). @@ -69,7 +69,9 @@ -spec(start_link/0 :: () -> 'ignore' | {'error', _} | {'ok', pid()}). -spec(update/0 :: () -> 'ok'). -spec(register/2 :: (pid(), {atom(),atom(),[any()]}) -> 'ok'). +-spec(deregister/1 :: (pid()) -> 'ok'). -spec(report_queue_duration/2 :: (pid(), float() | 'infinity') -> number()). +-spec(stop/0 :: () -> 'ok'). -endif. @@ -86,10 +88,15 @@ update() -> register(Pid, MFA = {_M, _F, _A}) -> gen_server2:call(?SERVER, {register, Pid, MFA}, infinity). +deregister(Pid) -> + gen_server2:cast(?SERVER, {deregister, Pid}). + report_queue_duration(Pid, QueueDuration) -> gen_server2:call(rabbit_memory_monitor, {report_queue_duration, Pid, QueueDuration}, infinity). +stop() -> + gen_server2:cast(?SERVER, stop). %%---------------------------------------------------------------------------- %% Gen_server callbacks @@ -109,7 +116,6 @@ init([]) -> queue_duration_sum = 0.0, queue_duration_count = 0, memory_limit = MemoryLimit, - memory_ratio = 1.0, desired_duration = infinity, callbacks = dict:new()})}. @@ -156,23 +162,17 @@ handle_call(_Request, _From, State) -> handle_cast(update, State) -> {noreply, internal_update(State)}; +handle_cast({deregister, Pid}, State) -> + {noreply, internal_deregister(Pid, State)}; + +handle_cast(stop, State) -> + {stop, normal, State}; + handle_cast(_Request, State) -> {noreply, State}. -handle_info({'DOWN', _MRef, process, Pid, _Reason}, - State = #state{queue_duration_sum = Sum, - queue_duration_count = Count, - queue_durations = Durations, - callbacks = Callbacks}) -> - [{_Pid, PrevQueueDuration, _PrevSendDuration}] = ets:lookup(Durations, Pid), - Sum1 = case PrevQueueDuration of - infinity -> Sum; - _ -> Sum - PrevQueueDuration - end, - true = ets:delete(State#state.queue_durations, Pid), - {noreply, State#state{queue_duration_sum = Sum1, - queue_duration_count = Count-1, - callbacks = dict:erase(Pid, Callbacks)}}; +handle_info({'DOWN', _MRef, process, Pid, _Reason}, State) -> + {noreply, internal_deregister(Pid, State)}; handle_info(_Info, State) -> {noreply, State}. @@ -189,6 +189,20 @@ code_change(_OldVsn, State, _Extra) -> %% Internal functions %%---------------------------------------------------------------------------- +internal_deregister(Pid, State = #state{queue_duration_sum = Sum, + queue_duration_count = Count, + queue_durations = Durations, + callbacks = Callbacks}) -> + [{_Pid, PrevQueueDuration, _PrevSendDuration}] = ets:lookup(Durations, Pid), + Sum1 = case PrevQueueDuration of + infinity -> Sum; + _ -> Sum - PrevQueueDuration + end, + true = ets:delete(State#state.queue_durations, Pid), + State#state{queue_duration_sum = Sum1, + queue_duration_count = Count-1, + callbacks = dict:erase(Pid, Callbacks)}. + internal_update(State = #state{memory_limit = Limit, queue_durations = Durations, desired_duration = DesiredDurationAvg, @@ -213,8 +227,7 @@ internal_update(State = #state{memory_limit = Limit, true -> infinity; false -> lists:max([0, AvgDuration * MemoryRatio]) end, - State1 = State#state{memory_ratio = MemoryRatio, - desired_duration = DesiredDurationAvg1}, + State1 = State#state{desired_duration = DesiredDurationAvg1}, %% only inform queues immediately if the desired duration has %% decreased |
