summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_amqqueue_process.erl4
-rw-r--r--src/rabbit_memory_monitor.erl51
2 files changed, 35 insertions, 20 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index b1982d30a5..945cd8bd5d 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -92,7 +92,7 @@
transactions,
memory
]).
-
+
%%----------------------------------------------------------------------------
start_link(Q) ->
@@ -121,8 +121,10 @@ init(Q = #amqqueue { name = QName }) ->
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
terminate(shutdown, #q{variable_queue_state = VQS}) ->
+ ok = rabbit_memory_monitor:deregister(self()),
_VQS = rabbit_variable_queue:terminate(VQS);
terminate(_Reason, State = #q{variable_queue_state = VQS}) ->
+ ok = rabbit_memory_monitor:deregister(self()),
%% FIXME: How do we cancel active subscriptions?
%% Ensure that any persisted tx messages are removed.
%% TODO: wait for all in flight tx_commits to complete
diff --git a/src/rabbit_memory_monitor.erl b/src/rabbit_memory_monitor.erl
index 7237b82559..3f2c02f4a4 100644
--- a/src/rabbit_memory_monitor.erl
+++ b/src/rabbit_memory_monitor.erl
@@ -39,7 +39,8 @@
-behaviour(gen_server2).
--export([start_link/0, update/0, register/2, report_queue_duration/2]).
+-export([start_link/0, update/0, register/2, deregister/1,
+ report_queue_duration/2, stop/0]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
@@ -49,7 +50,6 @@
queue_duration_sum, %% sum of all queue_durations
queue_duration_count, %% number of elements in sum
memory_limit, %% how much memory we intend to use
- memory_ratio, %% limit / used
desired_duration, %% the desired queue duration
callbacks %% a dict of qpid -> {M,F,A}s
}).
@@ -69,7 +69,9 @@
-spec(start_link/0 :: () -> 'ignore' | {'error', _} | {'ok', pid()}).
-spec(update/0 :: () -> 'ok').
-spec(register/2 :: (pid(), {atom(),atom(),[any()]}) -> 'ok').
+-spec(deregister/1 :: (pid()) -> 'ok').
-spec(report_queue_duration/2 :: (pid(), float() | 'infinity') -> number()).
+-spec(stop/0 :: () -> 'ok').
-endif.
@@ -86,10 +88,15 @@ update() ->
register(Pid, MFA = {_M, _F, _A}) ->
gen_server2:call(?SERVER, {register, Pid, MFA}, infinity).
+deregister(Pid) ->
+ gen_server2:cast(?SERVER, {deregister, Pid}).
+
report_queue_duration(Pid, QueueDuration) ->
gen_server2:call(rabbit_memory_monitor,
{report_queue_duration, Pid, QueueDuration}, infinity).
+stop() ->
+ gen_server2:cast(?SERVER, stop).
%%----------------------------------------------------------------------------
%% Gen_server callbacks
@@ -109,7 +116,6 @@ init([]) ->
queue_duration_sum = 0.0,
queue_duration_count = 0,
memory_limit = MemoryLimit,
- memory_ratio = 1.0,
desired_duration = infinity,
callbacks = dict:new()})}.
@@ -156,23 +162,17 @@ handle_call(_Request, _From, State) ->
handle_cast(update, State) ->
{noreply, internal_update(State)};
+handle_cast({deregister, Pid}, State) ->
+ {noreply, internal_deregister(Pid, State)};
+
+handle_cast(stop, State) ->
+ {stop, normal, State};
+
handle_cast(_Request, State) ->
{noreply, State}.
-handle_info({'DOWN', _MRef, process, Pid, _Reason},
- State = #state{queue_duration_sum = Sum,
- queue_duration_count = Count,
- queue_durations = Durations,
- callbacks = Callbacks}) ->
- [{_Pid, PrevQueueDuration, _PrevSendDuration}] = ets:lookup(Durations, Pid),
- Sum1 = case PrevQueueDuration of
- infinity -> Sum;
- _ -> Sum - PrevQueueDuration
- end,
- true = ets:delete(State#state.queue_durations, Pid),
- {noreply, State#state{queue_duration_sum = Sum1,
- queue_duration_count = Count-1,
- callbacks = dict:erase(Pid, Callbacks)}};
+handle_info({'DOWN', _MRef, process, Pid, _Reason}, State) ->
+ {noreply, internal_deregister(Pid, State)};
handle_info(_Info, State) ->
{noreply, State}.
@@ -189,6 +189,20 @@ code_change(_OldVsn, State, _Extra) ->
%% Internal functions
%%----------------------------------------------------------------------------
+internal_deregister(Pid, State = #state{queue_duration_sum = Sum,
+ queue_duration_count = Count,
+ queue_durations = Durations,
+ callbacks = Callbacks}) ->
+ [{_Pid, PrevQueueDuration, _PrevSendDuration}] = ets:lookup(Durations, Pid),
+ Sum1 = case PrevQueueDuration of
+ infinity -> Sum;
+ _ -> Sum - PrevQueueDuration
+ end,
+ true = ets:delete(State#state.queue_durations, Pid),
+ State#state{queue_duration_sum = Sum1,
+ queue_duration_count = Count-1,
+ callbacks = dict:erase(Pid, Callbacks)}.
+
internal_update(State = #state{memory_limit = Limit,
queue_durations = Durations,
desired_duration = DesiredDurationAvg,
@@ -213,8 +227,7 @@ internal_update(State = #state{memory_limit = Limit,
true -> infinity;
false -> lists:max([0, AvgDuration * MemoryRatio])
end,
- State1 = State#state{memory_ratio = MemoryRatio,
- desired_duration = DesiredDurationAvg1},
+ State1 = State#state{desired_duration = DesiredDurationAvg1},
%% only inform queues immediately if the desired duration has
%% decreased