diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-11-06 18:20:24 +0000 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-11-06 18:20:24 +0000 |
| commit | 98e2b55d875301e5210dbb07dd16264183a2a037 (patch) | |
| tree | abf6cd8e8dd8ba3e8d7fc305d831f847087b522b /src | |
| parent | 4c8dcae91ec74a85783b0793d0bbc8af39134174 (diff) | |
| download | rabbitmq-server-git-98e2b55d875301e5210dbb07dd16264183a2a037.tar.gz | |
Making progress with vq testing and debugging - up to 75% code coverage overall, and ironed out several bugs
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_tests.erl | 103 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 108 |
2 files changed, 152 insertions, 59 deletions
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index d618d3e030..15b9161bad 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -52,6 +52,7 @@ test_content_prop_roundtrip(Datum, Binary) -> all_tests() -> passed = test_msg_store(), passed = test_queue_index(), + passed = test_variable_queue(), passed = test_priority_queue(), passed = test_unfold(), passed = test_parsing(), @@ -1126,38 +1127,104 @@ variable_queue_publish(IsPersistent, Count, VQ) -> {[SeqId | Acc], VQ2} end, {[], VQ}, lists:seq(1, Count)). +variable_queue_fetch(Count, IsPersistent, Len, VQ) -> + lists:foldl(fun (N, {VQN, AckTagsAcc}) -> + Rem = Len - N, + {{_MsgN, IsPersistent, AckTagN, Rem}, VQM} = + rabbit_variable_queue:fetch(VQN), + {VQM, [AckTagN | AckTagsAcc]} + end, {VQ, []}, lists:seq(1, Count)). + +assert_prop(List, Prop, Value) -> + Value = proplists:get_value(Prop, List). + test_variable_queue() -> SegmentSize = rabbit_queue_index:segment_size(), stop_msg_store(), ok = empty_test_queue(), VQ0 = rabbit_variable_queue:init(test_queue()), S0 = rabbit_variable_queue:status(VQ0), - 0 = proplists:get_value(len, S0), - false = proplists:get_value(prefetching, S0), + assert_prop(S0, len, 0), + assert_prop(S0, prefetching, false), VQ1 = rabbit_variable_queue:set_queue_ram_duration_target(10, VQ0), - 0 = proplists:get_value(target_ram_msg_count, - rabbit_variable_queue:status(VQ1)), + assert_prop(rabbit_variable_queue:status(VQ1), target_ram_msg_count, 0), {SeqIds, VQ2} = variable_queue_publish(false, 3 * SegmentSize, VQ1), S2 = rabbit_variable_queue:status(VQ2), - TwoSegments = 2*SegmentSize, - {gamma, SegmentSize, TwoSegments} = proplists:get_value(gamma, S2), - SegmentSize = proplists:get_value(q3, S2), - ThreeSegments = 3*SegmentSize, - ThreeSegments = proplists:get_value(len, S2), + assert_prop(S2, gamma, {gamma, SegmentSize, 2*SegmentSize}), + assert_prop(S2, q3, SegmentSize), + assert_prop(S2, len, 3*SegmentSize), VQ3 = rabbit_variable_queue:remeasure_egress_rate(VQ2), - io:format("~p~n", [rabbit_variable_queue:status(VQ3)]), - {{Msg, false, AckTag, Len1} = Obj, VQ4} = - rabbit_variable_queue:fetch(VQ3), - io:format("~p~n", [Obj]), + Len1 = 3*SegmentSize - 1, + {{_Msg, false, AckTag, Len1}, VQ4} = rabbit_variable_queue:fetch(VQ3), timer:sleep(1000), VQ5 = rabbit_variable_queue:remeasure_egress_rate(VQ4), VQ6 = rabbit_variable_queue:set_queue_ram_duration_target(10, VQ5), - io:format("~p~n", [rabbit_variable_queue:status(VQ6)]), - {{Msg1, false, AckTag1, Len11} = Obj1, VQ7} = - rabbit_variable_queue:fetch(VQ6), - io:format("~p~n", [Obj1]), - io:format("~p~n", [rabbit_variable_queue:status(VQ7)]), + timer:sleep(1000), %% let the prefetcher run and grab enough - about 4 msgs + S6 = rabbit_variable_queue:status(VQ6), + RamCount = proplists:get_value(target_ram_msg_count, S6), + assert_prop(S6, prefetching, true), + assert_prop(S6, q4, 0), + assert_prop(S6, q3, (SegmentSize - 1 - RamCount)), + + Len2 = Len1 - 1, + %% this should be enough to stop + drain the prefetcher + {{_Msg1, false, AckTag1, Len2}, VQ7} = rabbit_variable_queue:fetch(VQ6), + S7 = rabbit_variable_queue:status(VQ7), + assert_prop(S7, prefetching, false), + assert_prop(S7, q4, (RamCount - 1)), + assert_prop(S7, q3, (SegmentSize - 1 - RamCount)), + + %% now fetch SegmentSize - 1 which will exhaust q4 and q3, + %% bringing in a segment from gamma: + {VQ8, AckTags} = variable_queue_fetch(SegmentSize-1, false, Len2, VQ7), + S8 = rabbit_variable_queue:status(VQ8), + assert_prop(S8, prefetching, false), + assert_prop(S8, q4, 0), + assert_prop(S8, q3, (SegmentSize - 1)), + assert_prop(S8, gamma, {gamma, (2*SegmentSize), SegmentSize}), + + VQ9 = rabbit_variable_queue:remeasure_egress_rate(VQ8), + VQ10 = rabbit_variable_queue:ack(AckTags, VQ9), + + S10 = rabbit_variable_queue:status(VQ10), + assert_prop(S10, prefetching, true), + %% egress rate should be really high, so it's likely if we wait a + %% little bit, the next segment should be brought in + timer:sleep(2000), + Len3 = (2*SegmentSize) - 2, + {{_Msg2, false, AckTag2, Len3}, VQ11} = rabbit_variable_queue:fetch(VQ10), + S11 = rabbit_variable_queue:status(VQ11), + assert_prop(S11, prefetching, false), + assert_prop(S11, q4, (SegmentSize - 2)), + assert_prop(S11, q3, SegmentSize), + assert_prop(S11, gamma, {gamma, undefined, 0}), + assert_prop(S11, q2, 0), + assert_prop(S11, q1, 0), + + VQ12 = rabbit_variable_queue:maybe_start_prefetcher(VQ11), + S12 = rabbit_variable_queue:status(VQ12), + assert_prop(S12, prefetching, true), + PrefetchCount = lists:min([proplists:get_value(target_ram_msg_count, S12) - + proplists:get_value(ram_msg_count, S12), + SegmentSize]), + timer:sleep(2000), + %% we have to fetch all of q4 before the prefetcher will be drained + {VQ13, AckTags1} = variable_queue_fetch(SegmentSize-2, false, Len3, VQ12), + Len4 = SegmentSize - 1, + {{_Msg3, false, AckTag3, Len4}, VQ14} = rabbit_variable_queue:fetch(VQ13), + S14 = rabbit_variable_queue:status(VQ14), + assert_prop(S14, prefetching, false), + assert_prop(S14, q4, (PrefetchCount - 1)), + assert_prop(S14, q3, (Len4 - (PrefetchCount - 1))), + + VQ15 = rabbit_variable_queue:ack([AckTag3, AckTag2, AckTag1, AckTag], VQ14), + VQ16 = rabbit_variable_queue:ack(AckTags1, VQ15), + + {VQ17, AckTags2} = variable_queue_fetch(Len4, false, Len4, VQ16), + VQ18 = rabbit_variable_queue:ack(AckTags2, VQ17), + + rabbit_variable_queue:terminate(VQ18), passed. diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index af8a4775a4..b967e4a26f 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -46,6 +46,7 @@ gamma, q3, q4, + duration_target, target_ram_msg_count, ram_msg_count, queue, @@ -130,6 +131,7 @@ init(QueueName) -> gamma = Gamma, q3 = queue:new(), q4 = queue:new(), target_ram_msg_count = undefined, + duration_target = undefined, ram_msg_count = 0, queue = QueueName, index_state = IndexState1, @@ -166,12 +168,15 @@ publish_delivered(Msg = #basic_message { guid = MsgId, {ack_not_on_disk, State} end. +set_queue_ram_duration_target(undefined, State) -> + State; set_queue_ram_duration_target( DurationTarget, State = #vqstate { avg_egress_rate = EgressRate, target_ram_msg_count = TargetRamMsgCount }) -> TargetRamMsgCount1 = trunc(DurationTarget * EgressRate), %% msgs = sec * msgs/sec - State1 = State #vqstate { target_ram_msg_count = TargetRamMsgCount1 }, + State1 = State #vqstate { target_ram_msg_count = TargetRamMsgCount1, + duration_target = DurationTarget }, if TargetRamMsgCount == TargetRamMsgCount1 -> State1; TargetRamMsgCount == undefined orelse @@ -183,7 +188,8 @@ set_queue_ram_duration_target( remeasure_egress_rate(State = #vqstate { egress_rate = OldEgressRate, egress_rate_timestamp = Timestamp, - out_counter = OutCount }) -> + out_counter = OutCount, + duration_target = DurationTarget }) -> %% We do an average over the last two values, but also hold the %% current value separately so that the average always only %% incorporates the last two values, and not the current value and @@ -192,13 +198,15 @@ remeasure_egress_rate(State = #vqstate { egress_rate = OldEgressRate, %% EgressRate is in seconds, and now_diff is in microseconds EgressRate = 1000000 * OutCount / timer:now_diff(Now, Timestamp), AvgEgressRate = (EgressRate + OldEgressRate) / 2, - State #vqstate { egress_rate = EgressRate, - avg_egress_rate = AvgEgressRate, - egress_rate_timestamp = Now, - out_counter = 0 }. + set_queue_ram_duration_target( + DurationTarget, + State #vqstate { egress_rate = EgressRate, + avg_egress_rate = AvgEgressRate, + egress_rate_timestamp = Now, + out_counter = 0 }). fetch(State = - #vqstate { q4 = Q4, + #vqstate { q4 = Q4, ram_msg_count = RamMsgCount, out_counter = OutCount, prefetcher = Prefetcher, index_state = IndexState, len = Len }) -> case queue:out(Q4) of @@ -246,6 +254,7 @@ fetch(State = Len1 = Len - 1, {{Msg, IsDelivered, AckTag, Len1}, State #vqstate { q4 = Q4a, out_counter = OutCount + 1, + ram_msg_count = RamMsgCount - 1, index_state = IndexState1, len = Len1 }} end. @@ -258,21 +267,29 @@ is_empty(State) -> maybe_start_prefetcher(State = #vqstate { ram_msg_count = RamMsgCount, target_ram_msg_count = TargetRamMsgCount, - q1 = Q1, q3 = Q3, prefetcher = undefined + q1 = Q1, q3 = Q3, prefetcher = undefined, + gamma = #gamma { count = GammaCount } }) -> - %% prefetched content takes priority over q1 - AvailableSpace = case TargetRamMsgCount of - undefined -> queue:len(Q3); - _ -> (TargetRamMsgCount - RamMsgCount) + queue:len(Q1) - end, - PrefetchCount = lists:min([queue:len(Q3), AvailableSpace]), - if PrefetchCount =< 0 -> State; - true -> - {PrefetchQueue, Q3a} = queue:split(PrefetchCount, Q3), - {ok, Prefetcher} = - rabbit_queue_prefetcher:start_link(PrefetchQueue), - maybe_load_next_segment(State #vqstate { q3 = Q3a, - prefetcher = Prefetcher }) + case queue:is_empty(Q3) andalso GammaCount > 0 of + true -> + maybe_start_prefetcher(maybe_load_next_segment(State)); + false -> + %% prefetched content takes priority over q1 + AvailableSpace = + case TargetRamMsgCount of + undefined -> queue:len(Q3); + _ -> (TargetRamMsgCount - RamMsgCount) + queue:len(Q1) + end, + PrefetchCount = lists:min([queue:len(Q3), AvailableSpace]), + case PrefetchCount =< 0 of + true -> State; + false -> + {PrefetchQueue, Q3a} = queue:split(PrefetchCount, Q3), + {ok, Prefetcher} = + rabbit_queue_prefetcher:start_link(PrefetchQueue), + maybe_load_next_segment( + State #vqstate { q3 = Q3a, prefetcher = Prefetcher }) + end end; maybe_start_prefetcher(State) -> State. @@ -429,7 +446,7 @@ status(#vqstate { q1 = Q1, q2 = Q2, gamma = Gamma, q3 = Q3, q4 = Q4, {q2, queue:len(Q2)}, {gamma, Gamma}, {q3, queue:len(Q3)}, - {q4, Q4}, + {q4, queue:len(Q4)}, {len, Len}, {outstanding_txns, length(From)}, {target_ram_msg_count, TargetRamMsgCount}, @@ -570,9 +587,9 @@ publish(neither, Msg = #basic_message { guid = MsgId, State #vqstate { index_state = IndexState1, gamma = combine_gammas(Gamma, Gamma1) }. -fetch_from_q3_or_gamma(State = #vqstate { q1 = Q1, q2 = Q2, - gamma = #gamma { count = GammaCount }, - q3 = Q3, q4 = Q4 }) -> +fetch_from_q3_or_gamma(State = #vqstate { + q1 = Q1, q2 = Q2, gamma = #gamma { count = GammaCount }, + q3 = Q3, q4 = Q4, ram_msg_count = RamMsgCount }) -> case queue:out(Q3) of {empty, _Q3} -> 0 = GammaCount, %% ASSERTION @@ -590,7 +607,8 @@ fetch_from_q3_or_gamma(State = #vqstate { q1 = Q1, q2 = Q2, #alpha { msg = Msg, seq_id = SeqId, is_delivered = IsDelivered, msg_on_disk = true, index_on_disk = IndexOnDisk }, Q4), - State1 = State #vqstate { q3 = Q3a, q4 = Q4a }, + State1 = State #vqstate { q3 = Q3a, q4 = Q4a, + ram_msg_count = RamMsgCount + 1 }, State2 = case {queue:is_empty(Q3a), 0 == GammaCount} of {true, true} -> @@ -615,23 +633,31 @@ maybe_load_next_segment(State = #vqstate { gamma = #gamma { count = 0 }} ) -> State; maybe_load_next_segment(State = #vqstate { index_state = IndexState, q2 = Q2, + q3 = Q3, gamma = #gamma { seq_id = GammaSeqId, count = GammaCount }}) -> - {List, IndexState1, Gamma1SeqId} = - read_index_segment(GammaSeqId, IndexState), - State1 = State #vqstate { index_state = IndexState1 }, - %% length(List) may be < segment_size because of acks. But it - %% can't be [] - Q3a = betas_from_segment_entries(List), - case GammaCount - length(List) of - 0 -> - %% gamma is now empty, but it wasn't before, so can now - %% join q2 onto q3 - State1 #vqstate { gamma = #gamma { seq_id = undefined, count = 0 }, - q2 = queue:new(), q3 = queue:join(Q3a, Q2) }; - N when N > 0 -> - State1 #vqstate { gamma = #gamma { seq_id = Gamma1SeqId, - count = N }, q3 = Q3a } + case queue:is_empty(Q3) of + false -> + State; + true -> + {List, IndexState1, Gamma1SeqId} = + read_index_segment(GammaSeqId, IndexState), + State1 = State #vqstate { index_state = IndexState1 }, + %% length(List) may be < segment_size because of acks. But + %% it can't be [] + Q3a = betas_from_segment_entries(List), + case GammaCount - length(List) of + 0 -> + %% gamma is now empty, but it wasn't before, so + %% can now join q2 onto q3 + State1 #vqstate { gamma = #gamma { seq_id = undefined, + count = 0 }, + q2 = queue:new(), + q3 = queue:join(Q3a, Q2) }; + N when N > 0 -> + State1 #vqstate { gamma = #gamma { seq_id = Gamma1SeqId, + count = N }, q3 = Q3a } + end end. betas_from_segment_entries(List) -> |
