diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-11-17 13:10:52 +0000 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-11-17 13:10:52 +0000 |
| commit | 81924a0fd9a04d42e76bfceb78b860bacbe3778b (patch) | |
| tree | ee559daa8daec023756a676095e76c9e6e2febf4 | |
| parent | 79919ede08cc81cc7f1ea4bf9b0b4b4314069532 (diff) | |
| download | rabbitmq-server-git-81924a0fd9a04d42e76bfceb78b860bacbe3778b.tar.gz | |
In the absence of an egress rate, use the ingress rate instead. Also, if there have been no fetches/publishes since the last measurement, use the previous measurement, appropriately scaled. This means that the rates will gently fall off and approach zero in the absence of activity, which is preferable to them suddenly jumping to zero. Also, the average is now the sum of the fetches/publishes in the last two segments, divided by the time since the start of the earlier of those two segments (i.e. it's better than before, which was just a straight /2 of the two rates, which would be wrong if the segments are different sizes, which they could be, given a very busy queue).
| -rw-r--r-- | src/rabbit_amqqueue.erl | 8 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 6 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 100 |
4 files changed, 77 insertions, 41 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index f7b39c7782..e5a113ae97 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -33,7 +33,7 @@ -export([start/0, recover/1, find_durable_queues/0, declare/4, delete/3, purge/1]). --export([internal_declare/2, internal_delete/1, remeasure_egress_rate/1, +-export([internal_declare/2, internal_delete/1, remeasure_rates/1, set_queue_duration/2]). -export([pseudo_queue/2]). -export([lookup/1, with/2, with_or_die/2, @@ -114,7 +114,7 @@ -spec(tx_commit_vq_callback/1 :: (pid()) -> 'ok'). -spec(internal_declare/2 :: (amqqueue(), boolean()) -> amqqueue()). -spec(internal_delete/1 :: (queue_name()) -> 'ok' | not_found()). --spec(remeasure_egress_rate/1 :: (pid()) -> 'ok'). +-spec(remeasure_rates/1 :: (pid()) -> 'ok'). -spec(set_queue_duration/2 :: (pid(), number()) -> 'ok'). -spec(on_node_down/1 :: (erlang_node()) -> 'ok'). -spec(pseudo_queue/2 :: (binary(), pid()) -> amqqueue()). @@ -374,8 +374,8 @@ internal_delete(QueueName) -> end end). -remeasure_egress_rate(QPid) -> - gen_server2:pcast(QPid, 9, remeasure_egress_rate). +remeasure_rates(QPid) -> + gen_server2:pcast(QPid, 9, remeasure_rates). set_queue_duration(QPid, Duration) -> gen_server2:pcast(QPid, 9, {set_queue_duration, Duration}). 
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 3adf97ff85..40b19a548c 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -168,7 +168,7 @@ next_state1(State, false) -> ensure_egress_rate_timer(State = #q{egress_rate_timer_ref = undefined}) -> {ok, TRef} = timer:apply_after(?EGRESS_REMEASURE_INTERVAL, rabbit_amqqueue, - remeasure_egress_rate, [self()]), + remeasure_rates, [self()]), State#q{egress_rate_timer_ref = TRef}; ensure_egress_rate_timer(State = #q{egress_rate_timer_ref = just_measured}) -> State#q{egress_rate_timer_ref = undefined}; @@ -867,8 +867,8 @@ handle_cast({limit, ChPid, LimiterPid}, State) -> C#cr{limiter_pid = LimiterPid, is_limit_active = NewLimited} end)); -handle_cast(remeasure_egress_rate, State = #q{variable_queue_state = VQS}) -> - VQS1 = rabbit_variable_queue:remeasure_egress_rate(VQS), +handle_cast(remeasure_rates, State = #q{variable_queue_state = VQS}) -> + VQS1 = rabbit_variable_queue:remeasure_rates(VQS), RamDuration = rabbit_variable_queue:ram_duration(VQS1), DesiredDuration = rabbit_memory_monitor:report_queue_duration(self(), RamDuration), diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index ac32e2136e..b1db243f35 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1167,7 +1167,7 @@ test_variable_queue_dynamic_duration_change() -> %% start by sending in a couple of segments worth Len1 = 2*SegmentSize, {_SeqIds, VQ1} = variable_queue_publish(false, Len1, VQ0), - VQ2 = rabbit_variable_queue:remeasure_egress_rate(VQ1), + VQ2 = rabbit_variable_queue:remeasure_rates(VQ1), {ok, _TRef} = timer:send_after(1000, {duration, 60, fun (V) -> (V*0.75)-1 end}), VQ3 = test_variable_queue_dynamic_duration_change_f(Len1, VQ2), @@ -1203,7 +1203,7 @@ test_variable_queue_dynamic_duration_change_f(Len, VQ0) -> _ -> Fun end, {ok, _TRef} = timer:send_after(1000, {duration, N1, Fun1}), - VQ4 = rabbit_variable_queue:remeasure_egress_rate(VQ3), + VQ4 = 
rabbit_variable_queue:remeasure_rates(VQ3), VQ5 = %% /37 otherwise the duration is just to high to stress things rabbit_variable_queue:set_queue_ram_duration_target(N/37, VQ4), io:format("~p:~n~p~n~n", [N, rabbit_variable_queue:status(VQ5)]), diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index c89cdfd587..2ee57ba7df 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -32,7 +32,7 @@ -module(rabbit_variable_queue). -export([init/1, terminate/1, publish/2, publish_delivered/2, - set_queue_ram_duration_target/2, remeasure_egress_rate/1, + set_queue_ram_duration_target/2, remeasure_rates/1, ram_duration/1, fetch/1, ack/2, len/1, is_empty/1, purge/1, delete/1, requeue/2, tx_publish/2, tx_rollback/2, tx_commit/4, tx_commit_from_msg_store/4, tx_commit_from_vq/1, needs_sync/1, @@ -53,9 +53,12 @@ index_state, next_seq_id, out_counter, + in_counter, egress_rate, avg_egress_rate, - egress_rate_timestamp, + ingress_rate, + avg_ingress_rate, + rate_timestamp, len, on_sync }). @@ -112,9 +115,12 @@ index_state :: any(), next_seq_id :: seq_id(), out_counter :: non_neg_integer(), - egress_rate :: float(), + in_counter :: non_neg_integer(), + egress_rate :: {{integer(), integer(), integer()}, non_neg_integer()}, avg_egress_rate :: float(), - egress_rate_timestamp :: {integer(), integer(), integer()}, + ingress_rate :: {{integer(), integer(), integer()}, non_neg_integer()}, + avg_ingress_rate :: float(), + rate_timestamp :: {integer(), integer(), integer()}, len :: non_neg_integer(), on_sync :: {[ack()], [msg_id()], [{pid(), any()}]} }). @@ -127,7 +133,7 @@ {ack(), vqstate()}). -spec(set_queue_ram_duration_target/2 :: (('undefined' | number()), vqstate()) -> vqstate()). --spec(remeasure_egress_rate/1 :: (vqstate()) -> vqstate()). +-spec(remeasure_rates/1 :: (vqstate()) -> vqstate()). -spec(ram_duration/1 :: (vqstate()) -> number()). 
-spec(fetch/1 :: (vqstate()) -> {('empty'|{basic_message(), boolean(), ack(), non_neg_integer()}), @@ -164,6 +170,7 @@ init(QueueName) -> 0 -> #gamma { seq_id = undefined, count = 0 }; _ -> #gamma { seq_id = GammaSeqId, count = GammaCount } end, + Now = now(), State = #vqstate { q1 = queue:new(), q2 = queue:new(), gamma = Gamma, @@ -175,9 +182,12 @@ init(QueueName) -> index_state = IndexState1, next_seq_id = NextSeqId, out_counter = 0, - egress_rate = 0, + in_counter = 0, + egress_rate = {Now, 0}, avg_egress_rate = 0, - egress_rate_timestamp = now(), + ingress_rate = {Now, GammaCount}, + avg_ingress_rate = 0, + rate_timestamp = Now, len = GammaCount, on_sync = {[], [], []} }, @@ -192,28 +202,37 @@ publish(Msg, State) -> publish_delivered(Msg = #basic_message { guid = MsgId, is_persistent = IsPersistent }, State = #vqstate { len = 0, index_state = IndexState, - next_seq_id = SeqId }) -> + next_seq_id = SeqId, + out_counter = OutCount, + in_counter = InCount}) -> + State1 = State #vqstate { out_counter = OutCount + 1, + in_counter = InCount + 1 }, case maybe_write_msg_to_disk(false, false, Msg) of true -> {true, IndexState1} = maybe_write_index_to_disk(false, IsPersistent, MsgId, SeqId, true, IndexState), {{ack_index_and_store, MsgId, SeqId}, - State #vqstate { index_state = IndexState1, - next_seq_id = SeqId + 1 }}; + State1 #vqstate { index_state = IndexState1, + next_seq_id = SeqId + 1 }}; false -> - {ack_not_on_disk, State} + {ack_not_on_disk, State1} end. 
set_queue_ram_duration_target( - DurationTarget, State = #vqstate { avg_egress_rate = EgressRate, + DurationTarget, State = #vqstate { avg_egress_rate = AvgEgressRate, + avg_ingress_rate = AvgIngressRate, target_ram_msg_count = TargetRamMsgCount }) -> + Rate = case 0 == AvgEgressRate of + true -> AvgIngressRate; + false -> AvgEgressRate + end, TargetRamMsgCount1 = case DurationTarget of infinity -> undefined; undefined -> undefined; - _ -> trunc(DurationTarget * EgressRate) %% msgs = sec * msgs/sec + _ -> trunc(DurationTarget * Rate) %% msgs = sec * msgs/sec end, State1 = State #vqstate { target_ram_msg_count = TargetRamMsgCount1, duration_target = DurationTarget }, @@ -223,30 +242,34 @@ set_queue_ram_duration_target( false -> reduce_memory_use(State1) end. -remeasure_egress_rate(State = #vqstate { egress_rate = OldEgressRate, - egress_rate_timestamp = Timestamp, - out_counter = OutCount, - duration_target = DurationTarget }) -> - %% We do an average over the last two values, but also hold the - %% current value separately so that the average always only - %% incorporates the last two values, and not the current value and - %% the last average. Averaging helps smooth out spikes. +remeasure_rates(State = #vqstate { egress_rate = Egress, + ingress_rate = Ingress, + rate_timestamp = Timestamp, + in_counter = InCount, + out_counter = OutCount, + duration_target = DurationTarget }) -> Now = now(), - %% EgressRate is in seconds, and now_diff is in microseconds - EgressRate = 1000000 * OutCount / timer:now_diff(Now, Timestamp), - AvgEgressRate = (EgressRate + OldEgressRate) / 2, + {AvgEgressRate, Egress1} = update_rate(Now, Timestamp, OutCount, Egress), + {AvgIngressRate, Ingress1} = update_rate(Now, Timestamp, InCount, Ingress), + set_queue_ram_duration_target( DurationTarget, - State #vqstate { egress_rate = EgressRate, + State #vqstate { egress_rate = Egress1, avg_egress_rate = AvgEgressRate, - egress_rate_timestamp = Now, - out_counter = 0 }). 
+ ingress_rate = Ingress1, + avg_ingress_rate = AvgIngressRate, + rate_timestamp = Now, + out_counter = 0, in_counter = 0 }). ram_duration(#vqstate { avg_egress_rate = AvgEgressRate, + avg_ingress_rate = AvgIngressRate, ram_msg_count = RamMsgCount }) -> %% msgs / (msgs/sec) == sec case AvgEgressRate == 0 of - true -> infinity; + true -> case AvgIngressRate == 0 of + true -> infinity; + false -> RamMsgCount / AvgIngressRate + end; false -> RamMsgCount / AvgEgressRate end. @@ -448,7 +471,8 @@ status(#vqstate { q1 = Q1, q2 = Q2, gamma = Gamma, q3 = Q3, q4 = Q4, len = Len, on_sync = {_, _, From}, target_ram_msg_count = TargetRamMsgCount, ram_msg_count = RamMsgCount, - avg_egress_rate = AvgEgressRate }) -> + avg_egress_rate = AvgEgressRate, + avg_ingress_rate = AvgIngressRate }) -> [ {q1, queue:len(Q1)}, {q2, queue:len(Q2)}, {gamma, Gamma}, @@ -458,12 +482,22 @@ status(#vqstate { q1 = Q1, q2 = Q2, gamma = Gamma, q3 = Q3, q4 = Q4, {outstanding_txns, length(From)}, {target_ram_msg_count, TargetRamMsgCount}, {ram_msg_count, RamMsgCount}, - {avg_egress_rate, AvgEgressRate} ]. + {avg_egress_rate, AvgEgressRate}, + {avg_ingress_rate, AvgIngressRate} ]. %%---------------------------------------------------------------------------- %% Minor helpers %%---------------------------------------------------------------------------- +update_rate(Now, Then, Count, Rate = {OThen, OCount}) -> + %% form the avg over the current periond and the previous + Avg = 1000000 * ((Count + OCount) / timer:now_diff(Now, OThen)), + Rate1 = case 0 == Count of + true -> Rate; %% keep the last period with activity + false -> {Then, Count} + end, + {Avg, Rate1}. + persistent_msg_ids(Pubs) -> [MsgId || Obj = #basic_message { guid = MsgId } <- Pubs, Obj #basic_message.is_persistent]. @@ -666,10 +700,12 @@ test_keep_msg_in_ram(SeqId, #vqstate { target_ram_msg_count = TargetRamMsgCount, end. 
publish(Msg, IsDelivered, PersistentMsgsAlreadyOnDisk, - State = #vqstate { next_seq_id = SeqId, len = Len }) -> + State = #vqstate { next_seq_id = SeqId, len = Len, + in_counter = InCount }) -> {SeqId, publish(test_keep_msg_in_ram(SeqId, State), Msg, SeqId, IsDelivered, PersistentMsgsAlreadyOnDisk, - State #vqstate { next_seq_id = SeqId + 1, len = Len + 1 })}. + State #vqstate { next_seq_id = SeqId + 1, len = Len + 1, + in_counter = InCount + 1 })}. publish(msg, Msg = #basic_message { guid = MsgId, is_persistent = IsPersistent }, |
