summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-11-06 18:20:24 +0000
committerMatthew Sackman <matthew@lshift.net>2009-11-06 18:20:24 +0000
commit98e2b55d875301e5210dbb07dd16264183a2a037 (patch)
treeabf6cd8e8dd8ba3e8d7fc305d831f847087b522b /src
parent4c8dcae91ec74a85783b0793d0bbc8af39134174 (diff)
downloadrabbitmq-server-git-98e2b55d875301e5210dbb07dd16264183a2a037.tar.gz
Making progress with vq testing and debugging - up to 75% code coverage overall, and ironed out several bugs
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_tests.erl103
-rw-r--r--src/rabbit_variable_queue.erl108
2 files changed, 152 insertions, 59 deletions
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index d618d3e030..15b9161bad 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -52,6 +52,7 @@ test_content_prop_roundtrip(Datum, Binary) ->
all_tests() ->
passed = test_msg_store(),
passed = test_queue_index(),
+ passed = test_variable_queue(),
passed = test_priority_queue(),
passed = test_unfold(),
passed = test_parsing(),
@@ -1126,38 +1127,104 @@ variable_queue_publish(IsPersistent, Count, VQ) ->
{[SeqId | Acc], VQ2}
end, {[], VQ}, lists:seq(1, Count)).
+variable_queue_fetch(Count, IsPersistent, Len, VQ) ->
+ lists:foldl(fun (N, {VQN, AckTagsAcc}) ->
+ Rem = Len - N,
+ {{_MsgN, IsPersistent, AckTagN, Rem}, VQM} =
+ rabbit_variable_queue:fetch(VQN),
+ {VQM, [AckTagN | AckTagsAcc]}
+ end, {VQ, []}, lists:seq(1, Count)).
+
+assert_prop(List, Prop, Value) ->
+ Value = proplists:get_value(Prop, List).
+
test_variable_queue() ->
SegmentSize = rabbit_queue_index:segment_size(),
stop_msg_store(),
ok = empty_test_queue(),
VQ0 = rabbit_variable_queue:init(test_queue()),
S0 = rabbit_variable_queue:status(VQ0),
- 0 = proplists:get_value(len, S0),
- false = proplists:get_value(prefetching, S0),
+ assert_prop(S0, len, 0),
+ assert_prop(S0, prefetching, false),
VQ1 = rabbit_variable_queue:set_queue_ram_duration_target(10, VQ0),
- 0 = proplists:get_value(target_ram_msg_count,
- rabbit_variable_queue:status(VQ1)),
+ assert_prop(rabbit_variable_queue:status(VQ1), target_ram_msg_count, 0),
{SeqIds, VQ2} = variable_queue_publish(false, 3 * SegmentSize, VQ1),
S2 = rabbit_variable_queue:status(VQ2),
- TwoSegments = 2*SegmentSize,
- {gamma, SegmentSize, TwoSegments} = proplists:get_value(gamma, S2),
- SegmentSize = proplists:get_value(q3, S2),
- ThreeSegments = 3*SegmentSize,
- ThreeSegments = proplists:get_value(len, S2),
+ assert_prop(S2, gamma, {gamma, SegmentSize, 2*SegmentSize}),
+ assert_prop(S2, q3, SegmentSize),
+ assert_prop(S2, len, 3*SegmentSize),
VQ3 = rabbit_variable_queue:remeasure_egress_rate(VQ2),
- io:format("~p~n", [rabbit_variable_queue:status(VQ3)]),
- {{Msg, false, AckTag, Len1} = Obj, VQ4} =
- rabbit_variable_queue:fetch(VQ3),
- io:format("~p~n", [Obj]),
+ Len1 = 3*SegmentSize - 1,
+ {{_Msg, false, AckTag, Len1}, VQ4} = rabbit_variable_queue:fetch(VQ3),
timer:sleep(1000),
VQ5 = rabbit_variable_queue:remeasure_egress_rate(VQ4),
VQ6 = rabbit_variable_queue:set_queue_ram_duration_target(10, VQ5),
- io:format("~p~n", [rabbit_variable_queue:status(VQ6)]),
- {{Msg1, false, AckTag1, Len11} = Obj1, VQ7} =
- rabbit_variable_queue:fetch(VQ6),
- io:format("~p~n", [Obj1]),
- io:format("~p~n", [rabbit_variable_queue:status(VQ7)]),
+ timer:sleep(1000), %% let the prefetcher run and grab enough - about 4 msgs
+ S6 = rabbit_variable_queue:status(VQ6),
+ RamCount = proplists:get_value(target_ram_msg_count, S6),
+ assert_prop(S6, prefetching, true),
+ assert_prop(S6, q4, 0),
+ assert_prop(S6, q3, (SegmentSize - 1 - RamCount)),
+
+ Len2 = Len1 - 1,
+ %% this should be enough to stop + drain the prefetcher
+ {{_Msg1, false, AckTag1, Len2}, VQ7} = rabbit_variable_queue:fetch(VQ6),
+ S7 = rabbit_variable_queue:status(VQ7),
+ assert_prop(S7, prefetching, false),
+ assert_prop(S7, q4, (RamCount - 1)),
+ assert_prop(S7, q3, (SegmentSize - 1 - RamCount)),
+
+ %% now fetch SegmentSize - 1 which will exhaust q4 and q3,
+ %% bringing in a segment from gamma:
+ {VQ8, AckTags} = variable_queue_fetch(SegmentSize-1, false, Len2, VQ7),
+ S8 = rabbit_variable_queue:status(VQ8),
+ assert_prop(S8, prefetching, false),
+ assert_prop(S8, q4, 0),
+ assert_prop(S8, q3, (SegmentSize - 1)),
+ assert_prop(S8, gamma, {gamma, (2*SegmentSize), SegmentSize}),
+
+ VQ9 = rabbit_variable_queue:remeasure_egress_rate(VQ8),
+ VQ10 = rabbit_variable_queue:ack(AckTags, VQ9),
+
+ S10 = rabbit_variable_queue:status(VQ10),
+ assert_prop(S10, prefetching, true),
+ %% egress rate should be really high, so it's likely if we wait a
+ %% little bit, the next segment should be brought in
+ timer:sleep(2000),
+ Len3 = (2*SegmentSize) - 2,
+ {{_Msg2, false, AckTag2, Len3}, VQ11} = rabbit_variable_queue:fetch(VQ10),
+ S11 = rabbit_variable_queue:status(VQ11),
+ assert_prop(S11, prefetching, false),
+ assert_prop(S11, q4, (SegmentSize - 2)),
+ assert_prop(S11, q3, SegmentSize),
+ assert_prop(S11, gamma, {gamma, undefined, 0}),
+ assert_prop(S11, q2, 0),
+ assert_prop(S11, q1, 0),
+
+ VQ12 = rabbit_variable_queue:maybe_start_prefetcher(VQ11),
+ S12 = rabbit_variable_queue:status(VQ12),
+ assert_prop(S12, prefetching, true),
+ PrefetchCount = lists:min([proplists:get_value(target_ram_msg_count, S12) -
+ proplists:get_value(ram_msg_count, S12),
+ SegmentSize]),
+ timer:sleep(2000),
+ %% we have to fetch all of q4 before the prefetcher will be drained
+ {VQ13, AckTags1} = variable_queue_fetch(SegmentSize-2, false, Len3, VQ12),
+ Len4 = SegmentSize - 1,
+ {{_Msg3, false, AckTag3, Len4}, VQ14} = rabbit_variable_queue:fetch(VQ13),
+ S14 = rabbit_variable_queue:status(VQ14),
+ assert_prop(S14, prefetching, false),
+ assert_prop(S14, q4, (PrefetchCount - 1)),
+ assert_prop(S14, q3, (Len4 - (PrefetchCount - 1))),
+
+ VQ15 = rabbit_variable_queue:ack([AckTag3, AckTag2, AckTag1, AckTag], VQ14),
+ VQ16 = rabbit_variable_queue:ack(AckTags1, VQ15),
+
+ {VQ17, AckTags2} = variable_queue_fetch(Len4, false, Len4, VQ16),
+ VQ18 = rabbit_variable_queue:ack(AckTags2, VQ17),
+
+ rabbit_variable_queue:terminate(VQ18),
passed.
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index af8a4775a4..b967e4a26f 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -46,6 +46,7 @@
gamma,
q3,
q4,
+ duration_target,
target_ram_msg_count,
ram_msg_count,
queue,
@@ -130,6 +131,7 @@ init(QueueName) ->
gamma = Gamma,
q3 = queue:new(), q4 = queue:new(),
target_ram_msg_count = undefined,
+ duration_target = undefined,
ram_msg_count = 0,
queue = QueueName,
index_state = IndexState1,
@@ -166,12 +168,15 @@ publish_delivered(Msg = #basic_message { guid = MsgId,
{ack_not_on_disk, State}
end.
+set_queue_ram_duration_target(undefined, State) ->
+ State;
set_queue_ram_duration_target(
DurationTarget, State = #vqstate { avg_egress_rate = EgressRate,
target_ram_msg_count = TargetRamMsgCount
}) ->
TargetRamMsgCount1 = trunc(DurationTarget * EgressRate), %% msgs = sec * msgs/sec
- State1 = State #vqstate { target_ram_msg_count = TargetRamMsgCount1 },
+ State1 = State #vqstate { target_ram_msg_count = TargetRamMsgCount1,
+ duration_target = DurationTarget },
if TargetRamMsgCount == TargetRamMsgCount1 ->
State1;
TargetRamMsgCount == undefined orelse
@@ -183,7 +188,8 @@ set_queue_ram_duration_target(
remeasure_egress_rate(State = #vqstate { egress_rate = OldEgressRate,
egress_rate_timestamp = Timestamp,
- out_counter = OutCount }) ->
+ out_counter = OutCount,
+ duration_target = DurationTarget }) ->
%% We do an average over the last two values, but also hold the
%% current value separately so that the average always only
%% incorporates the last two values, and not the current value and
@@ -192,13 +198,15 @@ remeasure_egress_rate(State = #vqstate { egress_rate = OldEgressRate,
%% EgressRate is in seconds, and now_diff is in microseconds
EgressRate = 1000000 * OutCount / timer:now_diff(Now, Timestamp),
AvgEgressRate = (EgressRate + OldEgressRate) / 2,
- State #vqstate { egress_rate = EgressRate,
- avg_egress_rate = AvgEgressRate,
- egress_rate_timestamp = Now,
- out_counter = 0 }.
+ set_queue_ram_duration_target(
+ DurationTarget,
+ State #vqstate { egress_rate = EgressRate,
+ avg_egress_rate = AvgEgressRate,
+ egress_rate_timestamp = Now,
+ out_counter = 0 }).
fetch(State =
- #vqstate { q4 = Q4,
+ #vqstate { q4 = Q4, ram_msg_count = RamMsgCount,
out_counter = OutCount, prefetcher = Prefetcher,
index_state = IndexState, len = Len }) ->
case queue:out(Q4) of
@@ -246,6 +254,7 @@ fetch(State =
Len1 = Len - 1,
{{Msg, IsDelivered, AckTag, Len1},
State #vqstate { q4 = Q4a, out_counter = OutCount + 1,
+ ram_msg_count = RamMsgCount - 1,
index_state = IndexState1, len = Len1 }}
end.
@@ -258,21 +267,29 @@ is_empty(State) ->
maybe_start_prefetcher(State = #vqstate {
ram_msg_count = RamMsgCount,
target_ram_msg_count = TargetRamMsgCount,
- q1 = Q1, q3 = Q3, prefetcher = undefined
+ q1 = Q1, q3 = Q3, prefetcher = undefined,
+ gamma = #gamma { count = GammaCount }
}) ->
- %% prefetched content takes priority over q1
- AvailableSpace = case TargetRamMsgCount of
- undefined -> queue:len(Q3);
- _ -> (TargetRamMsgCount - RamMsgCount) + queue:len(Q1)
- end,
- PrefetchCount = lists:min([queue:len(Q3), AvailableSpace]),
- if PrefetchCount =< 0 -> State;
- true ->
- {PrefetchQueue, Q3a} = queue:split(PrefetchCount, Q3),
- {ok, Prefetcher} =
- rabbit_queue_prefetcher:start_link(PrefetchQueue),
- maybe_load_next_segment(State #vqstate { q3 = Q3a,
- prefetcher = Prefetcher })
+ case queue:is_empty(Q3) andalso GammaCount > 0 of
+ true ->
+ maybe_start_prefetcher(maybe_load_next_segment(State));
+ false ->
+ %% prefetched content takes priority over q1
+ AvailableSpace =
+ case TargetRamMsgCount of
+ undefined -> queue:len(Q3);
+ _ -> (TargetRamMsgCount - RamMsgCount) + queue:len(Q1)
+ end,
+ PrefetchCount = lists:min([queue:len(Q3), AvailableSpace]),
+ case PrefetchCount =< 0 of
+ true -> State;
+ false ->
+ {PrefetchQueue, Q3a} = queue:split(PrefetchCount, Q3),
+ {ok, Prefetcher} =
+ rabbit_queue_prefetcher:start_link(PrefetchQueue),
+ maybe_load_next_segment(
+ State #vqstate { q3 = Q3a, prefetcher = Prefetcher })
+ end
end;
maybe_start_prefetcher(State) ->
State.
@@ -429,7 +446,7 @@ status(#vqstate { q1 = Q1, q2 = Q2, gamma = Gamma, q3 = Q3, q4 = Q4,
{q2, queue:len(Q2)},
{gamma, Gamma},
{q3, queue:len(Q3)},
- {q4, Q4},
+ {q4, queue:len(Q4)},
{len, Len},
{outstanding_txns, length(From)},
{target_ram_msg_count, TargetRamMsgCount},
@@ -570,9 +587,9 @@ publish(neither, Msg = #basic_message { guid = MsgId,
State #vqstate { index_state = IndexState1,
gamma = combine_gammas(Gamma, Gamma1) }.
-fetch_from_q3_or_gamma(State = #vqstate { q1 = Q1, q2 = Q2,
- gamma = #gamma { count = GammaCount },
- q3 = Q3, q4 = Q4 }) ->
+fetch_from_q3_or_gamma(State = #vqstate {
+ q1 = Q1, q2 = Q2, gamma = #gamma { count = GammaCount },
+ q3 = Q3, q4 = Q4, ram_msg_count = RamMsgCount }) ->
case queue:out(Q3) of
{empty, _Q3} ->
0 = GammaCount, %% ASSERTION
@@ -590,7 +607,8 @@ fetch_from_q3_or_gamma(State = #vqstate { q1 = Q1, q2 = Q2,
#alpha { msg = Msg, seq_id = SeqId,
is_delivered = IsDelivered, msg_on_disk = true,
index_on_disk = IndexOnDisk }, Q4),
- State1 = State #vqstate { q3 = Q3a, q4 = Q4a },
+ State1 = State #vqstate { q3 = Q3a, q4 = Q4a,
+ ram_msg_count = RamMsgCount + 1 },
State2 =
case {queue:is_empty(Q3a), 0 == GammaCount} of
{true, true} ->
@@ -615,23 +633,31 @@ maybe_load_next_segment(State = #vqstate { gamma = #gamma { count = 0 }} ) ->
State;
maybe_load_next_segment(State =
#vqstate { index_state = IndexState, q2 = Q2,
+ q3 = Q3,
gamma = #gamma { seq_id = GammaSeqId,
count = GammaCount }}) ->
- {List, IndexState1, Gamma1SeqId} =
- read_index_segment(GammaSeqId, IndexState),
- State1 = State #vqstate { index_state = IndexState1 },
- %% length(List) may be < segment_size because of acks. But it
- %% can't be []
- Q3a = betas_from_segment_entries(List),
- case GammaCount - length(List) of
- 0 ->
- %% gamma is now empty, but it wasn't before, so can now
- %% join q2 onto q3
- State1 #vqstate { gamma = #gamma { seq_id = undefined, count = 0 },
- q2 = queue:new(), q3 = queue:join(Q3a, Q2) };
- N when N > 0 ->
- State1 #vqstate { gamma = #gamma { seq_id = Gamma1SeqId,
- count = N }, q3 = Q3a }
+ case queue:is_empty(Q3) of
+ false ->
+ State;
+ true ->
+ {List, IndexState1, Gamma1SeqId} =
+ read_index_segment(GammaSeqId, IndexState),
+ State1 = State #vqstate { index_state = IndexState1 },
+ %% length(List) may be < segment_size because of acks. But
+ %% it can't be []
+ Q3a = betas_from_segment_entries(List),
+ case GammaCount - length(List) of
+ 0 ->
+ %% gamma is now empty, but it wasn't before, so
+ %% can now join q2 onto q3
+ State1 #vqstate { gamma = #gamma { seq_id = undefined,
+ count = 0 },
+ q2 = queue:new(),
+ q3 = queue:join(Q3a, Q2) };
+ N when N > 0 ->
+ State1 #vqstate { gamma = #gamma { seq_id = Gamma1SeqId,
+ count = N }, q3 = Q3a }
+ end
end.
betas_from_segment_entries(List) ->