summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-11-26 15:04:36 +0000
committerMatthew Sackman <matthew@lshift.net>2009-11-26 15:04:36 +0000
commit4578759320d14668519bdb4d89439f48041074e3 (patch)
treebeebb22f0688034b7bf973e708fa268c816963b7 /src
parent0ca125b08addaa6a775b430ca4e18829e9385737 (diff)
downloadrabbitmq-server-git-4578759320d14668519bdb4d89439f48041074e3.tar.gz
Improved documentation and code regarding fast moving mainly empty queues and the SMALL_INFINITY_OSCILLATION_DURATION definition. Oh, and clamped zero.
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_memory_monitor.erl94
1 files changed, 59 insertions, 35 deletions
diff --git a/src/rabbit_memory_monitor.erl b/src/rabbit_memory_monitor.erl
index f2d1e9ca75..1e84d64d7a 100644
--- a/src/rabbit_memory_monitor.erl
+++ b/src/rabbit_memory_monitor.erl
@@ -77,20 +77,31 @@
-define(SUM_INC_THRESHOLD, 0.95).
-define(SUM_INC_AMOUNT, 1.0).
-%% Queues which are empty will report a duration of 0. If all queues
-%% are empty then the memory-monitor may decide that the desired
-%% duration should also be 0, which is a disaster for fast moving
-%% queues. A fast moving queue may well oscillate between reporting 0
-%% and a small number close to 0. Thus if the number reported is under
-%% SMALL_INFINITY_OSCILLATION_DURATION and the last value we sent it
-%% was infinity, then send it infinity again. Thus its duration must
-%% rise to above SMALL_INFINITY_OSCILLATION_DURATION before we start
-%% sending it durations /= infinity.
+%% A queue may report a duration of 0, or close to zero, and may be
+%% told a duration of infinity (eg if less than LIMIT_THRESHOLD memory
+%% is being used). Subsequently, the memory-monitor can calculate the
+%% desired duration as zero, or close to zero (eg now more memory is
+%% being used, but the sum of durations is very small). If it is a
+%% fast moving queue, telling it a very small value will badly hurt
+%% it, unnecessarily: a fast moving queue will often oscillate between
+%% being empty and having a few thousand msgs in it, representing a
+%% few hundred milliseconds. SMALL_INFINITY_OSCILLATION_DURATION is a
+%% threshold: if a queue has been told a duration of infinity last
+%% time, and it's reporting a value <
+%% SMALL_INFINITY_OSCILLATION_DURATION then we send it back a duration
+%% of infinity, even if the current desired duration /= infinity. Thus
+%% for a queue which has been told infinity, it must report a duration
+%% >= SMALL_INFINITY_OSCILLATION_DURATION before it is told a
+%% non-infinity duration. This basically forms a threshold which
+%% effects faster queues more than slower queues and which accounts
+%% for natural fluctuations occurring in the queue length.
-define(SMALL_INFINITY_OSCILLATION_DURATION, 1.0).
%% If user disabled vm_memory_monitor, let's assume 1GB of memory we can use.
-define(MEMORY_SIZE_FOR_DISABLED_VMM, 1073741824).
+-define(EPSILON, 0.000001). %% less than this and we clamp to 0
+
%%----------------------------------------------------------------------------
-ifdef(use_specs).
@@ -153,31 +164,27 @@ handle_call({report_queue_duration, Pid, QueueDuration}, From,
queue_durations = Durations,
desired_duration = SendDuration}) ->
- QueueDuration1 = case infinity == QueueDuration of
- true -> infinity;
- false -> QueueDuration
- end,
[Proc = #process{reported = PrevQueueDuration, sent = PrevSendDuration}] =
ets:lookup(Durations, Pid),
SendDuration1 =
- case QueueDuration1 < ?SMALL_INFINITY_OSCILLATION_DURATION andalso
- PrevSendDuration == infinity of
+ case QueueDuration /= infinity andalso PrevSendDuration == infinity
+ andalso QueueDuration < ?SMALL_INFINITY_OSCILLATION_DURATION of
true -> infinity;
false -> SendDuration
end,
gen_server2:reply(From, SendDuration1),
{Sum1, Count1} =
- case {PrevQueueDuration, QueueDuration1} of
+ case {PrevQueueDuration, QueueDuration} of
{infinity, infinity} -> {Sum, Count};
- {infinity, _} -> {Sum + QueueDuration1, Count + 1};
+ {infinity, _} -> {Sum + QueueDuration, Count + 1};
{_, infinity} -> {Sum - PrevQueueDuration, Count - 1};
- {_, _} -> {Sum - PrevQueueDuration + QueueDuration1, Count}
+ {_, _} -> {Sum - PrevQueueDuration + QueueDuration, Count}
end,
- true = ets:insert(Durations, Proc#process{reported = QueueDuration1,
+ true = ets:insert(Durations, Proc#process{reported = QueueDuration,
sent = SendDuration1}),
- {noreply, State#state{queue_duration_sum = lists:max([0.0, Sum1]),
+ {noreply, State#state{queue_duration_sum = zero_clamp(Sum1),
queue_duration_count = Count1}};
handle_call({register, Pid, MFA}, _From,
@@ -220,6 +227,12 @@ code_change(_OldVsn, State, _Extra) ->
%% Internal functions
%%----------------------------------------------------------------------------
+zero_clamp(Sum) ->
+ case Sum < ?EPSILON of
+ true -> 0.0;
+ false -> Sum
+ end.
+
internal_deregister(Pid, State = #state{queue_duration_sum = Sum,
queue_duration_count = Count,
queue_durations = Durations}) ->
@@ -228,7 +241,7 @@ internal_deregister(Pid, State = #state{queue_duration_sum = Sum,
[#process{reported = PrevQueueDuration}] ->
Sum1 = case PrevQueueDuration of
infinity -> Sum;
- _ -> lists:max([0.0, Sum - PrevQueueDuration])
+ _ -> zero_clamp(Sum - PrevQueueDuration)
end,
true = ets:delete(State#state.queue_durations, Pid),
State#state{queue_duration_sum = Sum1,
@@ -264,20 +277,31 @@ internal_update(State = #state{memory_limit = Limit,
%% If we have pessimistic information, we need to inform
%% queues to reduce it's memory usage when needed. This
%% sometimes wakes up queues from hibernation.
- true = ets:foldl(
- fun (Proc = #process{reported = QueueDuration,
- sent = PrevSendDuration}, true) ->
- case DesiredDurationAvg1 <
- lists:min([PrevSendDuration, QueueDuration]) of
- true ->
- ok = set_queue_duration(
- Proc, DesiredDurationAvg1),
- ets:insert(
- Durations,
- Proc#process{sent=DesiredDurationAvg1});
- false -> true
- end
- end, true, Durations)
+ true =
+ ets:foldl(
+ fun (Proc = #process{reported = QueueDuration,
+ sent = PrevSendDuration}, true) ->
+ Send =
+ case {QueueDuration, PrevSendDuration} of
+ {infinity, infinity} ->
+ true;
+ {infinity, B} ->
+ DesiredDurationAvg1 < B;
+ {A, infinity} ->
+ DesiredDurationAvg1 < A andalso A >=
+ ?SMALL_INFINITY_OSCILLATION_DURATION;
+ {A, B} ->
+ DesiredDurationAvg1 < lists:min([A,B])
+ end,
+ case Send of
+ true ->
+ ok = set_queue_duration(Proc, DesiredDurationAvg1),
+ ets:insert(
+ Durations,
+ Proc#process{sent=DesiredDurationAvg1});
+ false -> true
+ end
+ end, true, Durations)
end,
State1.