diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-11-26 16:28:46 +0000 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-11-26 16:28:46 +0000 |
| commit | 1915489d9762806b0c84a9b43651436895eba2be (patch) | |
| tree | 553de9891d34aa0ee29acf5c2ddb35655f205e0b /src | |
| parent | a978d0ef8b14df543f3beb05405f8c83c140574b (diff) | |
| download | rabbitmq-server-git-1915489d9762806b0c84a9b43651436895eba2be.tar.gz | |
Remove the small_duration_threshold, and instead smooth the ram_msg_count over the last two periods. This seems to work reasonably well.
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_memory_monitor.erl | 39 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 15 |
2 files changed, 16 insertions, 38 deletions
diff --git a/src/rabbit_memory_monitor.erl b/src/rabbit_memory_monitor.erl index 9689994acb..8266845fb2 100644 --- a/src/rabbit_memory_monitor.erl +++ b/src/rabbit_memory_monitor.erl @@ -77,25 +77,6 @@ -define(SUM_INC_THRESHOLD, 0.95). -define(SUM_INC_AMOUNT, 1.0). -%% A queue may report a duration of 0, or close to zero, and may be -%% told a duration of infinity (eg if less than LIMIT_THRESHOLD memory -%% is being used). Subsequently, the memory-monitor can calculate the -%% desired duration as zero, or close to zero (eg now more memory is -%% being used, and the sum of durations is very small). If it is a -%% fast moving queue, telling it a very small value will badly hurt -%% it, unnecessarily: a fast moving queue will often oscillate between -%% being empty and having a few thousand msgs in it, representing a -%% few hundred milliseconds. SMALL_DURATION_THRESHOLD is a threshold: -%% if a queue has been told a duration of infinity last time, and it's -%% reporting a value < SMALL_DURATION_THRESHOLD then we send it back a -%% duration of infinity, even if the current desired duration /= -%% infinity. Thus for a queue which has been told infinity, it must -%% report a duration >= SMALL_DURATION_THRESHOLD before it is told a -%% non-infinity duration. This basically forms a threshold which -%% effects faster queues more than slower queues and which accounts -%% for natural fluctuations occurring in the queue length. --define(SMALL_DURATION_THRESHOLD, 1.0). - %% If user disabled vm_memory_monitor, let's assume 1GB of memory we can use. -define(MEMORY_SIZE_FOR_DISABLED_VMM, 1073741824). @@ -166,13 +147,7 @@ handle_call({report_queue_duration, Pid, QueueDuration}, From, [Proc = #process{reported = PrevQueueDuration, sent = PrevSendDuration}] = ets:lookup(Durations, Pid), - SendDuration1 = - case QueueDuration /= infinity andalso PrevSendDuration == infinity - andalso QueueDuration < ?SMALL_DURATION_THRESHOLD of - true -> infinity; - false -> SendDuration - end, - gen_server2:reply(From, SendDuration1), + gen_server2:reply(From, SendDuration), {Sum1, Count1} = case {PrevQueueDuration, QueueDuration} of @@ -182,7 +157,7 @@ handle_call({report_queue_duration, Pid, QueueDuration}, From, {_, _} -> {Sum - PrevQueueDuration + QueueDuration, Count} end, true = ets:insert(Durations, Proc#process{reported = QueueDuration, - sent = SendDuration1}), + sent = SendDuration}), {noreply, State#state{queue_duration_sum = zero_clamp(Sum1), queue_duration_count = Count1}}; @@ -282,13 +257,9 @@ internal_update(State = #state{memory_limit = Limit, sent = PrevSendDuration}, true) -> Send = case {QueueDuration, PrevSendDuration} of - {infinity, infinity} -> - true; - {infinity, B} -> - DesiredDurationAvg1 < B; - {A, infinity} -> - DesiredDurationAvg1 < A andalso A >= - ?SMALL_INFINITY_OSCILLATION_DURATION; + {infinity, infinity} -> true; + {infinity, B} -> DesiredDurationAvg1 < B; + {A, infinity} -> DesiredDurationAvg1 < A; {A, B} -> DesiredDurationAvg1 < lists:min([A,B]) end, diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index c3ad54630f..6806a0cdc8 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -49,6 +49,7 @@ duration_target, target_ram_msg_count, ram_msg_count, + ram_msg_count_prev, queue, index_state, next_seq_id, @@ -110,6 +111,8 @@ q4 :: queue(), duration_target :: non_neg_integer(), target_ram_msg_count :: non_neg_integer(), + ram_msg_count :: non_neg_integer(), + ram_msg_count_prev :: non_neg_integer(), queue :: queue_name(), index_state :: any(), next_seq_id :: seq_id(), @@ -174,9 +177,10 @@ init(QueueName) -> #vqstate { q1 = queue:new(), q2 = queue:new(), gamma = Gamma, q3 = queue:new(), q4 = queue:new(), - target_ram_msg_count = undefined, duration_target = undefined, + target_ram_msg_count = undefined, ram_msg_count = 0, + ram_msg_count_prev = 0, queue = QueueName, index_state = IndexState1, next_seq_id = NextSeqId, @@ -243,6 +247,7 @@ remeasure_rates(State = #vqstate { egress_rate = Egress, rate_timestamp = Timestamp, in_counter = InCount, out_counter = OutCount, + ram_msg_count = RamMsgCount, duration_target = DurationTarget }) -> Now = now(), {AvgEgressRate, Egress1} = update_rate(Now, Timestamp, OutCount, Egress), @@ -255,15 +260,17 @@ remeasure_rates(State = #vqstate { egress_rate = Egress, ingress_rate = Ingress1, avg_ingress_rate = AvgIngressRate, rate_timestamp = Now, + ram_msg_count_prev = RamMsgCount, out_counter = 0, in_counter = 0 }). ram_duration(#vqstate { avg_egress_rate = AvgEgressRate, avg_ingress_rate = AvgIngressRate, - ram_msg_count = RamMsgCount }) -> + ram_msg_count = RamMsgCount, + ram_msg_count_prev = RamMsgCountPrev }) -> %% msgs / (msgs/sec) == sec case AvgEgressRate == 0 andalso AvgIngressRate == 0 of true -> infinity; - false -> RamMsgCount / (AvgEgressRate + AvgIngressRate) + false -> (RamMsgCountPrev + RamMsgCount) / (2 * (AvgEgressRate + AvgIngressRate)) end. fetch(State = @@ -463,7 +470,7 @@ full_flush_journal(State = #vqstate { index_state = IndexState }) -> status(#vqstate { q1 = Q1, q2 = Q2, gamma = Gamma, q3 = Q3, q4 = Q4, len = Len, on_sync = {_, _, From}, target_ram_msg_count = TargetRamMsgCount, - ram_msg_count = RamMsgCount, + ram_msg_count = RamMsgCount, avg_egress_rate = AvgEgressRate, avg_ingress_rate = AvgIngressRate }) -> [ {q1, queue:len(Q1)}, |
