summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-11-09 09:08:42 +0000
committerMatthew Sackman <matthew@lshift.net>2009-11-09 09:08:42 +0000
commite3598c965896854b5f23d1fa42abbb78273fee15 (patch)
tree349a06099dc41d63c8fa8d848f25d4c4846da15e /src
parent98e2b55d875301e5210dbb07dd16264183a2a037 (diff)
downloadrabbitmq-server-git-e3598c965896854b5f23d1fa42abbb78273fee15.tar.gz
modified vq such that when we're bringing entries back in from gamma to beta (q3), we bring all entries in, unless we know that our target_ram_msg_count is 0, in which case we only bring one segment in. This massively helps the prefetcher, as it means the prefetcher is finally allowed to prefetch more than one segment. However, it has had a negative impact on the tests, which I'm still working through to correct.
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_tests.erl43
-rw-r--r--src/rabbit_variable_queue.erl59
2 files changed, 55 insertions, 47 deletions
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 15b9161bad..bef2264c5b 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -1167,7 +1167,8 @@ test_variable_queue() ->
RamCount = proplists:get_value(target_ram_msg_count, S6),
assert_prop(S6, prefetching, true),
assert_prop(S6, q4, 0),
- assert_prop(S6, q3, (SegmentSize - 1 - RamCount)),
+ assert_prop(S6, q3, (Len1 - RamCount)),
+ assert_prop(S6, gamma, {gamma, undefined, 0}),
Len2 = Len1 - 1,
%% this should be enough to stop + drain the prefetcher
@@ -1175,16 +1176,17 @@ test_variable_queue() ->
S7 = rabbit_variable_queue:status(VQ7),
assert_prop(S7, prefetching, false),
assert_prop(S7, q4, (RamCount - 1)),
- assert_prop(S7, q3, (SegmentSize - 1 - RamCount)),
+ assert_prop(S7, q3, (Len1 - RamCount)),
- %% now fetch SegmentSize - 1 which will exhaust q4 and q3,
+ %% now fetch SegmentSize - 1 which will exhaust q4 and work through a bit of q3
%% bringing in a segment from gamma:
{VQ8, AckTags} = variable_queue_fetch(SegmentSize-1, false, Len2, VQ7),
+ Len3 = Len2 - (SegmentSize - 1),
S8 = rabbit_variable_queue:status(VQ8),
assert_prop(S8, prefetching, false),
assert_prop(S8, q4, 0),
- assert_prop(S8, q3, (SegmentSize - 1)),
- assert_prop(S8, gamma, {gamma, (2*SegmentSize), SegmentSize}),
+ assert_prop(S8, q3, Len3),
+ assert_prop(S8, len, Len3),
VQ9 = rabbit_variable_queue:remeasure_egress_rate(VQ8),
VQ10 = rabbit_variable_queue:ack(AckTags, VQ9),
@@ -1192,38 +1194,39 @@ test_variable_queue() ->
S10 = rabbit_variable_queue:status(VQ10),
assert_prop(S10, prefetching, true),
%% egress rate should be really high, so it's likely if we wait a
- %% little bit, the next segment should be brought in
+ %% little bit, lots of msgs will be brought in
timer:sleep(2000),
- Len3 = (2*SegmentSize) - 2,
- {{_Msg2, false, AckTag2, Len3}, VQ11} = rabbit_variable_queue:fetch(VQ10),
+ PrefetchCount = lists:min([proplists:get_value(target_ram_msg_count, S10) -
+ proplists:get_value(ram_msg_count, S10),
+ Len3]),
+ Len4 = Len3 - 1,
+ {{_Msg2, false, AckTag2, Len4}, VQ11} = rabbit_variable_queue:fetch(VQ10),
S11 = rabbit_variable_queue:status(VQ11),
+ %% prefetcher will stop if it's fast enough and has completed by now, or may still be running if PrefetchCount > 1
assert_prop(S11, prefetching, false),
- assert_prop(S11, q4, (SegmentSize - 2)),
- assert_prop(S11, q3, SegmentSize),
+ Prefetched = proplists:get_value(q4, S11),
+ true = (PrefetchCount - 1) >= Prefetched,
+ assert_prop(S11, q3, Len4 - Prefetched),
assert_prop(S11, gamma, {gamma, undefined, 0}),
assert_prop(S11, q2, 0),
assert_prop(S11, q1, 0),
VQ12 = rabbit_variable_queue:maybe_start_prefetcher(VQ11),
S12 = rabbit_variable_queue:status(VQ12),
- assert_prop(S12, prefetching, true),
- PrefetchCount = lists:min([proplists:get_value(target_ram_msg_count, S12) -
- proplists:get_value(ram_msg_count, S12),
- SegmentSize]),
+ assert_prop(S12, prefetching, (Len4 - Prefetched) > 0),
timer:sleep(2000),
%% we have to fetch all of q4 before the prefetcher will be drained
- {VQ13, AckTags1} = variable_queue_fetch(SegmentSize-2, false, Len3, VQ12),
- Len4 = SegmentSize - 1,
- {{_Msg3, false, AckTag3, Len4}, VQ14} = rabbit_variable_queue:fetch(VQ13),
+ {VQ13, AckTags1} =
+ variable_queue_fetch(Prefetched, false, Len4, VQ12),
+ Len5 = Len4 - Prefetched - 1,
+ {{_Msg3, false, AckTag3, Len5}, VQ14} = rabbit_variable_queue:fetch(VQ13),
S14 = rabbit_variable_queue:status(VQ14),
assert_prop(S14, prefetching, false),
- assert_prop(S14, q4, (PrefetchCount - 1)),
- assert_prop(S14, q3, (Len4 - (PrefetchCount - 1))),
VQ15 = rabbit_variable_queue:ack([AckTag3, AckTag2, AckTag1, AckTag], VQ14),
VQ16 = rabbit_variable_queue:ack(AckTags1, VQ15),
- {VQ17, AckTags2} = variable_queue_fetch(Len4, false, Len4, VQ16),
+ {VQ17, AckTags2} = variable_queue_fetch(Len5, false, Len5, VQ16),
VQ18 = rabbit_variable_queue:ack(AckTags2, VQ17),
rabbit_variable_queue:terminate(VQ18),
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index b967e4a26f..64c9d19914 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -144,7 +144,7 @@ init(QueueName) ->
len = GammaCount,
on_sync = {[], [], []}
},
- maybe_load_next_segment(State).
+ maybe_gammas_to_betas(State).
terminate(State = #vqstate { index_state = IndexState }) ->
State #vqstate { index_state = rabbit_queue_index:terminate(IndexState) }.
@@ -264,15 +264,16 @@ len(#vqstate { len = Len }) ->
is_empty(State) ->
0 == len(State).
-maybe_start_prefetcher(State = #vqstate {
- ram_msg_count = RamMsgCount,
- target_ram_msg_count = TargetRamMsgCount,
- q1 = Q1, q3 = Q3, prefetcher = undefined,
- gamma = #gamma { count = GammaCount }
- }) ->
- case queue:is_empty(Q3) andalso GammaCount > 0 of
- true ->
- maybe_start_prefetcher(maybe_load_next_segment(State));
+maybe_start_prefetcher(State = #vqstate { target_ram_msg_count = 0 }) ->
+ State;
+maybe_start_prefetcher(State = #vqstate { prefetcher = undefined }) ->
+ %% ensure we have as much index in RAM as we can
+ State1 = #vqstate { ram_msg_count = RamMsgCount,
+ target_ram_msg_count = TargetRamMsgCount,
+ q1 = Q1, q3 = Q3 } = maybe_gammas_to_betas(State),
+ case queue:is_empty(Q3) of
+ true -> %% nothing to do
+ State1;
false ->
%% prefetched content takes priority over q1
AvailableSpace =
@@ -282,13 +283,12 @@ maybe_start_prefetcher(State = #vqstate {
end,
PrefetchCount = lists:min([queue:len(Q3), AvailableSpace]),
case PrefetchCount =< 0 of
- true -> State;
+ true -> State1;
false ->
{PrefetchQueue, Q3a} = queue:split(PrefetchCount, Q3),
{ok, Prefetcher} =
rabbit_queue_prefetcher:start_link(PrefetchQueue),
- maybe_load_next_segment(
- State #vqstate { q3 = Q3a, prefetcher = Prefetcher })
+ State1 #vqstate { q3 = Q3a, prefetcher = Prefetcher }
end
end;
maybe_start_prefetcher(State) ->
@@ -483,7 +483,7 @@ purge1(Count, State = #vqstate { q3 = Q3, index_state = IndexState }) ->
false ->
{Q3Count, IndexState1} = remove_queue_entries(Q3, IndexState),
purge1(Count + Q3Count,
- maybe_load_next_segment(
+ maybe_gammas_to_betas(
State #vqstate { index_state = IndexState1,
q3 = queue:new() }))
end.
@@ -619,7 +619,7 @@ fetch_from_q3_or_gamma(State = #vqstate {
State1 #vqstate { q1 = queue:new(),
q4 = queue:join(Q4a, Q1) };
{true, false} ->
- maybe_load_next_segment(State1);
+ maybe_gammas_to_betas(State1);
{false, _} ->
%% q3 still isn't empty, we've not touched
%% gamma, so the invariants between q1, q2,
@@ -629,23 +629,26 @@ fetch_from_q3_or_gamma(State = #vqstate {
fetch(State2)
end.
-maybe_load_next_segment(State = #vqstate { gamma = #gamma { count = 0 }} ) ->
+maybe_gammas_to_betas(State = #vqstate { gamma = #gamma { count = 0 }} ) ->
State;
-maybe_load_next_segment(State =
- #vqstate { index_state = IndexState, q2 = Q2,
- q3 = Q3,
- gamma = #gamma { seq_id = GammaSeqId,
- count = GammaCount }}) ->
- case queue:is_empty(Q3) of
- false ->
- State;
+maybe_gammas_to_betas(State =
+ #vqstate { index_state = IndexState, q2 = Q2, q3 = Q3,
+ target_ram_msg_count = TargetRamMsgCount,
+ gamma = #gamma { seq_id = GammaSeqId,
+ count = GammaCount }}) ->
+ case (not queue:is_empty(Q3)) andalso 0 == TargetRamMsgCount of
true ->
+ State;
+ false ->
+ %% either q3 is empty, in which case we load at least one
+ %% segment, or TargetRamMsgCount > 0, meaning we should
+ %% really be holding all the betas in memory.
{List, IndexState1, Gamma1SeqId} =
read_index_segment(GammaSeqId, IndexState),
State1 = State #vqstate { index_state = IndexState1 },
%% length(List) may be < segment_size because of acks. But
%% it can't be []
- Q3a = betas_from_segment_entries(List),
+ Q3a = queue:join(Q3, betas_from_segment_entries(List)),
case GammaCount - length(List) of
0 ->
%% gamma is now empty, but it wasn't before, so
@@ -655,8 +658,10 @@ maybe_load_next_segment(State =
q2 = queue:new(),
q3 = queue:join(Q3a, Q2) };
N when N > 0 ->
- State1 #vqstate { gamma = #gamma { seq_id = Gamma1SeqId,
- count = N }, q3 = Q3a }
+ maybe_gammas_to_betas(
+ State1 #vqstate { q3 = Q3a,
+ gamma = #gamma { seq_id = Gamma1SeqId,
+ count = N } })
end
end.