diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-11-26 17:19:42 +0000 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-11-26 17:19:42 +0000 |
| commit | d1a508effe82628fc0e088afbeb8783b56bd5f4e (patch) | |
| tree | 759977f1619193dffe7cf05a648e456b0323a5e8 | |
| parent | 961f6078f099e462980001ac1b88ea17ae42ef29 (diff) | |
| download | rabbitmq-server-git-d1a508effe82628fc0e088afbeb8783b56bd5f4e.tar.gz | |
Added demonitoring, corrected counting on deregistering, and got fed up with the lack of whitespace
| -rw-r--r-- | src/rabbit_memory_monitor.erl | 92 |
1 files changed, 49 insertions, 43 deletions
diff --git a/src/rabbit_memory_monitor.erl b/src/rabbit_memory_monitor.erl index 5f39f7a96f..1b2c698297 100644 --- a/src/rabbit_memory_monitor.erl +++ b/src/rabbit_memory_monitor.erl @@ -45,7 +45,7 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --record(process, {pid, reported, sent, callback}). +-record(process, {pid, reported, sent, callback, monitor}). -record(state, {timer, %% 'internal_update' timer queue_durations, %% ets #process @@ -131,20 +131,21 @@ init([]) -> Ets = ets:new(?TABLE_NAME, [set, private, {keypos, #process.pid}]), {ok, internal_update( - #state{timer = TRef, - queue_durations = Ets, - queue_duration_sum = 0.0, - queue_duration_count = 0, - memory_limit = MemoryLimit, - desired_duration = infinity})}. + #state { timer = TRef, + queue_durations = Ets, + queue_duration_sum = 0.0, + queue_duration_count = 0, + memory_limit = MemoryLimit, + desired_duration = infinity })}. handle_call({report_queue_duration, Pid, QueueDuration}, From, - State = #state{queue_duration_sum = Sum, - queue_duration_count = Count, - queue_durations = Durations, - desired_duration = SendDuration}) -> + State = #state { queue_duration_sum = Sum, + queue_duration_count = Count, + queue_durations = Durations, + desired_duration = SendDuration }) -> - [Proc = #process{reported=PrevQueueDuration}] = ets:lookup(Durations, Pid), + [Proc = #process { reported = PrevQueueDuration }] = + ets:lookup(Durations, Pid), gen_server2:reply(From, SendDuration), @@ -155,16 +156,17 @@ handle_call({report_queue_duration, Pid, QueueDuration}, From, {_, infinity} -> {Sum - PrevQueueDuration, Count - 1}; {_, _} -> {Sum - PrevQueueDuration + QueueDuration, Count} end, - true = ets:insert(Durations, Proc#process{reported = QueueDuration, - sent = SendDuration}), - {noreply, State#state{queue_duration_sum = zero_clamp(Sum1), - queue_duration_count = Count1}}; + true = ets:insert(Durations, Proc #process { reported = QueueDuration, + sent = SendDuration }), + {noreply, State #state { queue_duration_sum = zero_clamp(Sum1), + queue_duration_count = Count1 }}; handle_call({register, Pid, MFA}, _From, - State = #state{queue_durations = Durations}) -> - _MRef = erlang:monitor(process, Pid), - true = ets:insert(Durations, #process{pid = Pid, reported = infinity, - sent = infinity, callback = MFA}), + State = #state { queue_durations = Durations }) -> + MRef = erlang:monitor(process, Pid), + true = ets:insert(Durations, #process { pid = Pid, reported = infinity, + sent = infinity, callback = MFA, + monitor = MRef }), {reply, ok, State}; handle_call(_Request, _From, State) -> @@ -188,7 +190,7 @@ handle_info({'DOWN', _MRef, process, Pid, _Reason}, State) -> handle_info(_Info, State) -> {noreply, State}. -terminate(_Reason, #state{timer = TRef}) -> +terminate(_Reason, #state { timer = TRef }) -> timer:cancel(TRef), ok. @@ -206,26 +208,29 @@ zero_clamp(Sum) -> false -> Sum end. -internal_deregister(Pid, State = #state{queue_duration_sum = Sum, - queue_duration_count = Count, - queue_durations = Durations}) -> +internal_deregister(Pid, State = #state { queue_duration_sum = Sum, + queue_duration_count = Count, + queue_durations = Durations }) -> case ets:lookup(Durations, Pid) of [] -> State; - [#process{reported = PrevQueueDuration}] -> - Sum1 = case PrevQueueDuration of - infinity -> Sum; - _ -> zero_clamp(Sum - PrevQueueDuration) - end, - true = ets:delete(State#state.queue_durations, Pid), - State#state{queue_duration_sum = Sum1, - queue_duration_count = Count-1} + [#process { reported = PrevQueueDuration, monitor = MRef }] -> + true = erlang:demonitor(MRef), + {Sum1, Count1} = + case PrevQueueDuration of + infinity -> {Sum, Count}; + _ -> {zero_clamp(Sum - PrevQueueDuration), + Count - 1} + end, + true = ets:delete(State #state.queue_durations, Pid), + State #state { queue_duration_sum = Sum1, + queue_duration_count = Count1 } end. -internal_update(State = #state{memory_limit = Limit, - queue_durations = Durations, - desired_duration = DesiredDurationAvg, - queue_duration_sum = Sum, - queue_duration_count = Count}) -> +internal_update(State = #state { memory_limit = Limit, + queue_durations = Durations, + desired_duration = DesiredDurationAvg, + queue_duration_sum = Sum, + queue_duration_count = Count }) -> MemoryRatio = erlang:memory(total) / Limit, DesiredDurationAvg1 = case MemoryRatio < ?LIMIT_THRESHOLD orelse Count == 0 of @@ -238,7 +243,7 @@ internal_update(State = #state{memory_limit = Limit, end, (Sum1 / Count) / MemoryRatio end, - State1 = State#state{desired_duration = DesiredDurationAvg1}, + State1 = State #state { desired_duration = DesiredDurationAvg1 }, %% only inform queues immediately if the desired duration has %% decreased @@ -252,8 +257,8 @@ internal_update(State = #state{memory_limit = Limit, %% sometimes wakes up queues from hibernation. true = ets:foldl( - fun (Proc = #process{reported = QueueDuration, - sent = PrevSendDuration}, true) -> + fun (Proc = #process { reported = QueueDuration, + sent = PrevSendDuration }, true) -> Send = case {QueueDuration, PrevSendDuration} of {infinity, infinity} -> true; @@ -264,10 +269,11 @@ internal_update(State = #state{memory_limit = Limit, end, case Send of true -> - ok = set_queue_duration(Proc, DesiredDurationAvg1), + ok = set_queue_duration(Proc, + DesiredDurationAvg1), ets:insert( Durations, - Proc#process{sent=DesiredDurationAvg1}); + Proc #process {sent = DesiredDurationAvg1}); false -> true end end, true, Durations) @@ -280,5 +286,5 @@ get_memory_limit() -> A -> A end. -set_queue_duration(#process{callback={M,F,A}}, QueueDuration) -> +set_queue_duration(#process { callback = {M, F, A} }, QueueDuration) -> ok = erlang:apply(M, F, A++[QueueDuration]). |
