diff options
| -rw-r--r-- | src/rabbit_memory_monitor.erl | 65 |
1 files changed, 32 insertions, 33 deletions
diff --git a/src/rabbit_memory_monitor.erl b/src/rabbit_memory_monitor.erl index d6f0c6000a..2fc983b4f4 100644 --- a/src/rabbit_memory_monitor.erl +++ b/src/rabbit_memory_monitor.erl @@ -58,7 +58,14 @@ -define(SERVER, ?MODULE). -define(DEFAULT_UPDATE_INTERVAL, 2500). -define(TABLE_NAME, ?MODULE). --define(MAX_QUEUE_DURATION, 86400). %% 60*60*24 i.e. 1 day + +%% Because we have a feedback loop here, we need to ensure that we +%% have some space for when the queues don't quite respond as fast as +%% we would like, or when there is buffering going on in other parts +%% of the system. In short, we aim to stay some distance away from +%% when the memory alarms will go off, which cause channel.flow. +%% Note that all other Thresholds are relative to this scaling. +-define(MEMORY_LIMIT_SCALING, 0.6). -define(LIMIT_THRESHOLD, 0.5). %% don't limit queues when mem use is < this @@ -70,11 +77,11 @@ -define(SUM_INC_THRESHOLD, 0.95). -define(SUM_INC_AMOUNT, 1.0). -%% Queues which are empty will report a duration of 0. This may result -%% in the memory-monitor deciding that the desired duration should -%% also be 0, which is a disaster for fast moving queues. A fast -%% moving queue may well oscillate between reporting 0 and a small -%% number close to 0. Thus if the number reported is under +%% Queues which are empty will report a duration of 0. If all queues +%% are empty then the memory-monitor may decide that the desired +%% duration should also be 0, which is a disaster for fast moving +%% queues. A fast moving queue may well oscillate between reporting 0 +%% and a small number close to 0. Thus if the number reported is under %% SMALL_INFINITY_OSCILLATION_DURATION and the last value we sent it %% was infinity, then send it infinity again. Thus its duration must %% rise to above SMALL_INFINITY_OSCILLATION_DURATION before we start @@ -84,12 +91,6 @@ %% If user disabled vm_memory_monitor, let's assume 1GB of memory we can use. -define(MEMORY_SIZE_FOR_DISABLED_VMM, 1073741824). -%% Because we have a feedback loop here, we need to ensure that we -%% have some space for when the queues don't quite respond as fast as -%% we would like, or when there is buffering going on in other parts -%% of the system. In short, we aim to stay some distance away from -%% when the memory alarms will go off, which cause channel.flow. --define(MEMORY_LIMIT_SCALING, 0.6). %%---------------------------------------------------------------------------- -ifdef(use_specs). @@ -152,8 +153,7 @@ handle_call({report_queue_duration, Pid, QueueDuration}, From, queue_durations = Durations, desired_duration = SendDuration}) -> - QueueDuration1 = case infinity == QueueDuration orelse - QueueDuration > ?MAX_QUEUE_DURATION of + QueueDuration1 = case infinity == QueueDuration of true -> infinity; false -> QueueDuration end, @@ -241,29 +241,29 @@ internal_update(State = #state{memory_limit = Limit, queue_duration_sum = Sum, queue_duration_count = Count}) -> MemoryRatio = erlang:memory(total) / Limit, - Sum1 = case MemoryRatio < ?SUM_INC_THRESHOLD of - true -> Sum + ?SUM_INC_AMOUNT; - false -> Sum - end, - AvgDuration = case Count == 0 of - true -> infinity; - false -> Sum1 / Count - end, DesiredDurationAvg1 = - case AvgDuration == infinity orelse MemoryRatio < ?LIMIT_THRESHOLD of - true -> infinity; - false -> AvgDuration / MemoryRatio + case MemoryRatio < ?LIMIT_THRESHOLD of + true -> + infinity; + false -> + Sum1 = case MemoryRatio < ?SUM_INC_THRESHOLD of + true -> Sum + ?SUM_INC_AMOUNT; + false -> Sum + end, + case Count == 0 of + true -> infinity; + false -> (Sum1 / Count) / MemoryRatio + end end, State1 = State#state{desired_duration = DesiredDurationAvg1}, %% only inform queues immediately if the desired duration has %% decreased - case (DesiredDurationAvg == infinity andalso - DesiredDurationAvg1 /= infinity) - orelse (DesiredDurationAvg /= infinity andalso - DesiredDurationAvg1 /= infinity andalso - DesiredDurationAvg > DesiredDurationAvg1) of - true -> + case DesiredDurationAvg1 == infinity orelse + (DesiredDurationAvg /= infinity andalso + DesiredDurationAvg1 >= DesiredDurationAvg) of + true -> ok; + false -> %% If we have pessimistic information, we need to inform %% queues to reduce it's memory usage when needed. This %% sometimes wakes up queues from hibernation. @@ -280,8 +280,7 @@ internal_update(State = #state{memory_limit = Limit, Proc#process{sent=DesiredDurationAvg1}); false -> true end - end, true, Durations); - false -> ok + end, true, Durations) end, State1. |
