summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-11-17 13:10:52 +0000
committerMatthew Sackman <matthew@lshift.net>2009-11-17 13:10:52 +0000
commit81924a0fd9a04d42e76bfceb78b860bacbe3778b (patch)
treeee559daa8daec023756a676095e76c9e6e2febf4
parent79919ede08cc81cc7f1ea4bf9b0b4b4314069532 (diff)
downloadrabbitmq-server-git-81924a0fd9a04d42e76bfceb78b860bacbe3778b.tar.gz
In the absence of an egress rate, use the ingress rate instead. Also, if there have been no fetches/publishes since the last measurement, use the previous measurement, appropriately scaled. This means that the rates will gently fall off and approach zero in the absence of activity, which is preferable to them suddenly jumping to zero. Also, the average is now the sum of the fetches/publishes in the last two segments, over the time since the start of the previous segment. This is better than before, which was just a straight /2 and would be wrong if the segments were different sizes, which they could be, given a very busy queue.
-rw-r--r--src/rabbit_amqqueue.erl8
-rw-r--r--src/rabbit_amqqueue_process.erl6
-rw-r--r--src/rabbit_tests.erl4
-rw-r--r--src/rabbit_variable_queue.erl100
4 files changed, 77 insertions, 41 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index f7b39c7782..e5a113ae97 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -33,7 +33,7 @@
-export([start/0, recover/1, find_durable_queues/0, declare/4, delete/3,
purge/1]).
--export([internal_declare/2, internal_delete/1, remeasure_egress_rate/1,
+-export([internal_declare/2, internal_delete/1, remeasure_rates/1,
set_queue_duration/2]).
-export([pseudo_queue/2]).
-export([lookup/1, with/2, with_or_die/2,
@@ -114,7 +114,7 @@
-spec(tx_commit_vq_callback/1 :: (pid()) -> 'ok').
-spec(internal_declare/2 :: (amqqueue(), boolean()) -> amqqueue()).
-spec(internal_delete/1 :: (queue_name()) -> 'ok' | not_found()).
--spec(remeasure_egress_rate/1 :: (pid()) -> 'ok').
+-spec(remeasure_rates/1 :: (pid()) -> 'ok').
-spec(set_queue_duration/2 :: (pid(), number()) -> 'ok').
-spec(on_node_down/1 :: (erlang_node()) -> 'ok').
-spec(pseudo_queue/2 :: (binary(), pid()) -> amqqueue()).
@@ -374,8 +374,8 @@ internal_delete(QueueName) ->
end
end).
-remeasure_egress_rate(QPid) ->
- gen_server2:pcast(QPid, 9, remeasure_egress_rate).
+remeasure_rates(QPid) ->
+ gen_server2:pcast(QPid, 9, remeasure_rates).
set_queue_duration(QPid, Duration) ->
gen_server2:pcast(QPid, 9, {set_queue_duration, Duration}).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 3adf97ff85..40b19a548c 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -168,7 +168,7 @@ next_state1(State, false) ->
ensure_egress_rate_timer(State = #q{egress_rate_timer_ref = undefined}) ->
{ok, TRef} = timer:apply_after(?EGRESS_REMEASURE_INTERVAL, rabbit_amqqueue,
- remeasure_egress_rate, [self()]),
+ remeasure_rates, [self()]),
State#q{egress_rate_timer_ref = TRef};
ensure_egress_rate_timer(State = #q{egress_rate_timer_ref = just_measured}) ->
State#q{egress_rate_timer_ref = undefined};
@@ -867,8 +867,8 @@ handle_cast({limit, ChPid, LimiterPid}, State) ->
C#cr{limiter_pid = LimiterPid, is_limit_active = NewLimited}
end));
-handle_cast(remeasure_egress_rate, State = #q{variable_queue_state = VQS}) ->
- VQS1 = rabbit_variable_queue:remeasure_egress_rate(VQS),
+handle_cast(remeasure_rates, State = #q{variable_queue_state = VQS}) ->
+ VQS1 = rabbit_variable_queue:remeasure_rates(VQS),
RamDuration = rabbit_variable_queue:ram_duration(VQS1),
DesiredDuration =
rabbit_memory_monitor:report_queue_duration(self(), RamDuration),
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index ac32e2136e..b1db243f35 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -1167,7 +1167,7 @@ test_variable_queue_dynamic_duration_change() ->
%% start by sending in a couple of segments worth
Len1 = 2*SegmentSize,
{_SeqIds, VQ1} = variable_queue_publish(false, Len1, VQ0),
- VQ2 = rabbit_variable_queue:remeasure_egress_rate(VQ1),
+ VQ2 = rabbit_variable_queue:remeasure_rates(VQ1),
{ok, _TRef} = timer:send_after(1000, {duration, 60,
fun (V) -> (V*0.75)-1 end}),
VQ3 = test_variable_queue_dynamic_duration_change_f(Len1, VQ2),
@@ -1203,7 +1203,7 @@ test_variable_queue_dynamic_duration_change_f(Len, VQ0) ->
_ -> Fun
end,
{ok, _TRef} = timer:send_after(1000, {duration, N1, Fun1}),
- VQ4 = rabbit_variable_queue:remeasure_egress_rate(VQ3),
+ VQ4 = rabbit_variable_queue:remeasure_rates(VQ3),
VQ5 = %% /37 otherwise the duration is just to high to stress things
rabbit_variable_queue:set_queue_ram_duration_target(N/37, VQ4),
io:format("~p:~n~p~n~n", [N, rabbit_variable_queue:status(VQ5)]),
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index c89cdfd587..2ee57ba7df 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -32,7 +32,7 @@
-module(rabbit_variable_queue).
-export([init/1, terminate/1, publish/2, publish_delivered/2,
- set_queue_ram_duration_target/2, remeasure_egress_rate/1,
+ set_queue_ram_duration_target/2, remeasure_rates/1,
ram_duration/1, fetch/1, ack/2, len/1, is_empty/1, purge/1, delete/1,
requeue/2, tx_publish/2, tx_rollback/2, tx_commit/4,
tx_commit_from_msg_store/4, tx_commit_from_vq/1, needs_sync/1,
@@ -53,9 +53,12 @@
index_state,
next_seq_id,
out_counter,
+ in_counter,
egress_rate,
avg_egress_rate,
- egress_rate_timestamp,
+ ingress_rate,
+ avg_ingress_rate,
+ rate_timestamp,
len,
on_sync
}).
@@ -112,9 +115,12 @@
index_state :: any(),
next_seq_id :: seq_id(),
out_counter :: non_neg_integer(),
- egress_rate :: float(),
+ in_counter :: non_neg_integer(),
+ egress_rate :: {{integer(), integer(), integer()}, non_neg_integer()},
avg_egress_rate :: float(),
- egress_rate_timestamp :: {integer(), integer(), integer()},
+ ingress_rate :: {{integer(), integer(), integer()}, non_neg_integer()},
+ avg_ingress_rate :: float(),
+ rate_timestamp :: {integer(), integer(), integer()},
len :: non_neg_integer(),
on_sync :: {[ack()], [msg_id()], [{pid(), any()}]}
}).
@@ -127,7 +133,7 @@
{ack(), vqstate()}).
-spec(set_queue_ram_duration_target/2 ::
(('undefined' | number()), vqstate()) -> vqstate()).
--spec(remeasure_egress_rate/1 :: (vqstate()) -> vqstate()).
+-spec(remeasure_rates/1 :: (vqstate()) -> vqstate()).
-spec(ram_duration/1 :: (vqstate()) -> number()).
-spec(fetch/1 :: (vqstate()) ->
{('empty'|{basic_message(), boolean(), ack(), non_neg_integer()}),
@@ -164,6 +170,7 @@ init(QueueName) ->
0 -> #gamma { seq_id = undefined, count = 0 };
_ -> #gamma { seq_id = GammaSeqId, count = GammaCount }
end,
+ Now = now(),
State =
#vqstate { q1 = queue:new(), q2 = queue:new(),
gamma = Gamma,
@@ -175,9 +182,12 @@ init(QueueName) ->
index_state = IndexState1,
next_seq_id = NextSeqId,
out_counter = 0,
- egress_rate = 0,
+ in_counter = 0,
+ egress_rate = {Now, 0},
avg_egress_rate = 0,
- egress_rate_timestamp = now(),
+ ingress_rate = {Now, GammaCount},
+ avg_ingress_rate = 0,
+ rate_timestamp = Now,
len = GammaCount,
on_sync = {[], [], []}
},
@@ -192,28 +202,37 @@ publish(Msg, State) ->
publish_delivered(Msg = #basic_message { guid = MsgId,
is_persistent = IsPersistent },
State = #vqstate { len = 0, index_state = IndexState,
- next_seq_id = SeqId }) ->
+ next_seq_id = SeqId,
+ out_counter = OutCount,
+ in_counter = InCount}) ->
+ State1 = State #vqstate { out_counter = OutCount + 1,
+ in_counter = InCount + 1 },
case maybe_write_msg_to_disk(false, false, Msg) of
true ->
{true, IndexState1} =
maybe_write_index_to_disk(false, IsPersistent, MsgId, SeqId,
true, IndexState),
{{ack_index_and_store, MsgId, SeqId},
- State #vqstate { index_state = IndexState1,
- next_seq_id = SeqId + 1 }};
+ State1 #vqstate { index_state = IndexState1,
+ next_seq_id = SeqId + 1 }};
false ->
- {ack_not_on_disk, State}
+ {ack_not_on_disk, State1}
end.
set_queue_ram_duration_target(
- DurationTarget, State = #vqstate { avg_egress_rate = EgressRate,
+ DurationTarget, State = #vqstate { avg_egress_rate = AvgEgressRate,
+ avg_ingress_rate = AvgIngressRate,
target_ram_msg_count = TargetRamMsgCount
}) ->
+ Rate = case 0 == AvgEgressRate of
+ true -> AvgIngressRate;
+ false -> AvgEgressRate
+ end,
TargetRamMsgCount1 =
case DurationTarget of
infinity -> undefined;
undefined -> undefined;
- _ -> trunc(DurationTarget * EgressRate) %% msgs = sec * msgs/sec
+ _ -> trunc(DurationTarget * Rate) %% msgs = sec * msgs/sec
end,
State1 = State #vqstate { target_ram_msg_count = TargetRamMsgCount1,
duration_target = DurationTarget },
@@ -223,30 +242,34 @@ set_queue_ram_duration_target(
false -> reduce_memory_use(State1)
end.
-remeasure_egress_rate(State = #vqstate { egress_rate = OldEgressRate,
- egress_rate_timestamp = Timestamp,
- out_counter = OutCount,
- duration_target = DurationTarget }) ->
- %% We do an average over the last two values, but also hold the
- %% current value separately so that the average always only
- %% incorporates the last two values, and not the current value and
- %% the last average. Averaging helps smooth out spikes.
+remeasure_rates(State = #vqstate { egress_rate = Egress,
+ ingress_rate = Ingress,
+ rate_timestamp = Timestamp,
+ in_counter = InCount,
+ out_counter = OutCount,
+ duration_target = DurationTarget }) ->
Now = now(),
- %% EgressRate is in seconds, and now_diff is in microseconds
- EgressRate = 1000000 * OutCount / timer:now_diff(Now, Timestamp),
- AvgEgressRate = (EgressRate + OldEgressRate) / 2,
+ {AvgEgressRate, Egress1} = update_rate(Now, Timestamp, OutCount, Egress),
+ {AvgIngressRate, Ingress1} = update_rate(Now, Timestamp, InCount, Ingress),
+
set_queue_ram_duration_target(
DurationTarget,
- State #vqstate { egress_rate = EgressRate,
+ State #vqstate { egress_rate = Egress1,
avg_egress_rate = AvgEgressRate,
- egress_rate_timestamp = Now,
- out_counter = 0 }).
+ ingress_rate = Ingress1,
+ avg_ingress_rate = AvgIngressRate,
+ rate_timestamp = Now,
+ out_counter = 0, in_counter = 0 }).
ram_duration(#vqstate { avg_egress_rate = AvgEgressRate,
+ avg_ingress_rate = AvgIngressRate,
ram_msg_count = RamMsgCount }) ->
%% msgs / (msgs/sec) == sec
case AvgEgressRate == 0 of
- true -> infinity;
+ true -> case AvgIngressRate == 0 of
+ true -> infinity;
+ false -> RamMsgCount / AvgIngressRate
+ end;
false -> RamMsgCount / AvgEgressRate
end.
@@ -448,7 +471,8 @@ status(#vqstate { q1 = Q1, q2 = Q2, gamma = Gamma, q3 = Q3, q4 = Q4,
len = Len, on_sync = {_, _, From},
target_ram_msg_count = TargetRamMsgCount,
ram_msg_count = RamMsgCount,
- avg_egress_rate = AvgEgressRate }) ->
+ avg_egress_rate = AvgEgressRate,
+ avg_ingress_rate = AvgIngressRate }) ->
[ {q1, queue:len(Q1)},
{q2, queue:len(Q2)},
{gamma, Gamma},
@@ -458,12 +482,22 @@ status(#vqstate { q1 = Q1, q2 = Q2, gamma = Gamma, q3 = Q3, q4 = Q4,
{outstanding_txns, length(From)},
{target_ram_msg_count, TargetRamMsgCount},
{ram_msg_count, RamMsgCount},
- {avg_egress_rate, AvgEgressRate} ].
+ {avg_egress_rate, AvgEgressRate},
+ {avg_ingress_rate, AvgIngressRate} ].
%%----------------------------------------------------------------------------
%% Minor helpers
%%----------------------------------------------------------------------------
+update_rate(Now, Then, Count, Rate = {OThen, OCount}) ->
+ %% form the avg over the current periond and the previous
+ Avg = 1000000 * ((Count + OCount) / timer:now_diff(Now, OThen)),
+ Rate1 = case 0 == Count of
+ true -> Rate; %% keep the last period with activity
+ false -> {Then, Count}
+ end,
+ {Avg, Rate1}.
+
persistent_msg_ids(Pubs) ->
[MsgId || Obj = #basic_message { guid = MsgId } <- Pubs,
Obj #basic_message.is_persistent].
@@ -666,10 +700,12 @@ test_keep_msg_in_ram(SeqId, #vqstate { target_ram_msg_count = TargetRamMsgCount,
end.
publish(Msg, IsDelivered, PersistentMsgsAlreadyOnDisk,
- State = #vqstate { next_seq_id = SeqId, len = Len }) ->
+ State = #vqstate { next_seq_id = SeqId, len = Len,
+ in_counter = InCount }) ->
{SeqId, publish(test_keep_msg_in_ram(SeqId, State), Msg, SeqId, IsDelivered,
PersistentMsgsAlreadyOnDisk,
- State #vqstate { next_seq_id = SeqId + 1, len = Len + 1 })}.
+ State #vqstate { next_seq_id = SeqId + 1, len = Len + 1,
+ in_counter = InCount + 1 })}.
publish(msg, Msg = #basic_message { guid = MsgId,
is_persistent = IsPersistent },