summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_tests.erl43
-rw-r--r--src/rabbit_variable_queue.erl59
2 files changed, 55 insertions, 47 deletions
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 15b9161bad..bef2264c5b 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -1167,7 +1167,8 @@ test_variable_queue() ->
RamCount = proplists:get_value(target_ram_msg_count, S6),
assert_prop(S6, prefetching, true),
assert_prop(S6, q4, 0),
- assert_prop(S6, q3, (SegmentSize - 1 - RamCount)),
+ assert_prop(S6, q3, (Len1 - RamCount)),
+ assert_prop(S6, gamma, {gamma, undefined, 0}),
Len2 = Len1 - 1,
%% this should be enough to stop + drain the prefetcher
@@ -1175,16 +1176,17 @@ test_variable_queue() ->
S7 = rabbit_variable_queue:status(VQ7),
assert_prop(S7, prefetching, false),
assert_prop(S7, q4, (RamCount - 1)),
- assert_prop(S7, q3, (SegmentSize - 1 - RamCount)),
+ assert_prop(S7, q3, (Len1 - RamCount)),
- %% now fetch SegmentSize - 1 which will exhaust q4 and q3,
+ %% now fetch SegmentSize - 1 which will exhaust q4 and work through a bit of q3
%% bringing in a segment from gamma:
{VQ8, AckTags} = variable_queue_fetch(SegmentSize-1, false, Len2, VQ7),
+ Len3 = Len2 - (SegmentSize - 1),
S8 = rabbit_variable_queue:status(VQ8),
assert_prop(S8, prefetching, false),
assert_prop(S8, q4, 0),
- assert_prop(S8, q3, (SegmentSize - 1)),
- assert_prop(S8, gamma, {gamma, (2*SegmentSize), SegmentSize}),
+ assert_prop(S8, q3, Len3),
+ assert_prop(S8, len, Len3),
VQ9 = rabbit_variable_queue:remeasure_egress_rate(VQ8),
VQ10 = rabbit_variable_queue:ack(AckTags, VQ9),
@@ -1192,38 +1194,39 @@ test_variable_queue() ->
S10 = rabbit_variable_queue:status(VQ10),
assert_prop(S10, prefetching, true),
%% egress rate should be really high, so it's likely if we wait a
- %% little bit, the next segment should be brought in
+ %% little bit, lots of msgs will be brought in
timer:sleep(2000),
- Len3 = (2*SegmentSize) - 2,
- {{_Msg2, false, AckTag2, Len3}, VQ11} = rabbit_variable_queue:fetch(VQ10),
+ PrefetchCount = lists:min([proplists:get_value(target_ram_msg_count, S10) -
+ proplists:get_value(ram_msg_count, S10),
+ Len3]),
+ Len4 = Len3 - 1,
+ {{_Msg2, false, AckTag2, Len4}, VQ11} = rabbit_variable_queue:fetch(VQ10),
S11 = rabbit_variable_queue:status(VQ11),
+ %% prefetcher will stop if it's fast enough and has completed by now, or may still be running if PrefetchCount > 1
assert_prop(S11, prefetching, false),
- assert_prop(S11, q4, (SegmentSize - 2)),
- assert_prop(S11, q3, SegmentSize),
+ Prefetched = proplists:get_value(q4, S11),
+ true = (PrefetchCount - 1) >= Prefetched,
+ assert_prop(S11, q3, Len4 - Prefetched),
assert_prop(S11, gamma, {gamma, undefined, 0}),
assert_prop(S11, q2, 0),
assert_prop(S11, q1, 0),
VQ12 = rabbit_variable_queue:maybe_start_prefetcher(VQ11),
S12 = rabbit_variable_queue:status(VQ12),
- assert_prop(S12, prefetching, true),
- PrefetchCount = lists:min([proplists:get_value(target_ram_msg_count, S12) -
- proplists:get_value(ram_msg_count, S12),
- SegmentSize]),
+ assert_prop(S12, prefetching, (Len4 - Prefetched) > 0),
timer:sleep(2000),
%% we have to fetch all of q4 before the prefetcher will be drained
- {VQ13, AckTags1} = variable_queue_fetch(SegmentSize-2, false, Len3, VQ12),
- Len4 = SegmentSize - 1,
- {{_Msg3, false, AckTag3, Len4}, VQ14} = rabbit_variable_queue:fetch(VQ13),
+ {VQ13, AckTags1} =
+ variable_queue_fetch(Prefetched, false, Len4, VQ12),
+ Len5 = Len4 - Prefetched - 1,
+ {{_Msg3, false, AckTag3, Len5}, VQ14} = rabbit_variable_queue:fetch(VQ13),
S14 = rabbit_variable_queue:status(VQ14),
assert_prop(S14, prefetching, false),
- assert_prop(S14, q4, (PrefetchCount - 1)),
- assert_prop(S14, q3, (Len4 - (PrefetchCount - 1))),
VQ15 = rabbit_variable_queue:ack([AckTag3, AckTag2, AckTag1, AckTag], VQ14),
VQ16 = rabbit_variable_queue:ack(AckTags1, VQ15),
- {VQ17, AckTags2} = variable_queue_fetch(Len4, false, Len4, VQ16),
+ {VQ17, AckTags2} = variable_queue_fetch(Len5, false, Len5, VQ16),
VQ18 = rabbit_variable_queue:ack(AckTags2, VQ17),
rabbit_variable_queue:terminate(VQ18),
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index b967e4a26f..64c9d19914 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -144,7 +144,7 @@ init(QueueName) ->
len = GammaCount,
on_sync = {[], [], []}
},
- maybe_load_next_segment(State).
+ maybe_gammas_to_betas(State).
terminate(State = #vqstate { index_state = IndexState }) ->
State #vqstate { index_state = rabbit_queue_index:terminate(IndexState) }.
@@ -264,15 +264,16 @@ len(#vqstate { len = Len }) ->
is_empty(State) ->
0 == len(State).
-maybe_start_prefetcher(State = #vqstate {
- ram_msg_count = RamMsgCount,
- target_ram_msg_count = TargetRamMsgCount,
- q1 = Q1, q3 = Q3, prefetcher = undefined,
- gamma = #gamma { count = GammaCount }
- }) ->
- case queue:is_empty(Q3) andalso GammaCount > 0 of
- true ->
- maybe_start_prefetcher(maybe_load_next_segment(State));
+maybe_start_prefetcher(State = #vqstate { target_ram_msg_count = 0 }) ->
+ State;
+maybe_start_prefetcher(State = #vqstate { prefetcher = undefined }) ->
+ %% ensure we have as much index in RAM as we can
+ State1 = #vqstate { ram_msg_count = RamMsgCount,
+ target_ram_msg_count = TargetRamMsgCount,
+ q1 = Q1, q3 = Q3 } = maybe_gammas_to_betas(State),
+ case queue:is_empty(Q3) of
+ true -> %% nothing to do
+ State1;
false ->
%% prefetched content takes priority over q1
AvailableSpace =
@@ -282,13 +283,12 @@ maybe_start_prefetcher(State = #vqstate {
end,
PrefetchCount = lists:min([queue:len(Q3), AvailableSpace]),
case PrefetchCount =< 0 of
- true -> State;
+ true -> State1;
false ->
{PrefetchQueue, Q3a} = queue:split(PrefetchCount, Q3),
{ok, Prefetcher} =
rabbit_queue_prefetcher:start_link(PrefetchQueue),
- maybe_load_next_segment(
- State #vqstate { q3 = Q3a, prefetcher = Prefetcher })
+ State1 #vqstate { q3 = Q3a, prefetcher = Prefetcher }
end
end;
maybe_start_prefetcher(State) ->
@@ -483,7 +483,7 @@ purge1(Count, State = #vqstate { q3 = Q3, index_state = IndexState }) ->
false ->
{Q3Count, IndexState1} = remove_queue_entries(Q3, IndexState),
purge1(Count + Q3Count,
- maybe_load_next_segment(
+ maybe_gammas_to_betas(
State #vqstate { index_state = IndexState1,
q3 = queue:new() }))
end.
@@ -619,7 +619,7 @@ fetch_from_q3_or_gamma(State = #vqstate {
State1 #vqstate { q1 = queue:new(),
q4 = queue:join(Q4a, Q1) };
{true, false} ->
- maybe_load_next_segment(State1);
+ maybe_gammas_to_betas(State1);
{false, _} ->
%% q3 still isn't empty, we've not touched
%% gamma, so the invariants between q1, q2,
@@ -629,23 +629,26 @@ fetch_from_q3_or_gamma(State = #vqstate {
fetch(State2)
end.
-maybe_load_next_segment(State = #vqstate { gamma = #gamma { count = 0 }} ) ->
+maybe_gammas_to_betas(State = #vqstate { gamma = #gamma { count = 0 }} ) ->
State;
-maybe_load_next_segment(State =
- #vqstate { index_state = IndexState, q2 = Q2,
- q3 = Q3,
- gamma = #gamma { seq_id = GammaSeqId,
- count = GammaCount }}) ->
- case queue:is_empty(Q3) of
- false ->
- State;
+maybe_gammas_to_betas(State =
+ #vqstate { index_state = IndexState, q2 = Q2, q3 = Q3,
+ target_ram_msg_count = TargetRamMsgCount,
+ gamma = #gamma { seq_id = GammaSeqId,
+ count = GammaCount }}) ->
+ case (not queue:is_empty(Q3)) andalso 0 == TargetRamMsgCount of
true ->
+ State;
+ false ->
+ %% either q3 is empty, in which case we load at least one
+ %% segment, or TargetRamMsgCount > 0, meaning we should
+ %% really be holding all the betas in memory.
{List, IndexState1, Gamma1SeqId} =
read_index_segment(GammaSeqId, IndexState),
State1 = State #vqstate { index_state = IndexState1 },
%% length(List) may be < segment_size because of acks. But
%% it can't be []
- Q3a = betas_from_segment_entries(List),
+ Q3a = queue:join(Q3, betas_from_segment_entries(List)),
case GammaCount - length(List) of
0 ->
%% gamma is now empty, but it wasn't before, so
@@ -655,8 +658,10 @@ maybe_load_next_segment(State =
q2 = queue:new(),
q3 = queue:join(Q3a, Q2) };
N when N > 0 ->
- State1 #vqstate { gamma = #gamma { seq_id = Gamma1SeqId,
- count = N }, q3 = Q3a }
+ maybe_gammas_to_betas(
+ State1 #vqstate { q3 = Q3a,
+ gamma = #gamma { seq_id = Gamma1SeqId,
+ count = N } })
end
end.