summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-11-17 11:11:23 +0000
committerMatthew Sackman <matthew@lshift.net>2009-11-17 11:11:23 +0000
commit79919ede08cc81cc7f1ea4bf9b0b4b4314069532 (patch)
tree4a74585200e559697f8695abba0de0f8c016cb6e
parent148dc35396c672c12513b5a8269684ce6df06408 (diff)
downloadrabbitmq-server-git-79919ede08cc81cc7f1ea4bf9b0b4b4314069532.tar.gz
Remove most of the vq tests which are now invalid given the prefetcher has gone.
-rw-r--r--src/rabbit_memory_monitor.erl17
-rw-r--r--src/rabbit_tests.erl276
2 files changed, 9 insertions, 284 deletions
diff --git a/src/rabbit_memory_monitor.erl b/src/rabbit_memory_monitor.erl
index 80fa7edf4f..be15ecbbe1 100644
--- a/src/rabbit_memory_monitor.erl
+++ b/src/rabbit_memory_monitor.erl
@@ -138,14 +138,15 @@ init([]) ->
{ok, TRef} = timer:apply_interval(?DEFAULT_UPDATE_INTERVAL,
?SERVER, update, []),
- {ok, #state{timer = TRef,
- queue_durations = ets:new(?TABLE_NAME, [set, private]),
- queue_duration_sum = 0.0,
- queue_duration_count = 0,
- memory_limit = MemoryLimit,
- memory_ratio = 1.0,
- desired_duration = infinity,
- callbacks = dict:new()}}.
+ {ok, internal_update(
+ #state{timer = TRef,
+ queue_durations = ets:new(?TABLE_NAME, [set, private]),
+ queue_duration_sum = 0.0,
+ queue_duration_count = 0,
+ memory_limit = MemoryLimit,
+ memory_ratio = 1.0,
+ desired_duration = infinity,
+ callbacks = dict:new()})}.
handle_call({report_queue_duration, Pid, QueueDuration}, From,
State = #state{queue_duration_sum = Sum,
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index bfeb397cc1..ac32e2136e 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -1150,7 +1150,6 @@ fresh_variable_queue() ->
VQ = rabbit_variable_queue:init(test_queue()),
S0 = rabbit_variable_queue:status(VQ),
assert_prop(S0, len, 0),
- assert_prop(S0, prefetching, false),
assert_prop(S0, q1, 0),
assert_prop(S0, q2, 0),
assert_prop(S0, gamma, {gamma, undefined, 0}),
@@ -1159,10 +1158,6 @@ fresh_variable_queue() ->
VQ.
test_variable_queue() ->
- passed = test_variable_queue_prefetching_and_gammas_to_betas(),
- passed = test_variable_queue_prefetching_during_publish(0),
- passed = test_variable_queue_prefetching_during_publish(5000),
- passed = test_variable_queue_prefetch_evicts_q1(),
passed = test_variable_queue_dynamic_duration_change(),
passed.
@@ -1216,274 +1211,3 @@ test_variable_queue_dynamic_duration_change_f(Len, VQ0) ->
after 0 ->
test_variable_queue_dynamic_duration_change_f(Len, VQ3)
end.
-
-test_variable_queue_prefetch_evicts_q1() ->
- SegmentSize = rabbit_queue_index:segment_size(),
- VQ0 = fresh_variable_queue(),
- VQ1 = rabbit_variable_queue:set_queue_ram_duration_target(0, VQ0),
- assert_prop(rabbit_variable_queue:status(VQ1), target_ram_msg_count, 0),
- Len1 = 2*SegmentSize,
- {_SeqIds, VQ2} = variable_queue_publish(true, Len1, VQ1),
- %% one segment will be in q3, the other in gamma. We want to fetch
- %% all of q3 so that gamma is then moved into q3, emptying gamma
-
- VQ3 = rabbit_variable_queue:remeasure_egress_rate(VQ2),
- Start = now(),
- {VQ4, AckTags} = variable_queue_fetch(SegmentSize, true, false, Len1, VQ3),
- End = now(),
- VQ5 = rabbit_variable_queue:ack(AckTags, VQ4),
- S5 = rabbit_variable_queue:status(VQ5),
- assert_prop(S5, q4, 0),
- assert_prop(S5, q3, SegmentSize),
- assert_prop(S5, gamma, {gamma, undefined, 0}),
- assert_prop(S5, len, SegmentSize),
- assert_prop(S5, prefetching, false),
-
- VQ6 = rabbit_variable_queue:remeasure_egress_rate(VQ5),
- %% half the seconds taken to fetch one segment
- Duration = timer:now_diff(End, Start) / 2000000,
- VQ7 = rabbit_variable_queue:set_queue_ram_duration_target(Duration, VQ6),
- S7 = rabbit_variable_queue:status(VQ7),
- assert_prop(S7, q4, 0),
- Q3 = proplists:get_value(q3, S7),
- true = Q3 > 0, %% not prefetching everything
- assert_prop(S7, gamma, {gamma, undefined, 0}),
- assert_prop(S7, len, SegmentSize),
- assert_prop(S7, prefetching, true),
-
- %% now publish a segment, this'll go half in q1, half in q3, in
- %% theory.
- {_SeqIds1, VQ8} = variable_queue_publish(true, SegmentSize, VQ7),
- S8 = rabbit_variable_queue:status(VQ8),
- assert_prop(S8, q4, 0),
- assert_prop(S8, q2, 0),
- assert_prop(S8, len, Len1),
- assert_prop(S8, prefetching, true),
- Q3a = proplists:get_value(q3, S8),
- Q3a_new = Q3a - Q3,
- Q1a = proplists:get_value(q1, S8),
- true = (Q3a_new + Q1a == SegmentSize) andalso Q1a < SegmentSize,
-
- %% wait a bit, to let the prefetcher do its thing
- timer:sleep(2000),
- %% fetch a msg. The prefetcher *should* have finished, but can't
- %% guarantee it.
- Len2 = Len1-1,
- {{_Msg, false, AckTag, Len2}, VQ9} = rabbit_variable_queue:fetch(VQ8),
- S9 = rabbit_variable_queue:status(VQ9),
- case proplists:get_value(prefetching, S9) of
- true ->
- %% bits of q1 could have moved into q3, and the prefetcher
- %% won't have returned any betas for q3. So q3 can not
- %% have shrunk.
- Q3b = proplists:get_value(q3, S9),
- Q1b = proplists:get_value(q1, S9),
- true = (Q1a + Q3a) == (Q1b + Q3b) andalso Q3b >= Q3a;
- false ->
- %% there should be content in q4 and q3 (we only did 1
- %% fetch. This is not sufficient to kill the prefetcher
- %% through draining it when it's empty, thus if it's not
- %% running, it must have finished, not been killed, thus
- %% q4 will not be empty), and q1 should have gone into q3.
- Q1b = proplists:get_value(q1, S9),
- Q3b = proplists:get_value(q3, S9),
- Q4b = proplists:get_value(q4, S9),
- NotPrefetched = Q3b - (SegmentSize - Q1b),
- SegmentSize = NotPrefetched + Q4b + 1 %% we fetched one
- end,
-
- %% just for the fun of it, set duration to 0. This should push
- %% everything back into gamma, except the eldest (partial) segment
- %% in q3
- VQ10 = rabbit_variable_queue:set_queue_ram_duration_target(0, VQ9),
- S10 = rabbit_variable_queue:status(VQ10),
- assert_prop(S10, len, Len2),
- assert_prop(S10, prefetching, false),
- assert_prop(S10, q1, 0),
- assert_prop(S10, q2, 0),
- assert_prop(S10, gamma, {gamma, Len1, SegmentSize}),
- assert_prop(S10, q3, (Len2 - SegmentSize)),
- assert_prop(S10, q4, 0),
-
- {VQ11, AckTags1} = variable_queue_fetch(Len2, true, false, Len2, VQ10),
- VQ12 = rabbit_variable_queue:ack([AckTag|AckTags1], VQ11),
- {empty, VQ13} = rabbit_variable_queue:fetch(VQ12),
- rabbit_variable_queue:terminate(VQ13),
-
- passed.
-
-test_variable_queue_prefetching_during_publish(PrefetchDelay) ->
- SegmentSize = rabbit_queue_index:segment_size(),
- VQ0 = fresh_variable_queue(),
- VQ1 = rabbit_variable_queue:set_queue_ram_duration_target(0, VQ0),
- assert_prop(rabbit_variable_queue:status(VQ1), target_ram_msg_count, 0),
-
- Len1 = 2*SegmentSize,
- {_SeqIds, VQ2} = variable_queue_publish(true, Len1, VQ1),
- %% one segment will be in q3, the other in gamma. We want to fetch
- %% all of q3 so that gamma is then moved into q3, emptying gamma
-
- VQ3 = rabbit_variable_queue:remeasure_egress_rate(VQ2),
- {VQ4, AckTags} = variable_queue_fetch(SegmentSize, true, false, Len1, VQ3),
- VQ5 = rabbit_variable_queue:ack(AckTags, VQ4),
- S5 = rabbit_variable_queue:status(VQ5),
- assert_prop(S5, q4, 0),
- assert_prop(S5, q3, SegmentSize),
- assert_prop(S5, gamma, {gamma, undefined, 0}),
- assert_prop(S5, len, SegmentSize),
- assert_prop(S5, prefetching, false),
-
- %% we assume that we can fetch at > 1 msg a second
- VQ6 = rabbit_variable_queue:remeasure_egress_rate(VQ5),
- VQ7 = rabbit_variable_queue:set_queue_ram_duration_target(Len1, VQ6),
- S7 = rabbit_variable_queue:status(VQ7),
- assert_prop(S7, q4, 0),
- assert_prop(S7, q3, 0),
- assert_prop(S7, gamma, {gamma, undefined, 0}),
- assert_prop(S7, len, SegmentSize),
- assert_prop(S7, prefetching, true),
-
- timer:sleep(PrefetchDelay),
-
- {_SeqIds1, VQ8} = variable_queue_publish(true, SegmentSize, VQ7),
- S8 = rabbit_variable_queue:status(VQ8),
- assert_prop(S8, q4, 0),
- assert_prop(S8, q2, 0),
- assert_prop(S8, q1, SegmentSize),
- assert_prop(S8, len, Len1),
- assert_prop(S8, prefetching, true),
-
- {VQ9, AckTags1} =
- variable_queue_fetch(SegmentSize-1, true, false, Len1, VQ8),
- VQ10 = rabbit_variable_queue:ack(AckTags1, VQ9),
- %% can't guarantee the prefetcher has stopped here. If it is still
- %% running, then we must have SegmentSize is q1. If it's not
- %% running, and it completed, then we'll find SegmentSize + 1 in
- %% q4 (q1 will have been joined to q4), otherwise, we'll find
- %% SegmentSize in q1 and 1 in q3 and q4 empty.
- S10 = rabbit_variable_queue:status(VQ10),
- assert_prop(S10, q2, 0),
- assert_prop(S10, len, (SegmentSize+1)),
- case proplists:get_value(prefetching, S10) of
- true -> assert_prop(S10, q1, SegmentSize),
- assert_prop(S10, q3, 0),
- assert_prop(S10, q4, 0);
- false -> case proplists:get_value(q3, S10) of
- 0 -> assert_prop(S10, q4, SegmentSize+1),
- assert_prop(S10, q1, 0);
- 1 -> assert_prop(S10, q4, 0),
- assert_prop(S10, q1, SegmentSize)
- end
- end,
-
- {VQ11, AckTags2} =
- variable_queue_fetch(SegmentSize+1, true, false, SegmentSize+1, VQ10),
- VQ12 = rabbit_variable_queue:ack(AckTags2, VQ11),
-
- {empty, VQ13} = rabbit_variable_queue:fetch(VQ12),
- rabbit_variable_queue:terminate(VQ13),
-
- passed.
-
-test_variable_queue_prefetching_and_gammas_to_betas() ->
- SegmentSize = rabbit_queue_index:segment_size(),
- VQ0 = fresh_variable_queue(),
-
- VQ1 = rabbit_variable_queue:set_queue_ram_duration_target(10, VQ0),
- assert_prop(rabbit_variable_queue:status(VQ1), target_ram_msg_count, 0),
-
- {_SeqIds, VQ2} = variable_queue_publish(false, 3 * SegmentSize, VQ1),
- S2 = rabbit_variable_queue:status(VQ2),
- assert_prop(S2, gamma, {gamma, SegmentSize, 2*SegmentSize}),
- assert_prop(S2, q3, SegmentSize),
- assert_prop(S2, len, 3*SegmentSize),
-
- VQ3 = rabbit_variable_queue:remeasure_egress_rate(VQ2),
- Len1 = 3*SegmentSize - 1,
- {{_Msg, false, AckTag, Len1}, VQ4} = rabbit_variable_queue:fetch(VQ3),
- timer:sleep(1000),
- VQ5 = rabbit_variable_queue:remeasure_egress_rate(VQ4),
- VQ6 = rabbit_variable_queue:set_queue_ram_duration_target(10, VQ5),
- timer:sleep(1000), %% let the prefetcher run and grab enough - about 4 msgs
- S6 = rabbit_variable_queue:status(VQ6),
- RamCount = proplists:get_value(target_ram_msg_count, S6),
- assert_prop(S6, prefetching, true),
- assert_prop(S6, q4, 0),
- assert_prop(S6, q3, (Len1 - RamCount)),
- assert_prop(S6, gamma, {gamma, undefined, 0}),
-
- Len2 = Len1 - 1,
- %% this should be enough to stop + drain the prefetcher
- {{_Msg1, false, AckTag1, Len2}, VQ7} = rabbit_variable_queue:fetch(VQ6),
- S7 = rabbit_variable_queue:status(VQ7),
- assert_prop(S7, prefetching, false),
- assert_prop(S7, q4, (RamCount - 1)),
- assert_prop(S7, q3, (Len1 - RamCount)),
-
- %% now fetch SegmentSize - 1 which will exhaust q4 and work through a bit of q3
- %% bringing in a segment from gamma:
- {VQ8, AckTags} = variable_queue_fetch(SegmentSize-1, false, false, Len2, VQ7),
- Len3 = Len2 - (SegmentSize - 1),
- S8 = rabbit_variable_queue:status(VQ8),
- assert_prop(S8, prefetching, false),
- assert_prop(S8, q4, 0),
- assert_prop(S8, q3, Len3),
- assert_prop(S8, len, Len3),
-
- VQ9 = rabbit_variable_queue:remeasure_egress_rate(VQ8),
- VQ10 = rabbit_variable_queue:ack(AckTags, VQ9),
-
- S10 = rabbit_variable_queue:status(VQ10),
- assert_prop(S10, prefetching, true),
- %% egress rate should be really high, so it's likely if we wait a
- %% little bit, lots of msgs will be brought in
- timer:sleep(2000),
- PrefetchCount = lists:min([proplists:get_value(target_ram_msg_count, S10) -
- proplists:get_value(ram_msg_count, S10),
- Len3]),
- Len4 = Len3 - 1,
- {{_Msg2, false, AckTag2, Len4}, VQ11} = rabbit_variable_queue:fetch(VQ10),
- S11 = rabbit_variable_queue:status(VQ11),
- %% prefetcher will stop if it's fast enough and has completed by
- %% now, or may still be running if PrefetchCount > 1
- Prefetched = proplists:get_value(q4, S11),
- true = PrefetchCount > Prefetched, %% already fetched 1, thus >, not >=
- %% q3 will contain whatever the prefetcher was not allowed to
- %% prefetch, due to memory constraints. If the prefetcher is still
- %% running, this will be less than (Len4 - Prefetched) because
- %% Prefetched will not reflect the true number of msgs that it's
- %% trying to prefetch.
- case proplists:get_value(prefetching, S11) of
- true -> true = (Len4 - Prefetched) > proplists:get_value(q3, S11);
- false -> assert_prop(S11, q3, Len4 - Prefetched)
- end,
- assert_prop(S11, gamma, {gamma, undefined, 0}),
- assert_prop(S11, q2, 0),
- assert_prop(S11, q1, 0),
-
- S12 = rabbit_variable_queue:status(VQ11),
- assert_prop(S12, prefetching, (Len4 - Prefetched) > 0),
- timer:sleep(2000),
- %% we have to fetch all of q4 before the prefetcher will be drained
- {VQ12, AckTags1} =
- variable_queue_fetch(Prefetched, false, false, Len4, VQ11),
- {VQ15, Acks} =
- case Len4 == Prefetched of
- true ->
- {VQ12, [AckTag2, AckTag1, AckTag, AckTags1]};
- false ->
- Len5 = Len4 - Prefetched - 1,
- {{_Msg3, false, AckTag3, Len5}, VQ13} =
- rabbit_variable_queue:fetch(VQ12),
- assert_prop(rabbit_variable_queue:status(VQ13),
- prefetching, false),
- {VQ14, AckTags2} =
- variable_queue_fetch(Len5, false, false, Len5, VQ13),
- {VQ14, [AckTag3, AckTag2, AckTag1, AckTag, AckTags1, AckTags2]}
- end,
- VQ16 = rabbit_variable_queue:ack(lists:flatten(Acks), VQ15),
-
- {empty, VQ17} = rabbit_variable_queue:fetch(VQ16),
-
- rabbit_variable_queue:terminate(VQ17),
- passed.