diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-11-17 11:11:23 +0000 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-11-17 11:11:23 +0000 |
| commit | 79919ede08cc81cc7f1ea4bf9b0b4b4314069532 (patch) | |
| tree | 4a74585200e559697f8695abba0de0f8c016cb6e | |
| parent | 148dc35396c672c12513b5a8269684ce6df06408 (diff) | |
| download | rabbitmq-server-git-79919ede08cc81cc7f1ea4bf9b0b4b4314069532.tar.gz | |
Remove most of the vq tests which are now invalid given the prefetcher has gone.
| -rw-r--r-- | src/rabbit_memory_monitor.erl | 17 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 276 |
2 files changed, 9 insertions, 284 deletions
diff --git a/src/rabbit_memory_monitor.erl b/src/rabbit_memory_monitor.erl index 80fa7edf4f..be15ecbbe1 100644 --- a/src/rabbit_memory_monitor.erl +++ b/src/rabbit_memory_monitor.erl @@ -138,14 +138,15 @@ init([]) -> {ok, TRef} = timer:apply_interval(?DEFAULT_UPDATE_INTERVAL, ?SERVER, update, []), - {ok, #state{timer = TRef, - queue_durations = ets:new(?TABLE_NAME, [set, private]), - queue_duration_sum = 0.0, - queue_duration_count = 0, - memory_limit = MemoryLimit, - memory_ratio = 1.0, - desired_duration = infinity, - callbacks = dict:new()}}. + {ok, internal_update( + #state{timer = TRef, + queue_durations = ets:new(?TABLE_NAME, [set, private]), + queue_duration_sum = 0.0, + queue_duration_count = 0, + memory_limit = MemoryLimit, + memory_ratio = 1.0, + desired_duration = infinity, + callbacks = dict:new()})}. handle_call({report_queue_duration, Pid, QueueDuration}, From, State = #state{queue_duration_sum = Sum, diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index bfeb397cc1..ac32e2136e 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1150,7 +1150,6 @@ fresh_variable_queue() -> VQ = rabbit_variable_queue:init(test_queue()), S0 = rabbit_variable_queue:status(VQ), assert_prop(S0, len, 0), - assert_prop(S0, prefetching, false), assert_prop(S0, q1, 0), assert_prop(S0, q2, 0), assert_prop(S0, gamma, {gamma, undefined, 0}), @@ -1159,10 +1158,6 @@ fresh_variable_queue() -> VQ. test_variable_queue() -> - passed = test_variable_queue_prefetching_and_gammas_to_betas(), - passed = test_variable_queue_prefetching_during_publish(0), - passed = test_variable_queue_prefetching_during_publish(5000), - passed = test_variable_queue_prefetch_evicts_q1(), passed = test_variable_queue_dynamic_duration_change(), passed. @@ -1216,274 +1211,3 @@ test_variable_queue_dynamic_duration_change_f(Len, VQ0) -> after 0 -> test_variable_queue_dynamic_duration_change_f(Len, VQ3) end. - -test_variable_queue_prefetch_evicts_q1() -> - SegmentSize = rabbit_queue_index:segment_size(), - VQ0 = fresh_variable_queue(), - VQ1 = rabbit_variable_queue:set_queue_ram_duration_target(0, VQ0), - assert_prop(rabbit_variable_queue:status(VQ1), target_ram_msg_count, 0), - Len1 = 2*SegmentSize, - {_SeqIds, VQ2} = variable_queue_publish(true, Len1, VQ1), - %% one segment will be in q3, the other in gamma. We want to fetch - %% all of q3 so that gamma is then moved into q3, emptying gamma - - VQ3 = rabbit_variable_queue:remeasure_egress_rate(VQ2), - Start = now(), - {VQ4, AckTags} = variable_queue_fetch(SegmentSize, true, false, Len1, VQ3), - End = now(), - VQ5 = rabbit_variable_queue:ack(AckTags, VQ4), - S5 = rabbit_variable_queue:status(VQ5), - assert_prop(S5, q4, 0), - assert_prop(S5, q3, SegmentSize), - assert_prop(S5, gamma, {gamma, undefined, 0}), - assert_prop(S5, len, SegmentSize), - assert_prop(S5, prefetching, false), - - VQ6 = rabbit_variable_queue:remeasure_egress_rate(VQ5), - %% half the seconds taken to fetch one segment - Duration = timer:now_diff(End, Start) / 2000000, - VQ7 = rabbit_variable_queue:set_queue_ram_duration_target(Duration, VQ6), - S7 = rabbit_variable_queue:status(VQ7), - assert_prop(S7, q4, 0), - Q3 = proplists:get_value(q3, S7), - true = Q3 > 0, %% not prefetching everything - assert_prop(S7, gamma, {gamma, undefined, 0}), - assert_prop(S7, len, SegmentSize), - assert_prop(S7, prefetching, true), - - %% now publish a segment, this'll go half in q1, half in q3, in - %% theory. - {_SeqIds1, VQ8} = variable_queue_publish(true, SegmentSize, VQ7), - S8 = rabbit_variable_queue:status(VQ8), - assert_prop(S8, q4, 0), - assert_prop(S8, q2, 0), - assert_prop(S8, len, Len1), - assert_prop(S8, prefetching, true), - Q3a = proplists:get_value(q3, S8), - Q3a_new = Q3a - Q3, - Q1a = proplists:get_value(q1, S8), - true = (Q3a_new + Q1a == SegmentSize) andalso Q1a < SegmentSize, - - %% wait a bit, to let the prefetcher do its thing - timer:sleep(2000), - %% fetch a msg. The prefetcher *should* have finished, but can't - %% guarantee it. - Len2 = Len1-1, - {{_Msg, false, AckTag, Len2}, VQ9} = rabbit_variable_queue:fetch(VQ8), - S9 = rabbit_variable_queue:status(VQ9), - case proplists:get_value(prefetching, S9) of - true -> - %% bits of q1 could have moved into q3, and the prefetcher - %% won't have returned any betas for q3. So q3 can not - %% have shrunk. - Q3b = proplists:get_value(q3, S9), - Q1b = proplists:get_value(q1, S9), - true = (Q1a + Q3a) == (Q1b + Q3b) andalso Q3b >= Q3a; - false -> - %% there should be content in q4 and q3 (we only did 1 - %% fetch. This is not sufficient to kill the prefetcher - %% through draining it when it's empty, thus if it's not - %% running, it must have finished, not been killed, thus - %% q4 will not be empty), and q1 should have gone into q3. - Q1b = proplists:get_value(q1, S9), - Q3b = proplists:get_value(q3, S9), - Q4b = proplists:get_value(q4, S9), - NotPrefetched = Q3b - (SegmentSize - Q1b), - SegmentSize = NotPrefetched + Q4b + 1 %% we fetched one - end, - - %% just for the fun of it, set duration to 0. This should push - %% everything back into gamma, except the eldest (partial) segment - %% in q3 - VQ10 = rabbit_variable_queue:set_queue_ram_duration_target(0, VQ9), - S10 = rabbit_variable_queue:status(VQ10), - assert_prop(S10, len, Len2), - assert_prop(S10, prefetching, false), - assert_prop(S10, q1, 0), - assert_prop(S10, q2, 0), - assert_prop(S10, gamma, {gamma, Len1, SegmentSize}), - assert_prop(S10, q3, (Len2 - SegmentSize)), - assert_prop(S10, q4, 0), - - {VQ11, AckTags1} = variable_queue_fetch(Len2, true, false, Len2, VQ10), - VQ12 = rabbit_variable_queue:ack([AckTag|AckTags1], VQ11), - {empty, VQ13} = rabbit_variable_queue:fetch(VQ12), - rabbit_variable_queue:terminate(VQ13), - - passed. - -test_variable_queue_prefetching_during_publish(PrefetchDelay) -> - SegmentSize = rabbit_queue_index:segment_size(), - VQ0 = fresh_variable_queue(), - VQ1 = rabbit_variable_queue:set_queue_ram_duration_target(0, VQ0), - assert_prop(rabbit_variable_queue:status(VQ1), target_ram_msg_count, 0), - - Len1 = 2*SegmentSize, - {_SeqIds, VQ2} = variable_queue_publish(true, Len1, VQ1), - %% one segment will be in q3, the other in gamma. We want to fetch - %% all of q3 so that gamma is then moved into q3, emptying gamma - - VQ3 = rabbit_variable_queue:remeasure_egress_rate(VQ2), - {VQ4, AckTags} = variable_queue_fetch(SegmentSize, true, false, Len1, VQ3), - VQ5 = rabbit_variable_queue:ack(AckTags, VQ4), - S5 = rabbit_variable_queue:status(VQ5), - assert_prop(S5, q4, 0), - assert_prop(S5, q3, SegmentSize), - assert_prop(S5, gamma, {gamma, undefined, 0}), - assert_prop(S5, len, SegmentSize), - assert_prop(S5, prefetching, false), - - %% we assume that we can fetch at > 1 msg a second - VQ6 = rabbit_variable_queue:remeasure_egress_rate(VQ5), - VQ7 = rabbit_variable_queue:set_queue_ram_duration_target(Len1, VQ6), - S7 = rabbit_variable_queue:status(VQ7), - assert_prop(S7, q4, 0), - assert_prop(S7, q3, 0), - assert_prop(S7, gamma, {gamma, undefined, 0}), - assert_prop(S7, len, SegmentSize), - assert_prop(S7, prefetching, true), - - timer:sleep(PrefetchDelay), - - {_SeqIds1, VQ8} = variable_queue_publish(true, SegmentSize, VQ7), - S8 = rabbit_variable_queue:status(VQ8), - assert_prop(S8, q4, 0), - assert_prop(S8, q2, 0), - assert_prop(S8, q1, SegmentSize), - assert_prop(S8, len, Len1), - assert_prop(S8, prefetching, true), - - {VQ9, AckTags1} = - variable_queue_fetch(SegmentSize-1, true, false, Len1, VQ8), - VQ10 = rabbit_variable_queue:ack(AckTags1, VQ9), - %% can't guarantee the prefetcher has stopped here. If it is still - %% running, then we must have SegmentSize is q1. If it's not - %% running, and it completed, then we'll find SegmentSize + 1 in - %% q4 (q1 will have been joined to q4), otherwise, we'll find - %% SegmentSize in q1 and 1 in q3 and q4 empty. - S10 = rabbit_variable_queue:status(VQ10), - assert_prop(S10, q2, 0), - assert_prop(S10, len, (SegmentSize+1)), - case proplists:get_value(prefetching, S10) of - true -> assert_prop(S10, q1, SegmentSize), - assert_prop(S10, q3, 0), - assert_prop(S10, q4, 0); - false -> case proplists:get_value(q3, S10) of - 0 -> assert_prop(S10, q4, SegmentSize+1), - assert_prop(S10, q1, 0); - 1 -> assert_prop(S10, q4, 0), - assert_prop(S10, q1, SegmentSize) - end - end, - - {VQ11, AckTags2} = - variable_queue_fetch(SegmentSize+1, true, false, SegmentSize+1, VQ10), - VQ12 = rabbit_variable_queue:ack(AckTags2, VQ11), - - {empty, VQ13} = rabbit_variable_queue:fetch(VQ12), - rabbit_variable_queue:terminate(VQ13), - - passed. - -test_variable_queue_prefetching_and_gammas_to_betas() -> - SegmentSize = rabbit_queue_index:segment_size(), - VQ0 = fresh_variable_queue(), - - VQ1 = rabbit_variable_queue:set_queue_ram_duration_target(10, VQ0), - assert_prop(rabbit_variable_queue:status(VQ1), target_ram_msg_count, 0), - - {_SeqIds, VQ2} = variable_queue_publish(false, 3 * SegmentSize, VQ1), - S2 = rabbit_variable_queue:status(VQ2), - assert_prop(S2, gamma, {gamma, SegmentSize, 2*SegmentSize}), - assert_prop(S2, q3, SegmentSize), - assert_prop(S2, len, 3*SegmentSize), - - VQ3 = rabbit_variable_queue:remeasure_egress_rate(VQ2), - Len1 = 3*SegmentSize - 1, - {{_Msg, false, AckTag, Len1}, VQ4} = rabbit_variable_queue:fetch(VQ3), - timer:sleep(1000), - VQ5 = rabbit_variable_queue:remeasure_egress_rate(VQ4), - VQ6 = rabbit_variable_queue:set_queue_ram_duration_target(10, VQ5), - timer:sleep(1000), %% let the prefetcher run and grab enough - about 4 msgs - S6 = rabbit_variable_queue:status(VQ6), - RamCount = proplists:get_value(target_ram_msg_count, S6), - assert_prop(S6, prefetching, true), - assert_prop(S6, q4, 0), - assert_prop(S6, q3, (Len1 - RamCount)), - assert_prop(S6, gamma, {gamma, undefined, 0}), - - Len2 = Len1 - 1, - %% this should be enough to stop + drain the prefetcher - {{_Msg1, false, AckTag1, Len2}, VQ7} = rabbit_variable_queue:fetch(VQ6), - S7 = rabbit_variable_queue:status(VQ7), - assert_prop(S7, prefetching, false), - assert_prop(S7, q4, (RamCount - 1)), - assert_prop(S7, q3, (Len1 - RamCount)), - - %% now fetch SegmentSize - 1 which will exhaust q4 and work through a bit of q3 - %% bringing in a segment from gamma: - {VQ8, AckTags} = variable_queue_fetch(SegmentSize-1, false, false, Len2, VQ7), - Len3 = Len2 - (SegmentSize - 1), - S8 = rabbit_variable_queue:status(VQ8), - assert_prop(S8, prefetching, false), - assert_prop(S8, q4, 0), - assert_prop(S8, q3, Len3), - assert_prop(S8, len, Len3), - - VQ9 = rabbit_variable_queue:remeasure_egress_rate(VQ8), - VQ10 = rabbit_variable_queue:ack(AckTags, VQ9), - - S10 = rabbit_variable_queue:status(VQ10), - assert_prop(S10, prefetching, true), - %% egress rate should be really high, so it's likely if we wait a - %% little bit, lots of msgs will be brought in - timer:sleep(2000), - PrefetchCount = lists:min([proplists:get_value(target_ram_msg_count, S10) - - proplists:get_value(ram_msg_count, S10), - Len3]), - Len4 = Len3 - 1, - {{_Msg2, false, AckTag2, Len4}, VQ11} = rabbit_variable_queue:fetch(VQ10), - S11 = rabbit_variable_queue:status(VQ11), - %% prefetcher will stop if it's fast enough and has completed by - %% now, or may still be running if PrefetchCount > 1 - Prefetched = proplists:get_value(q4, S11), - true = PrefetchCount > Prefetched, %% already fetched 1, thus >, not >= - %% q3 will contain whatever the prefetcher was not allowed to - %% prefetch, due to memory constraints. If the prefetcher is still - %% running, this will be less than (Len4 - Prefetched) because - %% Prefetched will not reflect the true number of msgs that it's - %% trying to prefetch. - case proplists:get_value(prefetching, S11) of - true -> true = (Len4 - Prefetched) > proplists:get_value(q3, S11); - false -> assert_prop(S11, q3, Len4 - Prefetched) - end, - assert_prop(S11, gamma, {gamma, undefined, 0}), - assert_prop(S11, q2, 0), - assert_prop(S11, q1, 0), - - S12 = rabbit_variable_queue:status(VQ11), - assert_prop(S12, prefetching, (Len4 - Prefetched) > 0), - timer:sleep(2000), - %% we have to fetch all of q4 before the prefetcher will be drained - {VQ12, AckTags1} = - variable_queue_fetch(Prefetched, false, false, Len4, VQ11), - {VQ15, Acks} = - case Len4 == Prefetched of - true -> - {VQ12, [AckTag2, AckTag1, AckTag, AckTags1]}; - false -> - Len5 = Len4 - Prefetched - 1, - {{_Msg3, false, AckTag3, Len5}, VQ13} = - rabbit_variable_queue:fetch(VQ12), - assert_prop(rabbit_variable_queue:status(VQ13), - prefetching, false), - {VQ14, AckTags2} = - variable_queue_fetch(Len5, false, false, Len5, VQ13), - {VQ14, [AckTag3, AckTag2, AckTag1, AckTag, AckTags1, AckTags2]} - end, - VQ16 = rabbit_variable_queue:ack(lists:flatten(Acks), VQ15), - - {empty, VQ17} = rabbit_variable_queue:fetch(VQ16), - - rabbit_variable_queue:terminate(VQ17), - passed. |
