summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-11-26 11:52:32 +0000
committerMatthew Sackman <matthew@lshift.net>2009-11-26 11:52:32 +0000
commit6e122e3710f2fc76f4524d94e9dcb364dc69b018 (patch)
tree711041aee73f7b46a9893d9099728b6a09c8051f
parentf4ff01381ee9813040cc99e8226241c7d77c6ab3 (diff)
downloadrabbitmq-server-git-6e122e3710f2fc76f4524d94e9dcb364dc69b018.tar.gz
Lots of improvements: o) merge the dict and ets; use a record there; o) drop record memory_ratio; o) corrected where we clamp to non-neg numbers; o) correct conditional in internal update
-rw-r--r--src/rabbit_memory_monitor.erl83
1 files changed, 44 insertions, 39 deletions
diff --git a/src/rabbit_memory_monitor.erl b/src/rabbit_memory_monitor.erl
index 3f2c02f4a4..6359ecc99d 100644
--- a/src/rabbit_memory_monitor.erl
+++ b/src/rabbit_memory_monitor.erl
@@ -45,13 +45,14 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
+-record(process, {pid, reported, sent, callback}).
+
-record(state, {timer, %% 'internal_update' timer
- queue_durations, %% ets, (qpid, last_reported, last_sent)
+ queue_durations, %% ets #process
queue_duration_sum, %% sum of all queue_durations
queue_duration_count, %% number of elements in sum
memory_limit, %% how much memory we intend to use
- desired_duration, %% the desired queue duration
- callbacks %% a dict of qpid -> {M,F,A}s
+ desired_duration %% the desired queue duration
}).
-define(SERVER, ?MODULE).
@@ -110,14 +111,16 @@ init([]) ->
{ok, TRef} = timer:apply_interval(?DEFAULT_UPDATE_INTERVAL,
?SERVER, update, []),
+
+ Ets = ets:new(?TABLE_NAME, [set, private, {keypos, #process.pid}]),
+
{ok, internal_update(
#state{timer = TRef,
- queue_durations = ets:new(?TABLE_NAME, [set, private]),
+ queue_durations = Ets,
queue_duration_sum = 0.0,
queue_duration_count = 0,
memory_limit = MemoryLimit,
- desired_duration = infinity,
- callbacks = dict:new()})}.
+ desired_duration = infinity})}.
handle_call({report_queue_duration, Pid, QueueDuration}, From,
State = #state{queue_duration_sum = Sum,
@@ -130,7 +133,8 @@ handle_call({report_queue_duration, Pid, QueueDuration}, From,
true -> infinity;
false -> QueueDuration
end,
- [{_Pid, PrevQueueDuration, PrevSendDuration}] = ets:lookup(Durations, Pid),
+ [Proc = #process{reported = PrevQueueDuration, sent = PrevSendDuration}] =
+ ets:lookup(Durations, Pid),
SendDuration1 =
case QueueDuration1 < 1 andalso PrevSendDuration == infinity of
@@ -146,15 +150,17 @@ handle_call({report_queue_duration, Pid, QueueDuration}, From,
{_, infinity} -> {Sum - PrevQueueDuration, Count - 1};
{_, _} -> {Sum - PrevQueueDuration + QueueDuration1, Count}
end,
- true = ets:insert(Durations, {Pid, QueueDuration1, SendDuration1}),
- {noreply, State#state{queue_duration_sum = Sum1,
+ true = ets:insert(Durations, Proc#process{reported = QueueDuration1,
+ sent = SendDuration1}),
+ {noreply, State#state{queue_duration_sum = lists:max([0, Sum1]),
queue_duration_count = Count1}};
-handle_call({register, Pid, MFA}, _From, State =
- #state{queue_durations = Durations, callbacks = Callbacks}) ->
+handle_call({register, Pid, MFA}, _From,
+ State = #state{queue_durations = Durations}) ->
_MRef = erlang:monitor(process, Pid),
- true = ets:insert(Durations, {Pid, infinity, infinity}),
- {reply, ok, State#state{callbacks = dict:store(Pid, MFA, Callbacks)}};
+ true = ets:insert(Durations, #process{pid = Pid, reported = infinity,
+ sent = infinity, callback = MFA}),
+ {reply, ok, State};
handle_call(_Request, _From, State) ->
{noreply, State}.
@@ -191,24 +197,24 @@ code_change(_OldVsn, State, _Extra) ->
internal_deregister(Pid, State = #state{queue_duration_sum = Sum,
queue_duration_count = Count,
- queue_durations = Durations,
- callbacks = Callbacks}) ->
- [{_Pid, PrevQueueDuration, _PrevSendDuration}] = ets:lookup(Durations, Pid),
- Sum1 = case PrevQueueDuration of
- infinity -> Sum;
- _ -> Sum - PrevQueueDuration
- end,
- true = ets:delete(State#state.queue_durations, Pid),
- State#state{queue_duration_sum = Sum1,
- queue_duration_count = Count-1,
- callbacks = dict:erase(Pid, Callbacks)}.
+ queue_durations = Durations}) ->
+ case ets:lookup(Durations, Pid) of
+ [] -> State;
+ [#process{reported = PrevQueueDuration}] ->
+ Sum1 = case PrevQueueDuration of
+ infinity -> Sum;
+ _ -> lists:max([0, Sum - PrevQueueDuration])
+ end,
+ true = ets:delete(State#state.queue_durations, Pid),
+ State#state{queue_duration_sum = Sum1,
+ queue_duration_count = Count-1}
+ end.
internal_update(State = #state{memory_limit = Limit,
queue_durations = Durations,
desired_duration = DesiredDurationAvg,
queue_duration_sum = Sum,
- queue_duration_count = Count,
- callbacks = Callbacks}) ->
+ queue_duration_count = Count}) ->
%% available memory / used memory
MemoryRatio = Limit / erlang:memory(total),
%% if all queues are pushed to disk, then Sum will be 0. If memory
@@ -225,30 +231,30 @@ internal_update(State = #state{memory_limit = Limit,
DesiredDurationAvg1 =
case AvgDuration == infinity orelse MemoryRatio > 2 of
true -> infinity;
- false -> lists:max([0, AvgDuration * MemoryRatio])
+ false -> AvgDuration * MemoryRatio
end,
State1 = State#state{desired_duration = DesiredDurationAvg1},
%% only inform queues immediately if the desired duration has
%% decreased
- case (DesiredDurationAvg == infinity andalso DesiredDurationAvg /= infinity)
- orelse (DesiredDurationAvg1 /= infinity andalso
- DesiredDurationAvg1 < DesiredDurationAvg) of
+ case (DesiredDurationAvg == infinity andalso DesiredDurationAvg1 /= infinity)
+ orelse (DesiredDurationAvg /= infinity andalso
+ DesiredDurationAvg > DesiredDurationAvg1) of
true ->
%% If we have pessimistic information, we need to inform
%% queues to reduce it's memory usage when needed. This
%% sometimes wakes up queues from hibernation.
true = ets:foldl(
- fun ({Pid, QueueDuration, PrevSendDuration}, true) ->
+ fun (Proc = #process{reported = QueueDuration,
+ sent = PrevSendDuration}, true) ->
case DesiredDurationAvg1 <
lists:min([PrevSendDuration, QueueDuration]) of
true ->
- ok =
- set_queue_duration(
- Pid, DesiredDurationAvg1, Callbacks),
- ets:insert(Durations,
- {Pid, QueueDuration,
- DesiredDurationAvg1});
+ ok = set_queue_duration(
+ Proc, DesiredDurationAvg1),
+ ets:insert(
+ Durations,
+ Proc#process{sent=DesiredDurationAvg1});
false -> true
end
end, true, Durations);
@@ -262,6 +268,5 @@ get_memory_limit() ->
A -> A
end.
-set_queue_duration(Pid, QueueDuration, Callbacks) ->
- {M,F,A} = dict:fetch(Pid, Callbacks),
+set_queue_duration(#process{callback={M,F,A}}, QueueDuration) ->
ok = erlang:apply(M, F, A++[QueueDuration]).