diff options
| -rw-r--r-- | src/rabbit_tests.erl | 43 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 59 |
2 files changed, 55 insertions, 47 deletions
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 15b9161bad..bef2264c5b 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1167,7 +1167,8 @@ test_variable_queue() -> RamCount = proplists:get_value(target_ram_msg_count, S6), assert_prop(S6, prefetching, true), assert_prop(S6, q4, 0), - assert_prop(S6, q3, (SegmentSize - 1 - RamCount)), + assert_prop(S6, q3, (Len1 - RamCount)), + assert_prop(S6, gamma, {gamma, undefined, 0}), Len2 = Len1 - 1, %% this should be enough to stop + drain the prefetcher @@ -1175,16 +1176,17 @@ test_variable_queue() -> S7 = rabbit_variable_queue:status(VQ7), assert_prop(S7, prefetching, false), assert_prop(S7, q4, (RamCount - 1)), - assert_prop(S7, q3, (SegmentSize - 1 - RamCount)), + assert_prop(S7, q3, (Len1 - RamCount)), - %% now fetch SegmentSize - 1 which will exhaust q4 and q3, + %% now fetch SegmentSize - 1 which will exhaust q4 and work through a bit of q3 %% bringing in a segment from gamma: {VQ8, AckTags} = variable_queue_fetch(SegmentSize-1, false, Len2, VQ7), + Len3 = Len2 - (SegmentSize - 1), S8 = rabbit_variable_queue:status(VQ8), assert_prop(S8, prefetching, false), assert_prop(S8, q4, 0), - assert_prop(S8, q3, (SegmentSize - 1)), - assert_prop(S8, gamma, {gamma, (2*SegmentSize), SegmentSize}), + assert_prop(S8, q3, Len3), + assert_prop(S8, len, Len3), VQ9 = rabbit_variable_queue:remeasure_egress_rate(VQ8), VQ10 = rabbit_variable_queue:ack(AckTags, VQ9), @@ -1192,38 +1194,39 @@ test_variable_queue() -> S10 = rabbit_variable_queue:status(VQ10), assert_prop(S10, prefetching, true), %% egress rate should be really high, so it's likely if we wait a - %% little bit, the next segment should be brought in + %% little bit, lots of msgs will be brought in timer:sleep(2000), - Len3 = (2*SegmentSize) - 2, - {{_Msg2, false, AckTag2, Len3}, VQ11} = rabbit_variable_queue:fetch(VQ10), + PrefetchCount = lists:min([proplists:get_value(target_ram_msg_count, S10) - + proplists:get_value(ram_msg_count, S10), + Len3]), + Len4 = Len3 - 1, + {{_Msg2, false, AckTag2, Len4}, VQ11} = rabbit_variable_queue:fetch(VQ10), S11 = rabbit_variable_queue:status(VQ11), + %% prefetcher will stop if it's fast enough and has completed by now, or may still be running if PrefetchCount > 1 assert_prop(S11, prefetching, false), - assert_prop(S11, q4, (SegmentSize - 2)), - assert_prop(S11, q3, SegmentSize), + Prefetched = proplists:get_value(q4, S11), + true = (PrefetchCount - 1) >= Prefetched, + assert_prop(S11, q3, Len4 - Prefetched), assert_prop(S11, gamma, {gamma, undefined, 0}), assert_prop(S11, q2, 0), assert_prop(S11, q1, 0), VQ12 = rabbit_variable_queue:maybe_start_prefetcher(VQ11), S12 = rabbit_variable_queue:status(VQ12), - assert_prop(S12, prefetching, true), - PrefetchCount = lists:min([proplists:get_value(target_ram_msg_count, S12) - - proplists:get_value(ram_msg_count, S12), - SegmentSize]), + assert_prop(S12, prefetching, (Len4 - Prefetched) > 0), timer:sleep(2000), %% we have to fetch all of q4 before the prefetcher will be drained - {VQ13, AckTags1} = variable_queue_fetch(SegmentSize-2, false, Len3, VQ12), - Len4 = SegmentSize - 1, - {{_Msg3, false, AckTag3, Len4}, VQ14} = rabbit_variable_queue:fetch(VQ13), + {VQ13, AckTags1} = + variable_queue_fetch(Prefetched, false, Len4, VQ12), + Len5 = Len4 - Prefetched - 1, + {{_Msg3, false, AckTag3, Len5}, VQ14} = rabbit_variable_queue:fetch(VQ13), S14 = rabbit_variable_queue:status(VQ14), assert_prop(S14, prefetching, false), - assert_prop(S14, q4, (PrefetchCount - 1)), - assert_prop(S14, q3, (Len4 - (PrefetchCount - 1))), VQ15 = rabbit_variable_queue:ack([AckTag3, AckTag2, AckTag1, AckTag], VQ14), VQ16 = rabbit_variable_queue:ack(AckTags1, VQ15), - {VQ17, AckTags2} = variable_queue_fetch(Len4, false, Len4, VQ16), + {VQ17, AckTags2} = variable_queue_fetch(Len5, false, Len5, VQ16), VQ18 = rabbit_variable_queue:ack(AckTags2, VQ17), rabbit_variable_queue:terminate(VQ18), diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index b967e4a26f..64c9d19914 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -144,7 +144,7 @@ init(QueueName) -> len = GammaCount, on_sync = {[], [], []} }, - maybe_load_next_segment(State). + maybe_gammas_to_betas(State). terminate(State = #vqstate { index_state = IndexState }) -> State #vqstate { index_state = rabbit_queue_index:terminate(IndexState) }. @@ -264,15 +264,16 @@ len(#vqstate { len = Len }) -> is_empty(State) -> 0 == len(State). -maybe_start_prefetcher(State = #vqstate { - ram_msg_count = RamMsgCount, - target_ram_msg_count = TargetRamMsgCount, - q1 = Q1, q3 = Q3, prefetcher = undefined, - gamma = #gamma { count = GammaCount } - }) -> - case queue:is_empty(Q3) andalso GammaCount > 0 of - true -> - maybe_start_prefetcher(maybe_load_next_segment(State)); +maybe_start_prefetcher(State = #vqstate { target_ram_msg_count = 0 }) -> + State; +maybe_start_prefetcher(State = #vqstate { prefetcher = undefined }) -> + %% ensure we have as much index in RAM as we can + State1 = #vqstate { ram_msg_count = RamMsgCount, + target_ram_msg_count = TargetRamMsgCount, + q1 = Q1, q3 = Q3 } = maybe_gammas_to_betas(State), + case queue:is_empty(Q3) of + true -> %% nothing to do + State1; false -> %% prefetched content takes priority over q1 AvailableSpace = @@ -282,13 +283,12 @@ maybe_start_prefetcher(State = #vqstate { end, PrefetchCount = lists:min([queue:len(Q3), AvailableSpace]), case PrefetchCount =< 0 of - true -> State; + true -> State1; false -> {PrefetchQueue, Q3a} = queue:split(PrefetchCount, Q3), {ok, Prefetcher} = rabbit_queue_prefetcher:start_link(PrefetchQueue), - maybe_load_next_segment( - State #vqstate { q3 = Q3a, prefetcher = Prefetcher }) + State1 #vqstate { q3 = Q3a, prefetcher = Prefetcher } end end; maybe_start_prefetcher(State) -> @@ -483,7 +483,7 @@ purge1(Count, State = #vqstate { q3 = Q3, index_state = IndexState }) -> false -> {Q3Count, IndexState1} = remove_queue_entries(Q3, IndexState), purge1(Count + Q3Count, - maybe_load_next_segment( + maybe_gammas_to_betas( State #vqstate { index_state = IndexState1, q3 = queue:new() })) end. @@ -619,7 +619,7 @@ fetch_from_q3_or_gamma(State = #vqstate { State1 #vqstate { q1 = queue:new(), q4 = queue:join(Q4a, Q1) }; {true, false} -> - maybe_load_next_segment(State1); + maybe_gammas_to_betas(State1); {false, _} -> %% q3 still isn't empty, we've not touched %% gamma, so the invariants between q1, q2, @@ -629,23 +629,26 @@ fetch_from_q3_or_gamma(State = #vqstate { fetch(State2) end. -maybe_load_next_segment(State = #vqstate { gamma = #gamma { count = 0 }} ) -> +maybe_gammas_to_betas(State = #vqstate { gamma = #gamma { count = 0 }} ) -> State; -maybe_load_next_segment(State = - #vqstate { index_state = IndexState, q2 = Q2, - q3 = Q3, - gamma = #gamma { seq_id = GammaSeqId, - count = GammaCount }}) -> - case queue:is_empty(Q3) of - false -> - State; +maybe_gammas_to_betas(State = + #vqstate { index_state = IndexState, q2 = Q2, q3 = Q3, + target_ram_msg_count = TargetRamMsgCount, + gamma = #gamma { seq_id = GammaSeqId, + count = GammaCount }}) -> + case (not queue:is_empty(Q3)) andalso 0 == TargetRamMsgCount of true -> + State; + false -> + %% either q3 is empty, in which case we load at least one + %% segment, or TargetRamMsgCount > 0, meaning we should + %% really be holding all the betas in memory. {List, IndexState1, Gamma1SeqId} = read_index_segment(GammaSeqId, IndexState), State1 = State #vqstate { index_state = IndexState1 }, %% length(List) may be < segment_size because of acks. But %% it can't be [] - Q3a = betas_from_segment_entries(List), + Q3a = queue:join(Q3, betas_from_segment_entries(List)), case GammaCount - length(List) of 0 -> %% gamma is now empty, but it wasn't before, so @@ -655,8 +658,10 @@ maybe_load_next_segment(State = q2 = queue:new(), q3 = queue:join(Q3a, Q2) }; N when N > 0 -> - State1 #vqstate { gamma = #gamma { seq_id = Gamma1SeqId, - count = N }, q3 = Q3a } + maybe_gammas_to_betas( + State1 #vqstate { q3 = Q3a, + gamma = #gamma { seq_id = Gamma1SeqId, + count = N } }) end end. |
