diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_memory_monitor.erl | 83 |
1 file changed, 44 insertions, 39 deletions
diff --git a/src/rabbit_memory_monitor.erl b/src/rabbit_memory_monitor.erl index 3f2c02f4a4..6359ecc99d 100644 --- a/src/rabbit_memory_monitor.erl +++ b/src/rabbit_memory_monitor.erl @@ -45,13 +45,14 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). +-record(process, {pid, reported, sent, callback}). + -record(state, {timer, %% 'internal_update' timer - queue_durations, %% ets, (qpid, last_reported, last_sent) + queue_durations, %% ets #process queue_duration_sum, %% sum of all queue_durations queue_duration_count, %% number of elements in sum memory_limit, %% how much memory we intend to use - desired_duration, %% the desired queue duration - callbacks %% a dict of qpid -> {M,F,A}s + desired_duration %% the desired queue duration }). -define(SERVER, ?MODULE). @@ -110,14 +111,16 @@ init([]) -> {ok, TRef} = timer:apply_interval(?DEFAULT_UPDATE_INTERVAL, ?SERVER, update, []), + + Ets = ets:new(?TABLE_NAME, [set, private, {keypos, #process.pid}]), + {ok, internal_update( #state{timer = TRef, - queue_durations = ets:new(?TABLE_NAME, [set, private]), + queue_durations = Ets, queue_duration_sum = 0.0, queue_duration_count = 0, memory_limit = MemoryLimit, - desired_duration = infinity, - callbacks = dict:new()})}. + desired_duration = infinity})}. 
handle_call({report_queue_duration, Pid, QueueDuration}, From, State = #state{queue_duration_sum = Sum, @@ -130,7 +133,8 @@ handle_call({report_queue_duration, Pid, QueueDuration}, From, true -> infinity; false -> QueueDuration end, - [{_Pid, PrevQueueDuration, PrevSendDuration}] = ets:lookup(Durations, Pid), + [Proc = #process{reported = PrevQueueDuration, sent = PrevSendDuration}] = + ets:lookup(Durations, Pid), SendDuration1 = case QueueDuration1 < 1 andalso PrevSendDuration == infinity of @@ -146,15 +150,17 @@ handle_call({report_queue_duration, Pid, QueueDuration}, From, {_, infinity} -> {Sum - PrevQueueDuration, Count - 1}; {_, _} -> {Sum - PrevQueueDuration + QueueDuration1, Count} end, - true = ets:insert(Durations, {Pid, QueueDuration1, SendDuration1}), - {noreply, State#state{queue_duration_sum = Sum1, + true = ets:insert(Durations, Proc#process{reported = QueueDuration1, + sent = SendDuration1}), + {noreply, State#state{queue_duration_sum = lists:max([0, Sum1]), queue_duration_count = Count1}}; -handle_call({register, Pid, MFA}, _From, State = - #state{queue_durations = Durations, callbacks = Callbacks}) -> +handle_call({register, Pid, MFA}, _From, + State = #state{queue_durations = Durations}) -> _MRef = erlang:monitor(process, Pid), - true = ets:insert(Durations, {Pid, infinity, infinity}), - {reply, ok, State#state{callbacks = dict:store(Pid, MFA, Callbacks)}}; + true = ets:insert(Durations, #process{pid = Pid, reported = infinity, + sent = infinity, callback = MFA}), + {reply, ok, State}; handle_call(_Request, _From, State) -> {noreply, State}. 
@@ -191,24 +197,24 @@ code_change(_OldVsn, State, _Extra) -> internal_deregister(Pid, State = #state{queue_duration_sum = Sum, queue_duration_count = Count, - queue_durations = Durations, - callbacks = Callbacks}) -> - [{_Pid, PrevQueueDuration, _PrevSendDuration}] = ets:lookup(Durations, Pid), - Sum1 = case PrevQueueDuration of - infinity -> Sum; - _ -> Sum - PrevQueueDuration - end, - true = ets:delete(State#state.queue_durations, Pid), - State#state{queue_duration_sum = Sum1, - queue_duration_count = Count-1, - callbacks = dict:erase(Pid, Callbacks)}. + queue_durations = Durations}) -> + case ets:lookup(Durations, Pid) of + [] -> State; + [#process{reported = PrevQueueDuration}] -> + Sum1 = case PrevQueueDuration of + infinity -> Sum; + _ -> lists:max([0, Sum - PrevQueueDuration]) + end, + true = ets:delete(State#state.queue_durations, Pid), + State#state{queue_duration_sum = Sum1, + queue_duration_count = Count-1} + end. internal_update(State = #state{memory_limit = Limit, queue_durations = Durations, desired_duration = DesiredDurationAvg, queue_duration_sum = Sum, - queue_duration_count = Count, - callbacks = Callbacks}) -> + queue_duration_count = Count}) -> %% available memory / used memory MemoryRatio = Limit / erlang:memory(total), %% if all queues are pushed to disk, then Sum will be 0. 
If memory @@ -225,30 +231,30 @@ internal_update(State = #state{memory_limit = Limit, DesiredDurationAvg1 = case AvgDuration == infinity orelse MemoryRatio > 2 of true -> infinity; - false -> lists:max([0, AvgDuration * MemoryRatio]) + false -> AvgDuration * MemoryRatio end, State1 = State#state{desired_duration = DesiredDurationAvg1}, %% only inform queues immediately if the desired duration has %% decreased - case (DesiredDurationAvg == infinity andalso DesiredDurationAvg /= infinity) - orelse (DesiredDurationAvg1 /= infinity andalso - DesiredDurationAvg1 < DesiredDurationAvg) of + case (DesiredDurationAvg == infinity andalso DesiredDurationAvg1 /= infinity) + orelse (DesiredDurationAvg /= infinity andalso + DesiredDurationAvg > DesiredDurationAvg1) of true -> %% If we have pessimistic information, we need to inform %% queues to reduce it's memory usage when needed. This %% sometimes wakes up queues from hibernation. true = ets:foldl( - fun ({Pid, QueueDuration, PrevSendDuration}, true) -> + fun (Proc = #process{reported = QueueDuration, + sent = PrevSendDuration}, true) -> case DesiredDurationAvg1 < lists:min([PrevSendDuration, QueueDuration]) of true -> - ok = - set_queue_duration( - Pid, DesiredDurationAvg1, Callbacks), - ets:insert(Durations, - {Pid, QueueDuration, - DesiredDurationAvg1}); + ok = set_queue_duration( + Proc, DesiredDurationAvg1), + ets:insert( + Durations, + Proc#process{sent=DesiredDurationAvg1}); false -> true end end, true, Durations); @@ -262,6 +268,5 @@ get_memory_limit() -> A -> A end. -set_queue_duration(Pid, QueueDuration, Callbacks) -> - {M,F,A} = dict:fetch(Pid, Callbacks), +set_queue_duration(#process{callback={M,F,A}}, QueueDuration) -> ok = erlang:apply(M, F, A++[QueueDuration]). |
