diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-11-10 11:15:39 +0000 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-11-10 11:15:39 +0000 |
| commit | a6a6707c97e74e586bb6fb5ee1e0172350c64a38 (patch) | |
| tree | 194fae864f69036172b2efeb2b356c8302d0fae2 | |
| parent | 9add76728a35e9d09b1c752e67d57698e0d82d2a (diff) | |
| download | rabbitmq-server-git-a6a6707c97e74e586bb6fb5ee1e0172350c64a38.tar.gz | |
Added test which slowly reduces the duration target of the queue as messages are pumped through at high rate. This has revealed major flaw in the queue index which goes as follows. In the queue index, we assume that all segments have the same number of entries. This is in fact not necessarily the case, because a segment may very well have a mixture of messages, some of which are on disk and some of which are not. There are a few solutions to this, and I've not decided yet which is right.
| -rw-r--r-- | src/rabbit_tests.erl | 41 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 4 |
2 files changed, 44 insertions, 1 deletions
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 1becda86c4..d1131ed0af 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1159,8 +1159,49 @@ test_variable_queue() -> passed = test_variable_queue_prefetching_during_publish(0), passed = test_variable_queue_prefetching_during_publish(5000), passed = test_variable_queue_prefetch_evicts_q1(), + passed = test_variable_queue_dynamic_duration_change(), passed. +test_variable_queue_dynamic_duration_change() -> + SegmentSize = rabbit_queue_index:segment_size(), + VQ0 = fresh_variable_queue(), + %% start by sending in a couple of segments worth + Len1 = 2*SegmentSize, + {_SeqIds, VQ1} = variable_queue_publish(true, Len1, VQ0), + VQ2 = rabbit_variable_queue:remeasure_egress_rate(VQ1), + {ok, _TRef} = timer:send_after(1000, {duration, 30, fun erlang:'-'/2}), + VQ3 = test_variable_queue_dynamic_duration_change_f(Len1, VQ2), + {VQ4, AckTags} = variable_queue_fetch(Len1, false, false, Len1, VQ3), + VQ5 = rabbit_variable_queue:ack(AckTags, VQ4), + {empty, VQ6} = rabbit_variable_queue:fetch(VQ5), + rabbit_variable_queue:terminate(VQ6), + + passed. + +test_variable_queue_dynamic_duration_change_f(Len, VQ0) -> + {_SeqIds, VQ1} = variable_queue_publish(false, 1, VQ0), + {{_Msg, false, AckTag, Len}, VQ2} = rabbit_variable_queue:fetch(VQ1), + VQ3 = rabbit_variable_queue:ack([AckTag], VQ2), + receive + {duration, 30, stop} -> + VQ3; + {duration, N, Fun} -> + N1 = Fun(N, 1), + Fun1 = case N1 of + 0 -> fun erlang:'+'/2; + 30 -> stop; + _ -> Fun + end, + {ok, _TRef} = timer:send_after(1000, {duration, N1, Fun1}), + VQ4 = rabbit_variable_queue:remeasure_egress_rate(VQ3), + VQ5 = %% /37 otherwise the duration is just to high to stress things + rabbit_variable_queue:set_queue_ram_duration_target(N/37, VQ4), + io:format("~p:~n~p~n~n", [N, rabbit_variable_queue:status(VQ5)]), + test_variable_queue_dynamic_duration_change_f(Len, VQ5) + after 0 -> + test_variable_queue_dynamic_duration_change_f(Len, VQ3) + end. + test_variable_queue_prefetch_evicts_q1() -> SegmentSize = rabbit_queue_index:segment_size(), VQ0 = fresh_variable_queue(), diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index c698e31ed3..15caf81bb3 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -482,7 +482,8 @@ flush_journal(State = #vqstate { index_state = IndexState }) -> status(#vqstate { q1 = Q1, q2 = Q2, gamma = Gamma, q3 = Q3, q4 = Q4, len = Len, on_sync = {_, _, From}, target_ram_msg_count = TargetRamMsgCount, - ram_msg_count = RamMsgCount, prefetcher = Prefetcher }) -> + ram_msg_count = RamMsgCount, prefetcher = Prefetcher, + avg_egress_rate = AvgEgressRate }) -> [ {q1, queue:len(Q1)}, {q2, queue:len(Q2)}, {gamma, Gamma}, @@ -492,6 +493,7 @@ status(#vqstate { q1 = Q1, q2 = Q2, gamma = Gamma, q3 = Q3, q4 = Q4, {outstanding_txns, length(From)}, {target_ram_msg_count, TargetRamMsgCount}, {ram_msg_count, RamMsgCount}, + {avg_egress_rate, AvgEgressRate}, {prefetching, Prefetcher /= undefined} ]. %%---------------------------------------------------------------------------- |
