| author | Matthew Sackman <matthew@lshift.net> | 2009-11-26 12:40:48 +0000 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-11-26 12:40:48 +0000 |
| commit | 6c09bbdbb1713851955d9bb804f1f5c145387538 | |
| tree | ea056e55c2eecae6a1b82a510e69e0664de3c026 | |
| parent | 6e122e3710f2fc76f4524d94e9dcb364dc69b018 | |
Further correction to the conditional; fix line lengths; extract all magic numbers into -defines and document them.
| -rw-r--r-- | src/rabbit_memory_monitor.erl | 57 |
|---|---|---|

1 file changed, 40 insertions, 17 deletions
```diff
diff --git a/src/rabbit_memory_monitor.erl b/src/rabbit_memory_monitor.erl
index 6359ecc99d..d6f0c6000a 100644
--- a/src/rabbit_memory_monitor.erl
+++ b/src/rabbit_memory_monitor.erl
@@ -60,9 +60,36 @@
 -define(TABLE_NAME, ?MODULE).
 -define(MAX_QUEUE_DURATION, 86400). %% 60*60*24 i.e. 1 day
+-define(LIMIT_THRESHOLD, 0.5). %% don't limit queues when mem use is < this
+
+%% If all queues are pushed to disk (duration 0), then the sum of
+%% their reported lengths will be 0. If memory then becomes available,
+%% unless we manually intervene, the sum will remain 0, and the queues
+%% will never get a non-zero duration. Thus when the mem use is <
+%% SUM_INC_THRESHOLD, increase the sum artificially by SUM_INC_AMOUNT.
+-define(SUM_INC_THRESHOLD, 0.95).
+-define(SUM_INC_AMOUNT, 1.0).
+
+%% Queues which are empty will report a duration of 0. This may result
+%% in the memory-monitor deciding that the desired duration should
+%% also be 0, which is a disaster for fast moving queues. A fast
+%% moving queue may well oscillate between reporting 0 and a small
+%% number close to 0. Thus if the number reported is under
+%% SMALL_INFINITY_OSCILLATION_DURATION and the last value we sent it
+%% was infinity, then send it infinity again. Thus its duration must
+%% rise to above SMALL_INFINITY_OSCILLATION_DURATION before we start
+%% sending it durations /= infinity.
+-define(SMALL_INFINITY_OSCILLATION_DURATION, 1.0).
+
 %% If user disabled vm_memory_monitor, let's assume 1GB of memory we can use.
 -define(MEMORY_SIZE_FOR_DISABLED_VMM, 1073741824).
+%% Because we have a feedback loop here, we need to ensure that we
+%% have some space for when the queues don't quite respond as fast as
+%% we would like, or when there is buffering going on in other parts
+%% of the system. In short, we aim to stay some distance away from
+%% when the memory alarms will go off, which cause channel.flow.
+-define(MEMORY_LIMIT_SCALING, 0.6).
 
 %%----------------------------------------------------------------------------
 -ifdef(use_specs).
@@ -104,10 +131,7 @@ stop() ->
 %%----------------------------------------------------------------------------
 
 init([]) ->
-    %% We should never use more memory than user requested. As the memory
-    %% manager doesn't really know how much memory queues are using, we shall
-    %% try to remain safe distance from real throttle limit.
-    MemoryLimit = trunc(get_memory_limit() * 0.6),
+    MemoryLimit = trunc(get_memory_limit() * ?MEMORY_LIMIT_SCALING),
     {ok, TRef} = timer:apply_interval(?DEFAULT_UPDATE_INTERVAL,
                                       ?SERVER, update, []),
@@ -137,7 +161,8 @@ handle_call({report_queue_duration, Pid, QueueDuration}, From,
         ets:lookup(Durations, Pid),
     SendDuration1 =
-        case QueueDuration1 < 1 andalso PrevSendDuration == infinity of
+        case QueueDuration1 < ?SMALL_INFINITY_OSCILLATION_DURATION andalso
+            PrevSendDuration == infinity of
             true -> infinity;
             false -> SendDuration
         end,
@@ -152,7 +177,7 @@ handle_call({report_queue_duration, Pid, QueueDuration}, From,
     end,
     true = ets:insert(Durations, Proc#process{reported = QueueDuration1,
                                               sent = SendDuration1}),
-    {noreply, State#state{queue_duration_sum = lists:max([0, Sum1]),
+    {noreply, State#state{queue_duration_sum = lists:max([0.0, Sum1]),
                           queue_duration_count = Count1}};
 
 handle_call({register, Pid, MFA}, _From,
@@ -203,7 +228,7 @@ internal_deregister(Pid, State = #state{queue_duration_sum = Sum,
         [#process{reported = PrevQueueDuration}] ->
             Sum1 = case PrevQueueDuration of
                        infinity -> Sum;
-                       _ -> lists:max([0, Sum - PrevQueueDuration])
+                       _ -> lists:max([0.0, Sum - PrevQueueDuration])
                    end,
             true = ets:delete(State#state.queue_durations, Pid),
             State#state{queue_duration_sum = Sum1,
@@ -215,13 +240,9 @@ internal_update(State = #state{memory_limit = Limit,
                                desired_duration = DesiredDurationAvg,
                                queue_duration_sum = Sum,
                                queue_duration_count = Count}) ->
-    %% available memory / used memory
-    MemoryRatio = Limit / erlang:memory(total),
-    %% if all queues are pushed to disk, then Sum will be 0. If memory
-    %% then becomes available, unless we do the following, we will
-    %% never allow queues to come off disk.
-    Sum1 = case MemoryRatio > 1.05 of
-        true -> Sum + 1;
+    MemoryRatio = erlang:memory(total) / Limit,
+    Sum1 = case MemoryRatio < ?SUM_INC_THRESHOLD of
+        true -> Sum + ?SUM_INC_AMOUNT;
         false -> Sum
     end,
     AvgDuration = case Count == 0 of
@@ -229,16 +250,18 @@ internal_update(State = #state{memory_limit = Limit,
                       false -> Sum1 / Count
                   end,
     DesiredDurationAvg1 =
-        case AvgDuration == infinity orelse MemoryRatio > 2 of
+        case AvgDuration == infinity orelse MemoryRatio < ?LIMIT_THRESHOLD of
             true -> infinity;
-            false -> AvgDuration * MemoryRatio
+            false -> AvgDuration / MemoryRatio
         end,
     State1 = State#state{desired_duration = DesiredDurationAvg1},
 
     %% only inform queues immediately if the desired duration has
     %% decreased
-    case (DesiredDurationAvg == infinity andalso DesiredDurationAvg1 /= infinity)
+    case (DesiredDurationAvg == infinity andalso
+          DesiredDurationAvg1 /= infinity)
         orelse (DesiredDurationAvg /= infinity andalso
+                DesiredDurationAvg1 /= infinity andalso
                 DesiredDurationAvg > DesiredDurationAvg1) of
         true ->
             %% If we have pessimistic information, we need to inform
```
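Two things in this diff reward a closer look: the commit flips the meaning of MemoryRatio from available/used to used/limit (so the new thresholds read as fractions of the scaled memory limit, and the desired duration is obtained by division rather than multiplication), and it adds hysteresis so empty queues are not flapped on and off disk. Below is a minimal, standalone sketch of those two calculations, assuming the constant values introduced by the commit; the module and function names are hypothetical and not part of the commit, whose real code lives inside the rabbit_memory_monitor gen_server.

```erlang
%% Hypothetical sketch of the arithmetic introduced by this commit;
%% not part of rabbit_memory_monitor itself.
-module(memory_monitor_sketch).
-export([desired_duration/3, send_duration/3]).

-define(LIMIT_THRESHOLD, 0.5).
-define(SUM_INC_THRESHOLD, 0.95).
-define(SUM_INC_AMOUNT, 1.0).
-define(SMALL_INFINITY_OSCILLATION_DURATION, 1.0).

%% MemoryRatio is used/limit, i.e. erlang:memory(total) / MemoryLimit.
%% Sum and Count aggregate the durations reported by all queues.
desired_duration(MemoryRatio, Sum, Count) ->
    %% While memory use is comfortably under the limit, nudge the sum
    %% upwards so queues pushed to disk (duration 0) can recover.
    Sum1 = case MemoryRatio < ?SUM_INC_THRESHOLD of
               true  -> Sum + ?SUM_INC_AMOUNT;
               false -> Sum
           end,
    AvgDuration = case Count == 0 of
                      true  -> infinity;
                      false -> Sum1 / Count
                  end,
    %% Under LIMIT_THRESHOLD there is no need to limit queues at all;
    %% otherwise, dividing by the ratio grants headroom while use is
    %% below the limit and squeezes queues once use exceeds it.
    case AvgDuration == infinity orelse MemoryRatio < ?LIMIT_THRESHOLD of
        true  -> infinity;
        false -> AvgDuration / MemoryRatio
    end.

%% Hysteresis for empty, fast-moving queues: keep sending infinity
%% until the reported duration climbs above the oscillation bound.
send_duration(Reported, infinity = _PrevSent, _Computed)
  when Reported < ?SMALL_INFINITY_OSCILLATION_DURATION ->
    infinity;
send_duration(_Reported, _PrevSent, Computed) ->
    Computed.
```

For example, `memory_monitor_sketch:desired_duration(0.8, 100.0, 10)` gives (100.0 + 1.0) / 10 = 10.1 seconds of average reported duration, divided by 0.8 for a target of about 12.6 seconds; at a ratio below 0.5 the target is infinity, and above 1.0 the target drops below the reported average, pushing queues towards disk. The hysteresis in send_duration/3 mirrors the new guard in handle_call: a queue that was last sent infinity keeps receiving infinity until its reported duration rises above 1.0.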
