summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-11-26 17:19:42 +0000
committerMatthew Sackman <matthew@lshift.net>2009-11-26 17:19:42 +0000
commitd1a508effe82628fc0e088afbeb8783b56bd5f4e (patch)
tree759977f1619193dffe7cf05a648e456b0323a5e8
parent961f6078f099e462980001ac1b88ea17ae42ef29 (diff)
downloadrabbitmq-server-git-d1a508effe82628fc0e088afbeb8783b56bd5f4e.tar.gz
Added demonitoring, corrected counting on deregistering, and got fed up with the lack of whitespace
-rw-r--r--src/rabbit_memory_monitor.erl92
1 files changed, 49 insertions, 43 deletions
diff --git a/src/rabbit_memory_monitor.erl b/src/rabbit_memory_monitor.erl
index 5f39f7a96f..1b2c698297 100644
--- a/src/rabbit_memory_monitor.erl
+++ b/src/rabbit_memory_monitor.erl
@@ -45,7 +45,7 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
--record(process, {pid, reported, sent, callback}).
+-record(process, {pid, reported, sent, callback, monitor}).
-record(state, {timer, %% 'internal_update' timer
queue_durations, %% ets #process
@@ -131,20 +131,21 @@ init([]) ->
Ets = ets:new(?TABLE_NAME, [set, private, {keypos, #process.pid}]),
{ok, internal_update(
- #state{timer = TRef,
- queue_durations = Ets,
- queue_duration_sum = 0.0,
- queue_duration_count = 0,
- memory_limit = MemoryLimit,
- desired_duration = infinity})}.
+ #state { timer = TRef,
+ queue_durations = Ets,
+ queue_duration_sum = 0.0,
+ queue_duration_count = 0,
+ memory_limit = MemoryLimit,
+ desired_duration = infinity })}.
handle_call({report_queue_duration, Pid, QueueDuration}, From,
- State = #state{queue_duration_sum = Sum,
- queue_duration_count = Count,
- queue_durations = Durations,
- desired_duration = SendDuration}) ->
+ State = #state { queue_duration_sum = Sum,
+ queue_duration_count = Count,
+ queue_durations = Durations,
+ desired_duration = SendDuration }) ->
- [Proc = #process{reported=PrevQueueDuration}] = ets:lookup(Durations, Pid),
+ [Proc = #process { reported = PrevQueueDuration }] =
+ ets:lookup(Durations, Pid),
gen_server2:reply(From, SendDuration),
@@ -155,16 +156,17 @@ handle_call({report_queue_duration, Pid, QueueDuration}, From,
{_, infinity} -> {Sum - PrevQueueDuration, Count - 1};
{_, _} -> {Sum - PrevQueueDuration + QueueDuration, Count}
end,
- true = ets:insert(Durations, Proc#process{reported = QueueDuration,
- sent = SendDuration}),
- {noreply, State#state{queue_duration_sum = zero_clamp(Sum1),
- queue_duration_count = Count1}};
+ true = ets:insert(Durations, Proc #process { reported = QueueDuration,
+ sent = SendDuration }),
+ {noreply, State #state { queue_duration_sum = zero_clamp(Sum1),
+ queue_duration_count = Count1 }};
handle_call({register, Pid, MFA}, _From,
- State = #state{queue_durations = Durations}) ->
- _MRef = erlang:monitor(process, Pid),
- true = ets:insert(Durations, #process{pid = Pid, reported = infinity,
- sent = infinity, callback = MFA}),
+ State = #state { queue_durations = Durations }) ->
+ MRef = erlang:monitor(process, Pid),
+ true = ets:insert(Durations, #process { pid = Pid, reported = infinity,
+ sent = infinity, callback = MFA,
+ monitor = MRef }),
{reply, ok, State};
handle_call(_Request, _From, State) ->
@@ -188,7 +190,7 @@ handle_info({'DOWN', _MRef, process, Pid, _Reason}, State) ->
handle_info(_Info, State) ->
{noreply, State}.
-terminate(_Reason, #state{timer = TRef}) ->
+terminate(_Reason, #state { timer = TRef }) ->
timer:cancel(TRef),
ok.
@@ -206,26 +208,29 @@ zero_clamp(Sum) ->
false -> Sum
end.
-internal_deregister(Pid, State = #state{queue_duration_sum = Sum,
- queue_duration_count = Count,
- queue_durations = Durations}) ->
+internal_deregister(Pid, State = #state { queue_duration_sum = Sum,
+ queue_duration_count = Count,
+ queue_durations = Durations }) ->
case ets:lookup(Durations, Pid) of
[] -> State;
- [#process{reported = PrevQueueDuration}] ->
- Sum1 = case PrevQueueDuration of
- infinity -> Sum;
- _ -> zero_clamp(Sum - PrevQueueDuration)
- end,
- true = ets:delete(State#state.queue_durations, Pid),
- State#state{queue_duration_sum = Sum1,
- queue_duration_count = Count-1}
+ [#process { reported = PrevQueueDuration, monitor = MRef }] ->
+ true = erlang:demonitor(MRef),
+ {Sum1, Count1} =
+ case PrevQueueDuration of
+ infinity -> {Sum, Count};
+ _ -> {zero_clamp(Sum - PrevQueueDuration),
+ Count - 1}
+ end,
+ true = ets:delete(State #state.queue_durations, Pid),
+ State #state { queue_duration_sum = Sum1,
+ queue_duration_count = Count1 }
end.
-internal_update(State = #state{memory_limit = Limit,
- queue_durations = Durations,
- desired_duration = DesiredDurationAvg,
- queue_duration_sum = Sum,
- queue_duration_count = Count}) ->
+internal_update(State = #state { memory_limit = Limit,
+ queue_durations = Durations,
+ desired_duration = DesiredDurationAvg,
+ queue_duration_sum = Sum,
+ queue_duration_count = Count }) ->
MemoryRatio = erlang:memory(total) / Limit,
DesiredDurationAvg1 =
case MemoryRatio < ?LIMIT_THRESHOLD orelse Count == 0 of
@@ -238,7 +243,7 @@ internal_update(State = #state{memory_limit = Limit,
end,
(Sum1 / Count) / MemoryRatio
end,
- State1 = State#state{desired_duration = DesiredDurationAvg1},
+ State1 = State #state { desired_duration = DesiredDurationAvg1 },
%% only inform queues immediately if the desired duration has
%% decreased
@@ -252,8 +257,8 @@ internal_update(State = #state{memory_limit = Limit,
%% sometimes wakes up queues from hibernation.
true =
ets:foldl(
- fun (Proc = #process{reported = QueueDuration,
- sent = PrevSendDuration}, true) ->
+ fun (Proc = #process { reported = QueueDuration,
+ sent = PrevSendDuration }, true) ->
Send =
case {QueueDuration, PrevSendDuration} of
{infinity, infinity} -> true;
@@ -264,10 +269,11 @@ internal_update(State = #state{memory_limit = Limit,
end,
case Send of
true ->
- ok = set_queue_duration(Proc, DesiredDurationAvg1),
+ ok = set_queue_duration(Proc,
+ DesiredDurationAvg1),
ets:insert(
Durations,
- Proc#process{sent=DesiredDurationAvg1});
+ Proc #process {sent = DesiredDurationAvg1});
false -> true
end
end, true, Durations)
@@ -280,5 +286,5 @@ get_memory_limit() ->
A -> A
end.
-set_queue_duration(#process{callback={M,F,A}}, QueueDuration) ->
+set_queue_duration(#process { callback = {M, F, A} }, QueueDuration) ->
ok = erlang:apply(M, F, A++[QueueDuration]).