diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-11-09 13:46:19 +0000 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-11-09 13:46:19 +0000 |
| commit | 5fd1c1493c488631c56d180117c055c2f23ddb7c (patch) | |
| tree | 89d70e1d4208cfa9ea4c9f7d81c1faf27b6614f6 /src | |
| parent | e3598c965896854b5f23d1fa42abbb78273fee15 (diff) | |
| download | rabbitmq-server-git-5fd1c1493c488631c56d180117c055c2f23ddb7c.tar.gz | |
Made the tests work again. Also one tiny cosmetic in vq. However, uncovered major mistake in VQ which is that currently, when the prefetcher starts, that may empty q3. A subsequent publish may then go straight to Q4, thus overtaking the previous msgs. Even worse, when the prefetcher is drained, there is no attempt to join it into the existing q4, it just replaces it. What should happen is that the existence of the prefetcher is treated as if both q3 and q4 are non empty. This makes sense, because there are some ways in which the prefetcher can exit, returning entries for both q4 and q3. Thus pubs that happen after the prefetcher is started must go to q1/q2/γ, and so we know that entries already in q4 when the prefetcher is drained must have got there before the prefetcher was started. Finally, when the prefetcher is drained, if γ is empty, q2 and q3 can be joined, and if q2, γ and q3 are empty, q1 and q4 can be joined. Or something like that.
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_tests.erl | 52 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 2 |
2 files changed, 35 insertions, 19 deletions
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index bef2264c5b..13d3cd1b82 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1127,10 +1127,11 @@ variable_queue_publish(IsPersistent, Count, VQ) -> {[SeqId | Acc], VQ2} end, {[], VQ}, lists:seq(1, Count)). -variable_queue_fetch(Count, IsPersistent, Len, VQ) -> +variable_queue_fetch(Count, IsPersistent, IsDelivered, Len, VQ) -> lists:foldl(fun (N, {VQN, AckTagsAcc}) -> Rem = Len - N, - {{_MsgN, IsPersistent, AckTagN, Rem}, VQM} = + {{#basic_message { is_persistent = IsPersistent }, + IsDelivered, AckTagN, Rem}, VQM} = rabbit_variable_queue:fetch(VQN), {VQM, [AckTagN | AckTagsAcc]} end, {VQ, []}, lists:seq(1, Count)). @@ -1180,7 +1181,7 @@ test_variable_queue() -> %% now fetch SegmentSize - 1 which will exhaust q4 and work through a bit of q3 %% bringing in a segment from gamma: - {VQ8, AckTags} = variable_queue_fetch(SegmentSize-1, false, Len2, VQ7), + {VQ8, AckTags} = variable_queue_fetch(SegmentSize-1, false, false, Len2, VQ7), Len3 = Len2 - (SegmentSize - 1), S8 = rabbit_variable_queue:status(VQ8), assert_prop(S8, prefetching, false), @@ -1202,11 +1203,19 @@ test_variable_queue() -> Len4 = Len3 - 1, {{_Msg2, false, AckTag2, Len4}, VQ11} = rabbit_variable_queue:fetch(VQ10), S11 = rabbit_variable_queue:status(VQ11), - %% prefetcher will stop if it's fast enough and has completed by now, or may still be running if PrefetchCount > 1 - assert_prop(S11, prefetching, false), + %% prefetcher will stop if it's fast enough and has completed by + %% now, or may still be running if PrefetchCount > 1 Prefetched = proplists:get_value(q4, S11), - true = (PrefetchCount - 1) >= Prefetched, - assert_prop(S11, q3, Len4 - Prefetched), + true = PrefetchCount > Prefetched, %% already fetched 1, thus >, not >= + %% q3 will contain whatever the prefetcher was not allowed to + %% prefetch, due to memory constraints. If the prefetcher is still + %% running, this will be less than (Len4 - Prefetched) because + %% Prefetched will not reflect the true number of msgs that it's + %% trying to prefetch. + case proplists:get_value(prefetching, S11) of + true -> true = (Len4 - Prefetched) > proplists:get_value(q3, S11); + false -> assert_prop(S11, q3, Len4 - Prefetched) + end, assert_prop(S11, gamma, {gamma, undefined, 0}), assert_prop(S11, q2, 0), assert_prop(S11, q1, 0), @@ -1217,17 +1226,24 @@ test_variable_queue() -> timer:sleep(2000), %% we have to fetch all of q4 before the prefetcher will be drained {VQ13, AckTags1} = - variable_queue_fetch(Prefetched, false, Len4, VQ12), - Len5 = Len4 - Prefetched - 1, - {{_Msg3, false, AckTag3, Len5}, VQ14} = rabbit_variable_queue:fetch(VQ13), - S14 = rabbit_variable_queue:status(VQ14), - assert_prop(S14, prefetching, false), - - VQ15 = rabbit_variable_queue:ack([AckTag3, AckTag2, AckTag1, AckTag], VQ14), - VQ16 = rabbit_variable_queue:ack(AckTags1, VQ15), - - {VQ17, AckTags2} = variable_queue_fetch(Len5, false, Len5, VQ16), - VQ18 = rabbit_variable_queue:ack(AckTags2, VQ17), + variable_queue_fetch(Prefetched, false, false, Len4, VQ12), + {VQ16, Acks} = + case Len4 == Prefetched of + true -> + {VQ13, [AckTag2, AckTag1, AckTag, AckTags1]}; + false -> + Len5 = Len4 - Prefetched - 1, + {{_Msg3, false, AckTag3, Len5}, VQ14} = + rabbit_variable_queue:fetch(VQ13), + assert_prop(rabbit_variable_queue:status(VQ14), + prefetching, false), + {VQ15, AckTags2} = + variable_queue_fetch(Len5, false, false, Len5, VQ14), + {VQ15, [AckTag3, AckTag2, AckTag1, AckTag, AckTags1, AckTags2]} + end, + VQ17 = rabbit_variable_queue:ack(lists:flatten(Acks), VQ16), + + {empty, VQ18} = rabbit_variable_queue:fetch(VQ17), rabbit_variable_queue:terminate(VQ18), passed. diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 64c9d19914..2624a9fb80 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -629,7 +629,7 @@ fetch_from_q3_or_gamma(State = #vqstate { fetch(State2) end. -maybe_gammas_to_betas(State = #vqstate { gamma = #gamma { count = 0 }} ) -> +maybe_gammas_to_betas(State = #vqstate { gamma = #gamma { count = 0 } }) -> State; maybe_gammas_to_betas(State = #vqstate { index_state = IndexState, q2 = Q2, q3 = Q3, |
