summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_memory_monitor.erl66
1 files changed, 33 insertions, 33 deletions
diff --git a/src/rabbit_memory_monitor.erl b/src/rabbit_memory_monitor.erl
index 63f33836ab..b94badc480 100644
--- a/src/rabbit_memory_monitor.erl
+++ b/src/rabbit_memory_monitor.erl
@@ -123,7 +123,11 @@ stop() ->
%%----------------------------------------------------------------------------
init([]) ->
- MemoryLimit = trunc(get_memory_limit() * ?MEMORY_LIMIT_SCALING),
+ MemoryLimit = trunc(?MEMORY_LIMIT_SCALING *
+ (case vm_memory_monitor:get_memory_limit() of
+ undefined -> ?MEMORY_SIZE_FOR_DISABLED_VMM;
+ Limit -> Limit
+ end)),
{ok, TRef} = timer:apply_interval(?DEFAULT_UPDATE_INTERVAL,
?SERVER, update, []),
@@ -150,12 +154,13 @@ handle_call({report_queue_duration, Pid, QueueDuration}, From,
gen_server2:reply(From, SendDuration),
{Sum1, Count1} =
- case {PrevQueueDuration, QueueDuration} of
- {infinity, infinity} -> {Sum, Count};
- {infinity, _} -> {Sum + QueueDuration, Count + 1};
- {_, infinity} -> {Sum - PrevQueueDuration, Count - 1};
- {_, _} -> {Sum - PrevQueueDuration + QueueDuration, Count}
- end,
+ case {PrevQueueDuration, QueueDuration} of
+ {infinity, infinity} -> {Sum, Count};
+ {infinity, _} -> {Sum + QueueDuration, Count + 1};
+ {_, infinity} -> {Sum - PrevQueueDuration, Count - 1};
+ {_, _} -> {Sum - PrevQueueDuration + QueueDuration,
+ Count}
+ end,
true = ets:insert(Durations, Proc #process { reported = QueueDuration,
sent = SendDuration }),
{noreply, State #state { queue_duration_sum = zero_clamp(Sum1),
@@ -216,7 +221,7 @@ internal_deregister(Pid, Demonitor,
[] -> State;
[#process { reported = PrevQueueDuration, monitor = MRef }] ->
true = case Demonitor of
- true -> erlang:demonitor(MRef);
+ true -> erlang:demonitor(MRef);
false -> true
end,
{Sum1, Count1} =
@@ -225,7 +230,7 @@ internal_deregister(Pid, Demonitor,
_ -> {zero_clamp(Sum - PrevQueueDuration),
Count - 1}
end,
- true = ets:delete(State #state.queue_durations, Pid),
+ true = ets:delete(Durations, Pid),
State #state { queue_duration_sum = Sum1,
queue_duration_count = Count1 }
end.
@@ -242,7 +247,7 @@ internal_update(State = #state { memory_limit = Limit,
infinity;
false ->
Sum1 = case MemoryRatio < ?SUM_INC_THRESHOLD of
- true -> Sum + ?SUM_INC_AMOUNT;
+ true -> Sum + ?SUM_INC_AMOUNT;
false -> Sum
end,
(Sum1 / Count) / MemoryRatio
@@ -254,38 +259,33 @@ internal_update(State = #state { memory_limit = Limit,
case DesiredDurationAvg1 == infinity orelse
(DesiredDurationAvg /= infinity andalso
DesiredDurationAvg1 >= DesiredDurationAvg) of
- true -> ok;
+ true ->
+ ok;
false ->
true =
ets:foldl(
fun (Proc = #process { reported = QueueDuration,
- sent = PrevSendDuration }, true) ->
- Send =
- case {QueueDuration, PrevSendDuration} of
- {infinity, infinity} -> true;
- {infinity, B} -> DesiredDurationAvg1 < B;
- {A, infinity} -> DesiredDurationAvg1 < A;
- {A, B} ->
- DesiredDurationAvg1 < lists:min([A,B])
- end,
- case Send of
+ sent = PrevSendDuration,
+ callback = {M, F, A} }, true) ->
+ case (case {QueueDuration, PrevSendDuration} of
+ {infinity, infinity} ->
+ true;
+ {infinity, B} ->
+ DesiredDurationAvg1 < B;
+ {A, infinity} ->
+ DesiredDurationAvg1 < A;
+ {A, B} ->
+ DesiredDurationAvg1 < lists:min([A,B])
+ end) of
true ->
- ok = set_queue_duration(Proc,
- DesiredDurationAvg1),
+ ok = erlang:apply(
+ M, F, A ++ [DesiredDurationAvg1]),
ets:insert(
Durations,
Proc #process {sent = DesiredDurationAvg1});
- false -> true
+ false ->
+ true
end
end, true, Durations)
end,
State1.
-
-get_memory_limit() ->
- case vm_memory_monitor:get_memory_limit() of
- undefined -> ?MEMORY_SIZE_FOR_DISABLED_VMM;
- A -> A
- end.
-
-set_queue_duration(#process { callback = {M, F, A} }, QueueDuration) ->
- ok = erlang:apply(M, F, A++[QueueDuration]).