diff options
| -rw-r--r-- | src/rabbit_memory_monitor.erl | 66 |
1 files changed, 33 insertions, 33 deletions
diff --git a/src/rabbit_memory_monitor.erl b/src/rabbit_memory_monitor.erl index 63f33836ab..b94badc480 100644 --- a/src/rabbit_memory_monitor.erl +++ b/src/rabbit_memory_monitor.erl @@ -123,7 +123,11 @@ stop() -> %%---------------------------------------------------------------------------- init([]) -> - MemoryLimit = trunc(get_memory_limit() * ?MEMORY_LIMIT_SCALING), + MemoryLimit = trunc(?MEMORY_LIMIT_SCALING * + (case vm_memory_monitor:get_memory_limit() of + undefined -> ?MEMORY_SIZE_FOR_DISABLED_VMM; + Limit -> Limit + end)), {ok, TRef} = timer:apply_interval(?DEFAULT_UPDATE_INTERVAL, ?SERVER, update, []), @@ -150,12 +154,13 @@ handle_call({report_queue_duration, Pid, QueueDuration}, From, gen_server2:reply(From, SendDuration), {Sum1, Count1} = - case {PrevQueueDuration, QueueDuration} of - {infinity, infinity} -> {Sum, Count}; - {infinity, _} -> {Sum + QueueDuration, Count + 1}; - {_, infinity} -> {Sum - PrevQueueDuration, Count - 1}; - {_, _} -> {Sum - PrevQueueDuration + QueueDuration, Count} - end, + case {PrevQueueDuration, QueueDuration} of + {infinity, infinity} -> {Sum, Count}; + {infinity, _} -> {Sum + QueueDuration, Count + 1}; + {_, infinity} -> {Sum - PrevQueueDuration, Count - 1}; + {_, _} -> {Sum - PrevQueueDuration + QueueDuration, + Count} + end, true = ets:insert(Durations, Proc #process { reported = QueueDuration, sent = SendDuration }), {noreply, State #state { queue_duration_sum = zero_clamp(Sum1), @@ -216,7 +221,7 @@ internal_deregister(Pid, Demonitor, [] -> State; [#process { reported = PrevQueueDuration, monitor = MRef }] -> true = case Demonitor of - true -> erlang:demonitor(MRef); + true -> erlang:demonitor(MRef); false -> true end, {Sum1, Count1} = @@ -225,7 +230,7 @@ internal_deregister(Pid, Demonitor, _ -> {zero_clamp(Sum - PrevQueueDuration), Count - 1} end, - true = ets:delete(State #state.queue_durations, Pid), + true = ets:delete(Durations, Pid), State #state { queue_duration_sum = Sum1, queue_duration_count = Count1 } end. @@ -242,7 +247,7 @@ internal_update(State = #state { memory_limit = Limit, infinity; false -> Sum1 = case MemoryRatio < ?SUM_INC_THRESHOLD of - true -> Sum + ?SUM_INC_AMOUNT; + true -> Sum + ?SUM_INC_AMOUNT; false -> Sum end, (Sum1 / Count) / MemoryRatio @@ -254,38 +259,33 @@ internal_update(State = #state { memory_limit = Limit, case DesiredDurationAvg1 == infinity orelse (DesiredDurationAvg /= infinity andalso DesiredDurationAvg1 >= DesiredDurationAvg) of - true -> ok; + true -> + ok; false -> true = ets:foldl( fun (Proc = #process { reported = QueueDuration, - sent = PrevSendDuration }, true) -> - Send = - case {QueueDuration, PrevSendDuration} of - {infinity, infinity} -> true; - {infinity, B} -> DesiredDurationAvg1 < B; - {A, infinity} -> DesiredDurationAvg1 < A; - {A, B} -> - DesiredDurationAvg1 < lists:min([A,B]) - end, - case Send of + sent = PrevSendDuration, + callback = {M, F, A} }, true) -> + case (case {QueueDuration, PrevSendDuration} of + {infinity, infinity} -> + true; + {infinity, B} -> + DesiredDurationAvg1 < B; + {A, infinity} -> + DesiredDurationAvg1 < A; + {A, B} -> + DesiredDurationAvg1 < lists:min([A,B]) + end) of true -> - ok = set_queue_duration(Proc, - DesiredDurationAvg1), + ok = erlang:apply( + M, F, A ++ [DesiredDurationAvg1]), ets:insert( Durations, Proc #process {sent = DesiredDurationAvg1}); - false -> true + false -> + true end end, true, Durations) end, State1. - -get_memory_limit() -> - case vm_memory_monitor:get_memory_limit() of - undefined -> ?MEMORY_SIZE_FOR_DISABLED_VMM; - A -> A - end. - -set_queue_duration(#process { callback = {M, F, A} }, QueueDuration) -> - ok = erlang:apply(M, F, A++[QueueDuration]). |
