summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-11-26 16:28:46 +0000
committerMatthew Sackman <matthew@lshift.net>2009-11-26 16:28:46 +0000
commit1915489d9762806b0c84a9b43651436895eba2be (patch)
tree553de9891d34aa0ee29acf5c2ddb35655f205e0b /src
parenta978d0ef8b14df543f3beb05405f8c83c140574b (diff)
downloadrabbitmq-server-git-1915489d9762806b0c84a9b43651436895eba2be.tar.gz
Remove the small_duration_threshold, and instead smooth the ram_msg_count over the last two periods. This seems to work reasonably well.
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_memory_monitor.erl39
-rw-r--r--src/rabbit_variable_queue.erl15
2 files changed, 16 insertions, 38 deletions
diff --git a/src/rabbit_memory_monitor.erl b/src/rabbit_memory_monitor.erl
index 9689994acb..8266845fb2 100644
--- a/src/rabbit_memory_monitor.erl
+++ b/src/rabbit_memory_monitor.erl
@@ -77,25 +77,6 @@
-define(SUM_INC_THRESHOLD, 0.95).
-define(SUM_INC_AMOUNT, 1.0).
-%% A queue may report a duration of 0, or close to zero, and may be
-%% told a duration of infinity (eg if less than LIMIT_THRESHOLD memory
-%% is being used). Subsequently, the memory-monitor can calculate the
-%% desired duration as zero, or close to zero (eg now more memory is
-%% being used, and the sum of durations is very small). If it is a
-%% fast moving queue, telling it a very small value will badly hurt
-%% it, unnecessarily: a fast moving queue will often oscillate between
-%% being empty and having a few thousand msgs in it, representing a
-%% few hundred milliseconds. SMALL_DURATION_THRESHOLD is a threshold:
-%% if a queue has been told a duration of infinity last time, and it's
-%% reporting a value < SMALL_DURATION_THRESHOLD then we send it back a
-%% duration of infinity, even if the current desired duration /=
-%% infinity. Thus for a queue which has been told infinity, it must
-%% report a duration >= SMALL_DURATION_THRESHOLD before it is told a
-%% non-infinity duration. This basically forms a threshold which
-%% effects faster queues more than slower queues and which accounts
-%% for natural fluctuations occurring in the queue length.
--define(SMALL_DURATION_THRESHOLD, 1.0).
-
%% If user disabled vm_memory_monitor, let's assume 1GB of memory we can use.
-define(MEMORY_SIZE_FOR_DISABLED_VMM, 1073741824).
@@ -166,13 +147,7 @@ handle_call({report_queue_duration, Pid, QueueDuration}, From,
[Proc = #process{reported = PrevQueueDuration, sent = PrevSendDuration}] =
ets:lookup(Durations, Pid),
- SendDuration1 =
- case QueueDuration /= infinity andalso PrevSendDuration == infinity
- andalso QueueDuration < ?SMALL_DURATION_THRESHOLD of
- true -> infinity;
- false -> SendDuration
- end,
- gen_server2:reply(From, SendDuration1),
+ gen_server2:reply(From, SendDuration),
{Sum1, Count1} =
case {PrevQueueDuration, QueueDuration} of
@@ -182,7 +157,7 @@ handle_call({report_queue_duration, Pid, QueueDuration}, From,
{_, _} -> {Sum - PrevQueueDuration + QueueDuration, Count}
end,
true = ets:insert(Durations, Proc#process{reported = QueueDuration,
- sent = SendDuration1}),
+ sent = SendDuration}),
{noreply, State#state{queue_duration_sum = zero_clamp(Sum1),
queue_duration_count = Count1}};
@@ -282,13 +257,9 @@ internal_update(State = #state{memory_limit = Limit,
sent = PrevSendDuration}, true) ->
Send =
case {QueueDuration, PrevSendDuration} of
- {infinity, infinity} ->
- true;
- {infinity, B} ->
- DesiredDurationAvg1 < B;
- {A, infinity} ->
- DesiredDurationAvg1 < A andalso A >=
- ?SMALL_INFINITY_OSCILLATION_DURATION;
+ {infinity, infinity} -> true;
+ {infinity, B} -> DesiredDurationAvg1 < B;
+ {A, infinity} -> DesiredDurationAvg1 < A;
{A, B} ->
DesiredDurationAvg1 < lists:min([A,B])
end,
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index c3ad54630f..6806a0cdc8 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -49,6 +49,7 @@
duration_target,
target_ram_msg_count,
ram_msg_count,
+ ram_msg_count_prev,
queue,
index_state,
next_seq_id,
@@ -110,6 +111,8 @@
q4 :: queue(),
duration_target :: non_neg_integer(),
target_ram_msg_count :: non_neg_integer(),
+ ram_msg_count :: non_neg_integer(),
+ ram_msg_count_prev :: non_neg_integer(),
queue :: queue_name(),
index_state :: any(),
next_seq_id :: seq_id(),
@@ -174,9 +177,10 @@ init(QueueName) ->
#vqstate { q1 = queue:new(), q2 = queue:new(),
gamma = Gamma,
q3 = queue:new(), q4 = queue:new(),
- target_ram_msg_count = undefined,
duration_target = undefined,
+ target_ram_msg_count = undefined,
ram_msg_count = 0,
+ ram_msg_count_prev = 0,
queue = QueueName,
index_state = IndexState1,
next_seq_id = NextSeqId,
@@ -243,6 +247,7 @@ remeasure_rates(State = #vqstate { egress_rate = Egress,
rate_timestamp = Timestamp,
in_counter = InCount,
out_counter = OutCount,
+ ram_msg_count = RamMsgCount,
duration_target = DurationTarget }) ->
Now = now(),
{AvgEgressRate, Egress1} = update_rate(Now, Timestamp, OutCount, Egress),
@@ -255,15 +260,17 @@ remeasure_rates(State = #vqstate { egress_rate = Egress,
ingress_rate = Ingress1,
avg_ingress_rate = AvgIngressRate,
rate_timestamp = Now,
+ ram_msg_count_prev = RamMsgCount,
out_counter = 0, in_counter = 0 }).
ram_duration(#vqstate { avg_egress_rate = AvgEgressRate,
avg_ingress_rate = AvgIngressRate,
- ram_msg_count = RamMsgCount }) ->
+ ram_msg_count = RamMsgCount,
+ ram_msg_count_prev = RamMsgCountPrev }) ->
%% msgs / (msgs/sec) == sec
case AvgEgressRate == 0 andalso AvgIngressRate == 0 of
true -> infinity;
- false -> RamMsgCount / (AvgEgressRate + AvgIngressRate)
+ false -> (RamMsgCountPrev + RamMsgCount) / (2 * (AvgEgressRate + AvgIngressRate))
end.
fetch(State =
@@ -463,7 +470,7 @@ full_flush_journal(State = #vqstate { index_state = IndexState }) ->
status(#vqstate { q1 = Q1, q2 = Q2, gamma = Gamma, q3 = Q3, q4 = Q4,
len = Len, on_sync = {_, _, From},
target_ram_msg_count = TargetRamMsgCount,
- ram_msg_count = RamMsgCount,
+ ram_msg_count = RamMsgCount,
avg_egress_rate = AvgEgressRate,
avg_ingress_rate = AvgIngressRate }) ->
[ {q1, queue:len(Q1)},