summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-11-10 11:15:39 +0000
committerMatthew Sackman <matthew@lshift.net>2009-11-10 11:15:39 +0000
commita6a6707c97e74e586bb6fb5ee1e0172350c64a38 (patch)
tree194fae864f69036172b2efeb2b356c8302d0fae2
parent9add76728a35e9d09b1c752e67d57698e0d82d2a (diff)
downloadrabbitmq-server-git-a6a6707c97e74e586bb6fb5ee1e0172350c64a38.tar.gz
Added test which slowly reduces the duration target of the queue as messages are pumped through at high rate. This has revealed major flaw in the queue index which goes as follows. In the queue index, we assume that all segments have the same number of entries. This is in fact not necessarily the case, because a segment may very well have a mixture of messages, some of which are on disk and some of which are not. There are a few solutions to this, and I've not decided yet which is right.
-rw-r--r--src/rabbit_tests.erl41
-rw-r--r--src/rabbit_variable_queue.erl4
2 files changed, 44 insertions, 1 deletions
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 1becda86c4..d1131ed0af 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -1159,8 +1159,49 @@ test_variable_queue() ->
passed = test_variable_queue_prefetching_during_publish(0),
passed = test_variable_queue_prefetching_during_publish(5000),
passed = test_variable_queue_prefetch_evicts_q1(),
+ passed = test_variable_queue_dynamic_duration_change(),
passed.
+test_variable_queue_dynamic_duration_change() ->
+ SegmentSize = rabbit_queue_index:segment_size(),
+ VQ0 = fresh_variable_queue(),
+ %% start by sending in a couple of segments worth
+ Len1 = 2*SegmentSize,
+ {_SeqIds, VQ1} = variable_queue_publish(true, Len1, VQ0),
+ VQ2 = rabbit_variable_queue:remeasure_egress_rate(VQ1),
+ {ok, _TRef} = timer:send_after(1000, {duration, 30, fun erlang:'-'/2}),
+ VQ3 = test_variable_queue_dynamic_duration_change_f(Len1, VQ2),
+ {VQ4, AckTags} = variable_queue_fetch(Len1, false, false, Len1, VQ3),
+ VQ5 = rabbit_variable_queue:ack(AckTags, VQ4),
+ {empty, VQ6} = rabbit_variable_queue:fetch(VQ5),
+ rabbit_variable_queue:terminate(VQ6),
+
+ passed.
+
+test_variable_queue_dynamic_duration_change_f(Len, VQ0) ->
+ {_SeqIds, VQ1} = variable_queue_publish(false, 1, VQ0),
+ {{_Msg, false, AckTag, Len}, VQ2} = rabbit_variable_queue:fetch(VQ1),
+ VQ3 = rabbit_variable_queue:ack([AckTag], VQ2),
+ receive
+ {duration, 30, stop} ->
+ VQ3;
+ {duration, N, Fun} ->
+ N1 = Fun(N, 1),
+ Fun1 = case N1 of
+ 0 -> fun erlang:'+'/2;
+ 30 -> stop;
+ _ -> Fun
+ end,
+ {ok, _TRef} = timer:send_after(1000, {duration, N1, Fun1}),
+ VQ4 = rabbit_variable_queue:remeasure_egress_rate(VQ3),
+ VQ5 = %% /37 otherwise the duration is just to high to stress things
+ rabbit_variable_queue:set_queue_ram_duration_target(N/37, VQ4),
+ io:format("~p:~n~p~n~n", [N, rabbit_variable_queue:status(VQ5)]),
+ test_variable_queue_dynamic_duration_change_f(Len, VQ5)
+ after 0 ->
+ test_variable_queue_dynamic_duration_change_f(Len, VQ3)
+ end.
+
test_variable_queue_prefetch_evicts_q1() ->
SegmentSize = rabbit_queue_index:segment_size(),
VQ0 = fresh_variable_queue(),
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index c698e31ed3..15caf81bb3 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -482,7 +482,8 @@ flush_journal(State = #vqstate { index_state = IndexState }) ->
status(#vqstate { q1 = Q1, q2 = Q2, gamma = Gamma, q3 = Q3, q4 = Q4,
len = Len, on_sync = {_, _, From},
target_ram_msg_count = TargetRamMsgCount,
- ram_msg_count = RamMsgCount, prefetcher = Prefetcher }) ->
+ ram_msg_count = RamMsgCount, prefetcher = Prefetcher,
+ avg_egress_rate = AvgEgressRate }) ->
[ {q1, queue:len(Q1)},
{q2, queue:len(Q2)},
{gamma, Gamma},
@@ -492,6 +493,7 @@ status(#vqstate { q1 = Q1, q2 = Q2, gamma = Gamma, q3 = Q3, q4 = Q4,
{outstanding_txns, length(From)},
{target_ram_msg_count, TargetRamMsgCount},
{ram_msg_count, RamMsgCount},
+ {avg_egress_rate, AvgEgressRate},
{prefetching, Prefetcher /= undefined} ].
%%----------------------------------------------------------------------------