summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-11-26 12:40:48 +0000
committerMatthew Sackman <matthew@lshift.net>2009-11-26 12:40:48 +0000
commit6c09bbdbb1713851955d9bb804f1f5c145387538 (patch)
treeea056e55c2eecae6a1b82a510e69e0664de3c026
parent6e122e3710f2fc76f4524d94e9dcb364dc69b018 (diff)
downloadrabbitmq-server-git-6c09bbdbb1713851955d9bb804f1f5c145387538.tar.gz
Further correction to conditional; line lengths; extract all magic numbers and -define them and document them.
-rw-r--r--src/rabbit_memory_monitor.erl57
1 files changed, 40 insertions, 17 deletions
diff --git a/src/rabbit_memory_monitor.erl b/src/rabbit_memory_monitor.erl
index 6359ecc99d..d6f0c6000a 100644
--- a/src/rabbit_memory_monitor.erl
+++ b/src/rabbit_memory_monitor.erl
@@ -60,9 +60,36 @@
-define(TABLE_NAME, ?MODULE).
-define(MAX_QUEUE_DURATION, 86400). %% 60*60*24 i.e. 1 day
+-define(LIMIT_THRESHOLD, 0.5). %% don't limit queues when mem use is < this
+
+%% If all queues are pushed to disk (duration 0), then the sum of
+%% their reported lengths will be 0. If memory then becomes available,
+%% unless we manually intervene, the sum will remain 0, and the queues
+%% will never get a non-zero duration. Thus when the mem use is <
+%% SUM_INC_THRESHOLD, increase the sum artificially by SUM_INC_AMOUNT.
+-define(SUM_INC_THRESHOLD, 0.95).
+-define(SUM_INC_AMOUNT, 1.0).
+
+%% Queues which are empty will report a duration of 0. This may result
+%% in the memory-monitor deciding that the desired duration should
+%% also be 0, which is a disaster for fast moving queues. A fast
+%% moving queue may well oscillate between reporting 0 and a small
+%% number close to 0. Thus if the number reported is under
+%% SMALL_INFINITY_OSCILLATION_DURATION and the last value we sent it
+%% was infinity, then send it infinity again. Thus its duration must
+%% rise to above SMALL_INFINITY_OSCILLATION_DURATION before we start
+%% sending it durations /= infinity.
+-define(SMALL_INFINITY_OSCILLATION_DURATION, 1.0).
+
%% If user disabled vm_memory_monitor, let's assume 1GB of memory we can use.
-define(MEMORY_SIZE_FOR_DISABLED_VMM, 1073741824).
+%% Because we have a feedback loop here, we need to ensure that we
+%% have some space for when the queues don't quite respond as fast as
+%% we would like, or when there is buffering going on in other parts
+%% of the system. In short, we aim to stay some distance away from
+%% when the memory alarms will go off, which cause channel.flow.
+-define(MEMORY_LIMIT_SCALING, 0.6).
%%----------------------------------------------------------------------------
-ifdef(use_specs).
@@ -104,10 +131,7 @@ stop() ->
%%----------------------------------------------------------------------------
init([]) ->
- %% We should never use more memory than user requested. As the memory
- %% manager doesn't really know how much memory queues are using, we shall
- %% try to remain safe distance from real throttle limit.
- MemoryLimit = trunc(get_memory_limit() * 0.6),
+ MemoryLimit = trunc(get_memory_limit() * ?MEMORY_LIMIT_SCALING),
{ok, TRef} = timer:apply_interval(?DEFAULT_UPDATE_INTERVAL,
?SERVER, update, []),
@@ -137,7 +161,8 @@ handle_call({report_queue_duration, Pid, QueueDuration}, From,
ets:lookup(Durations, Pid),
SendDuration1 =
- case QueueDuration1 < 1 andalso PrevSendDuration == infinity of
+ case QueueDuration1 < ?SMALL_INFINITY_OSCILLATION_DURATION andalso
+ PrevSendDuration == infinity of
true -> infinity;
false -> SendDuration
end,
@@ -152,7 +177,7 @@ handle_call({report_queue_duration, Pid, QueueDuration}, From,
end,
true = ets:insert(Durations, Proc#process{reported = QueueDuration1,
sent = SendDuration1}),
- {noreply, State#state{queue_duration_sum = lists:max([0, Sum1]),
+ {noreply, State#state{queue_duration_sum = lists:max([0.0, Sum1]),
queue_duration_count = Count1}};
handle_call({register, Pid, MFA}, _From,
@@ -203,7 +228,7 @@ internal_deregister(Pid, State = #state{queue_duration_sum = Sum,
[#process{reported = PrevQueueDuration}] ->
Sum1 = case PrevQueueDuration of
infinity -> Sum;
- _ -> lists:max([0, Sum - PrevQueueDuration])
+ _ -> lists:max([0.0, Sum - PrevQueueDuration])
end,
true = ets:delete(State#state.queue_durations, Pid),
State#state{queue_duration_sum = Sum1,
@@ -215,13 +240,9 @@ internal_update(State = #state{memory_limit = Limit,
desired_duration = DesiredDurationAvg,
queue_duration_sum = Sum,
queue_duration_count = Count}) ->
- %% available memory / used memory
- MemoryRatio = Limit / erlang:memory(total),
- %% if all queues are pushed to disk, then Sum will be 0. If memory
- %% then becomes available, unless we do the following, we will
- %% never allow queues to come off disk.
- Sum1 = case MemoryRatio > 1.05 of
- true -> Sum + 1;
+ MemoryRatio = erlang:memory(total) / Limit,
+ Sum1 = case MemoryRatio < ?SUM_INC_THRESHOLD of
+ true -> Sum + ?SUM_INC_AMOUNT;
false -> Sum
end,
AvgDuration = case Count == 0 of
@@ -229,16 +250,18 @@ internal_update(State = #state{memory_limit = Limit,
false -> Sum1 / Count
end,
DesiredDurationAvg1 =
- case AvgDuration == infinity orelse MemoryRatio > 2 of
+ case AvgDuration == infinity orelse MemoryRatio < ?LIMIT_THRESHOLD of
true -> infinity;
- false -> AvgDuration * MemoryRatio
+ false -> AvgDuration / MemoryRatio
end,
State1 = State#state{desired_duration = DesiredDurationAvg1},
%% only inform queues immediately if the desired duration has
%% decreased
- case (DesiredDurationAvg == infinity andalso DesiredDurationAvg1 /= infinity)
+ case (DesiredDurationAvg == infinity andalso
+ DesiredDurationAvg1 /= infinity)
orelse (DesiredDurationAvg /= infinity andalso
+ DesiredDurationAvg1 /= infinity andalso
DesiredDurationAvg > DesiredDurationAvg1) of
true ->
%% If we have pessimistic information, we need to inform