diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-11-26 15:04:36 +0000 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-11-26 15:04:36 +0000 |
| commit | 4578759320d14668519bdb4d89439f48041074e3 (patch) | |
| tree | beebb22f0688034b7bf973e708fa268c816963b7 /src | |
| parent | 0ca125b08addaa6a775b430ca4e18829e9385737 (diff) | |
| download | rabbitmq-server-git-4578759320d14668519bdb4d89439f48041074e3.tar.gz | |
Improved documentation and code regarding fast moving mainly empty queues and the SMALL_INFINITY_OSCILLATION_DURATION definition. Oh, and clamped zero.
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_memory_monitor.erl | 94 |
1 files changed, 59 insertions, 35 deletions
diff --git a/src/rabbit_memory_monitor.erl b/src/rabbit_memory_monitor.erl index f2d1e9ca75..1e84d64d7a 100644 --- a/src/rabbit_memory_monitor.erl +++ b/src/rabbit_memory_monitor.erl @@ -77,20 +77,31 @@ -define(SUM_INC_THRESHOLD, 0.95). -define(SUM_INC_AMOUNT, 1.0). -%% Queues which are empty will report a duration of 0. If all queues -%% are empty then the memory-monitor may decide that the desired -%% duration should also be 0, which is a disaster for fast moving -%% queues. A fast moving queue may well oscillate between reporting 0 -%% and a small number close to 0. Thus if the number reported is under -%% SMALL_INFINITY_OSCILLATION_DURATION and the last value we sent it -%% was infinity, then send it infinity again. Thus its duration must -%% rise to above SMALL_INFINITY_OSCILLATION_DURATION before we start -%% sending it durations /= infinity. +%% A queue may report a duration of 0, or close to zero, and may be +%% told a duration of infinity (eg if less than LIMIT_THRESHOLD memory +%% is being used). Subsequently, the memory-monitor can calculate the +%% desired duration as zero, or close to zero (eg now more memory is +%% being used, but the sum of durations is very small). If it is a +%% fast moving queue, telling it a very small value will badly hurt +%% it, unnecessarily: a fast moving queue will often oscillate between +%% being empty and having a few thousand msgs in it, representing a +%% few hundred milliseconds. SMALL_INFINITY_OSCILLATION_DURATION is a +%% threshold: if a queue has been told a duration of infinity last +%% time, and it's reporting a value < +%% SMALL_INFINITY_OSCILLATION_DURATION then we send it back a duration +%% of infinity, even if the current desired duration /= infinity. Thus +%% for a queue which has been told infinity, it must report a duration +%% >= SMALL_INFINITY_OSCILLATION_DURATION before it is told a +%% non-infinity duration. This basically forms a threshold which +%% effects faster queues more than slower queues and which accounts +%% for natural fluctuations occurring in the queue length. -define(SMALL_INFINITY_OSCILLATION_DURATION, 1.0). %% If user disabled vm_memory_monitor, let's assume 1GB of memory we can use. -define(MEMORY_SIZE_FOR_DISABLED_VMM, 1073741824). +-define(EPSILON, 0.000001). %% less than this and we clamp to 0 + %%---------------------------------------------------------------------------- -ifdef(use_specs). @@ -153,31 +164,27 @@ handle_call({report_queue_duration, Pid, QueueDuration}, From, queue_durations = Durations, desired_duration = SendDuration}) -> - QueueDuration1 = case infinity == QueueDuration of - true -> infinity; - false -> QueueDuration - end, [Proc = #process{reported = PrevQueueDuration, sent = PrevSendDuration}] = ets:lookup(Durations, Pid), SendDuration1 = - case QueueDuration1 < ?SMALL_INFINITY_OSCILLATION_DURATION andalso - PrevSendDuration == infinity of + case QueueDuration /= infinity andalso PrevSendDuration == infinity + andalso QueueDuration < ?SMALL_INFINITY_OSCILLATION_DURATION of true -> infinity; false -> SendDuration end, gen_server2:reply(From, SendDuration1), {Sum1, Count1} = - case {PrevQueueDuration, QueueDuration1} of + case {PrevQueueDuration, QueueDuration} of {infinity, infinity} -> {Sum, Count}; - {infinity, _} -> {Sum + QueueDuration1, Count + 1}; + {infinity, _} -> {Sum + QueueDuration, Count + 1}; {_, infinity} -> {Sum - PrevQueueDuration, Count - 1}; - {_, _} -> {Sum - PrevQueueDuration + QueueDuration1, Count} + {_, _} -> {Sum - PrevQueueDuration + QueueDuration, Count} end, - true = ets:insert(Durations, Proc#process{reported = QueueDuration1, + true = ets:insert(Durations, Proc#process{reported = QueueDuration, sent = SendDuration1}), - {noreply, State#state{queue_duration_sum = lists:max([0.0, Sum1]), + {noreply, State#state{queue_duration_sum = zero_clamp(Sum1), queue_duration_count = Count1}}; handle_call({register, Pid, MFA}, _From, @@ -220,6 +227,12 @@ code_change(_OldVsn, State, _Extra) -> %% Internal functions %%---------------------------------------------------------------------------- +zero_clamp(Sum) -> + case Sum < ?EPSILON of + true -> 0.0; + false -> Sum + end. + internal_deregister(Pid, State = #state{queue_duration_sum = Sum, queue_duration_count = Count, queue_durations = Durations}) -> @@ -228,7 +241,7 @@ internal_deregister(Pid, State = #state{queue_duration_sum = Sum, [#process{reported = PrevQueueDuration}] -> Sum1 = case PrevQueueDuration of infinity -> Sum; - _ -> lists:max([0.0, Sum - PrevQueueDuration]) + _ -> zero_clamp(Sum - PrevQueueDuration) end, true = ets:delete(State#state.queue_durations, Pid), State#state{queue_duration_sum = Sum1, @@ -264,20 +277,31 @@ internal_update(State = #state{memory_limit = Limit, %% If we have pessimistic information, we need to inform %% queues to reduce it's memory usage when needed. This %% sometimes wakes up queues from hibernation. - true = ets:foldl( - fun (Proc = #process{reported = QueueDuration, - sent = PrevSendDuration}, true) -> - case DesiredDurationAvg1 < - lists:min([PrevSendDuration, QueueDuration]) of - true -> - ok = set_queue_duration( - Proc, DesiredDurationAvg1), - ets:insert( - Durations, - Proc#process{sent=DesiredDurationAvg1}); - false -> true - end - end, true, Durations) + true = + ets:foldl( + fun (Proc = #process{reported = QueueDuration, + sent = PrevSendDuration}, true) -> + Send = + case {QueueDuration, PrevSendDuration} of + {infinity, infinity} -> + true; + {infinity, B} -> + DesiredDurationAvg1 < B; + {A, infinity} -> + DesiredDurationAvg1 < A andalso A >= + ?SMALL_INFINITY_OSCILLATION_DURATION; + {A, B} -> + DesiredDurationAvg1 < lists:min([A,B]) + end, + case Send of + true -> + ok = set_queue_duration(Proc, DesiredDurationAvg1), + ets:insert( + Durations, + Proc#process{sent=DesiredDurationAvg1}); + false -> true + end + end, true, Durations) end, State1. |
