diff options
| -rw-r--r-- | include/rabbit_queue.hrl | 51 | ||||
| -rw-r--r-- | src/rabbit_queue_prefetcher.erl | 17 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 194 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 53 |
4 files changed, 264 insertions, 51 deletions
diff --git a/include/rabbit_queue.hrl b/include/rabbit_queue.hrl new file mode 100644 index 0000000000..5833b05683 --- /dev/null +++ b/include/rabbit_queue.hrl @@ -0,0 +1,51 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2009 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-record(alpha, + { msg, + seq_id, + is_delivered, + msg_on_disk, + index_on_disk + }). + +-record(beta, + { msg_id, + seq_id, + is_persistent, + is_delivered, + index_on_disk + }). + +-record(gamma, + { seq_id, + count + }). diff --git a/src/rabbit_queue_prefetcher.erl b/src/rabbit_queue_prefetcher.erl index fd407c9dd1..f5e717f55b 100644 --- a/src/rabbit_queue_prefetcher.erl +++ b/src/rabbit_queue_prefetcher.erl @@ -41,6 +41,7 @@ -export([publish/2, drain/1, drain_and_stop/1, stop/1]). -include("rabbit.hrl"). +-include("rabbit_queue.hrl"). -define(HIBERNATE_AFTER_MIN, 1000). -define(DESIRED_HIBERNATE, 10000). @@ -52,22 +53,6 @@ peruse_cb }). --record(alpha, - { msg, - seq_id, - is_delivered, - msg_on_disk, - index_on_disk - }). - --record(beta, - { msg_id, - seq_id, - is_persistent, - is_delivered, - index_on_disk - }). - %%---------------------------------------------------------------------------- %% Novel %%---------------------------------------------------------------------------- diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 13d3cd1b82..6414347632 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1139,19 +1139,203 @@ variable_queue_fetch(Count, IsPersistent, IsDelivered, Len, VQ) -> assert_prop(List, Prop, Value) -> Value = proplists:get_value(Prop, List). -test_variable_queue() -> - SegmentSize = rabbit_queue_index:segment_size(), +fresh_variable_queue() -> stop_msg_store(), ok = empty_test_queue(), - VQ0 = rabbit_variable_queue:init(test_queue()), - S0 = rabbit_variable_queue:status(VQ0), + VQ = rabbit_variable_queue:init(test_queue()), + S0 = rabbit_variable_queue:status(VQ), assert_prop(S0, len, 0), assert_prop(S0, prefetching, false), + assert_prop(S0, q1, 0), + assert_prop(S0, q2, 0), + assert_prop(S0, gamma, {gamma, undefined, 0}), + assert_prop(S0, q3, 0), + assert_prop(S0, q4, 0), + VQ. + +test_variable_queue() -> + passed = test_variable_queue_prefetching_and_gammas_to_betas(), + passed = test_variable_queue_prefetching_during_publish(0), + passed = test_variable_queue_prefetching_during_publish(5000), + passed = test_variable_queue_prefetch_evicts_q1(), + passed. + +test_variable_queue_prefetch_evicts_q1() -> + SegmentSize = rabbit_queue_index:segment_size(), + VQ0 = fresh_variable_queue(), + VQ1 = rabbit_variable_queue:set_queue_ram_duration_target(0, VQ0), + assert_prop(rabbit_variable_queue:status(VQ1), target_ram_msg_count, 0), + Len1 = 2*SegmentSize, + {_SeqIds, VQ2} = variable_queue_publish(true, Len1, VQ1), + %% one segment will be in q3, the other in gamma. We want to fetch + %% all of q3 so that gamma is then moved into q3, emptying gamma + + VQ3 = rabbit_variable_queue:remeasure_egress_rate(VQ2), + Start = now(), + {VQ4, AckTags} = variable_queue_fetch(SegmentSize, true, false, Len1, VQ3), + End = now(), + VQ5 = rabbit_variable_queue:ack(AckTags, VQ4), + S5 = rabbit_variable_queue:status(VQ5), + assert_prop(S5, q4, 0), + assert_prop(S5, q3, SegmentSize), + assert_prop(S5, gamma, {gamma, undefined, 0}), + assert_prop(S5, len, SegmentSize), + assert_prop(S5, prefetching, false), + + VQ6 = rabbit_variable_queue:remeasure_egress_rate(VQ5), + %% half the seconds taken to fetch one segment + Duration = timer:now_diff(End, Start) / 2000000, + VQ7 = rabbit_variable_queue:set_queue_ram_duration_target(Duration, VQ6), + S7 = rabbit_variable_queue:status(VQ7), + assert_prop(S7, q4, 0), + Q3 = proplists:get_value(q3, S7), + true = Q3 > 0, %% not prefetching everything + assert_prop(S7, gamma, {gamma, undefined, 0}), + assert_prop(S7, len, SegmentSize), + assert_prop(S7, prefetching, true), + + %% now publish a segment, this'll go half in q1, half in q3, in + %% theory. + {_SeqIds1, VQ8} = variable_queue_publish(true, SegmentSize, VQ7), + S8 = rabbit_variable_queue:status(VQ8), + assert_prop(S8, q4, 0), + assert_prop(S8, q2, 0), + assert_prop(S8, len, Len1), + assert_prop(S8, prefetching, true), + Q3a = proplists:get_value(q3, S8), + Q3a_new = Q3a - Q3, + Q1a = proplists:get_value(q1, S8), + true = (Q3a_new + Q1a == SegmentSize) andalso Q1a < SegmentSize, + + %% wait a bit, to let the prefetcher do its thing + timer:sleep(2000), + %% fetch a msg. The prefetcher *should* have finished, but can't + %% guarantee it. + Len2 = Len1-1, + {{_Msg, false, AckTag, Len2}, VQ9} = rabbit_variable_queue:fetch(VQ8), + S9 = rabbit_variable_queue:status(VQ9), + case proplists:get_value(prefetching, S9) of + true -> + %% bits of q1 could have moved into q3, and the prefetcher + %% won't have returned any betas for q3. So q3 can not + %% have shrunk. + Q3b = proplists:get_value(q3, S9), + Q1b = proplists:get_value(q1, S9), + true = (Q1a + Q3a) == (Q1b + Q3b) andalso Q3b >= Q3a; + false -> + %% there should be content in q4 and q3 (we only did 1 + %% fetch. This is not sufficient to kill the prefetcher + %% through draining it when it's empty, thus if it's not + %% running, it must have finished, not been killed, thus + %% q4 will not be empty), and q1 should have gone into q3. + Q1b = proplists:get_value(q1, S9), + Q3b = proplists:get_value(q3, S9), + Q4b = proplists:get_value(q4, S9), + NotPrefetched = Q3b - (SegmentSize - Q1b), + SegmentSize = NotPrefetched + Q4b + 1 %% we fetched one + end, + + %% just for the fun of it, set duration to 0. This should push + %% everything back into gamma, except the eldest (partial) segment + %% in q3 + VQ10 = rabbit_variable_queue:set_queue_ram_duration_target(0, VQ9), + S10 = rabbit_variable_queue:status(VQ10), + assert_prop(S10, len, Len2), + assert_prop(S10, prefetching, false), + assert_prop(S10, q1, 0), + assert_prop(S10, q2, 0), + assert_prop(S10, gamma, {gamma, Len1, SegmentSize}), + assert_prop(S10, q3, (Len2 - SegmentSize)), + assert_prop(S10, q4, 0), + + {VQ11, AckTags1} = variable_queue_fetch(Len2, true, false, Len2, VQ10), + VQ12 = rabbit_variable_queue:ack([AckTag|AckTags1], VQ11), + {empty, VQ13} = rabbit_variable_queue:fetch(VQ12), + rabbit_variable_queue:terminate(VQ13), + + passed. + +test_variable_queue_prefetching_during_publish(PrefetchDelay) -> + SegmentSize = rabbit_queue_index:segment_size(), + VQ0 = fresh_variable_queue(), + VQ1 = rabbit_variable_queue:set_queue_ram_duration_target(0, VQ0), + assert_prop(rabbit_variable_queue:status(VQ1), target_ram_msg_count, 0), + + Len1 = 2*SegmentSize, + {_SeqIds, VQ2} = variable_queue_publish(true, Len1, VQ1), + %% one segment will be in q3, the other in gamma. We want to fetch + %% all of q3 so that gamma is then moved into q3, emptying gamma + + VQ3 = rabbit_variable_queue:remeasure_egress_rate(VQ2), + {VQ4, AckTags} = variable_queue_fetch(SegmentSize, true, false, Len1, VQ3), + VQ5 = rabbit_variable_queue:ack(AckTags, VQ4), + S5 = rabbit_variable_queue:status(VQ5), + assert_prop(S5, q4, 0), + assert_prop(S5, q3, SegmentSize), + assert_prop(S5, gamma, {gamma, undefined, 0}), + assert_prop(S5, len, SegmentSize), + assert_prop(S5, prefetching, false), + + %% we assume that we can fetch at > 1 msg a second + VQ6 = rabbit_variable_queue:remeasure_egress_rate(VQ5), + VQ7 = rabbit_variable_queue:set_queue_ram_duration_target(Len1, VQ6), + S7 = rabbit_variable_queue:status(VQ7), + assert_prop(S7, q4, 0), + assert_prop(S7, q3, 0), + assert_prop(S7, gamma, {gamma, undefined, 0}), + assert_prop(S7, len, SegmentSize), + assert_prop(S7, prefetching, true), + + timer:sleep(PrefetchDelay), + + {_SeqIds1, VQ8} = variable_queue_publish(true, SegmentSize, VQ7), + S8 = rabbit_variable_queue:status(VQ8), + assert_prop(S8, q4, 0), + assert_prop(S8, q2, 0), + assert_prop(S8, q1, SegmentSize), + assert_prop(S8, len, Len1), + assert_prop(S8, prefetching, true), + + {VQ9, AckTags1} = + variable_queue_fetch(SegmentSize-1, true, false, Len1, VQ8), + VQ10 = rabbit_variable_queue:ack(AckTags1, VQ9), + %% can't guarantee the prefetcher has stopped here. If it is still + %% running, then we must have SegmentSize is q1. If it's not + %% running, and it completed, then we'll find SegmentSize + 1 in + %% q4 (q1 will have been joined to q4), otherwise, we'll find + %% SegmentSize in q1 and 1 in q3 and q4 empty. + S10 = rabbit_variable_queue:status(VQ10), + assert_prop(S10, q2, 0), + assert_prop(S10, len, (SegmentSize+1)), + case proplists:get_value(prefetching, S10) of + true -> assert_prop(S10, q1, SegmentSize), + assert_prop(S10, q3, 0), + assert_prop(S10, q4, 0); + false -> case proplists:get_value(q3, S10) of + 0 -> assert_prop(S10, q4, SegmentSize+1), + assert_prop(S10, q1, 0); + 1 -> assert_prop(S10, q4, 0), + assert_prop(S10, q1, SegmentSize) + end + end, + + {VQ11, AckTags2} = + variable_queue_fetch(SegmentSize+1, true, false, SegmentSize+1, VQ10), + VQ12 = rabbit_variable_queue:ack(AckTags2, VQ11), + + {empty, VQ13} = rabbit_variable_queue:fetch(VQ12), + rabbit_variable_queue:terminate(VQ13), + + passed. + +test_variable_queue_prefetching_and_gammas_to_betas() -> + SegmentSize = rabbit_queue_index:segment_size(), + VQ0 = fresh_variable_queue(), VQ1 = rabbit_variable_queue:set_queue_ram_duration_target(10, VQ0), assert_prop(rabbit_variable_queue:status(VQ1), target_ram_msg_count, 0), - {SeqIds, VQ2} = variable_queue_publish(false, 3 * SegmentSize, VQ1), + {_SeqIds, VQ2} = variable_queue_publish(false, 3 * SegmentSize, VQ1), S2 = rabbit_variable_queue:status(VQ2), assert_prop(S2, gamma, {gamma, SegmentSize, 2*SegmentSize}), assert_prop(S2, q3, SegmentSize), diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 2624a9fb80..d7b9dafbff 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -61,28 +61,8 @@ on_sync }). --record(alpha, - { msg, - seq_id, - is_delivered, - msg_on_disk, - index_on_disk - }). - --record(beta, - { msg_id, - seq_id, - is_persistent, - is_delivered, - index_on_disk - }). - --record(gamma, - { seq_id, - count - }). - -include("rabbit.hrl"). +-include("rabbit_queue.hrl"). %%---------------------------------------------------------------------------- @@ -682,7 +662,9 @@ read_index_segment(SeqId, IndexState) -> drain_prefetcher(_DrainOrStop, State = #vqstate { prefetcher = undefined }) -> State; drain_prefetcher(DrainOrStop, - State = #vqstate { prefetcher = Prefetcher, q3 = Q3, q4 = Q4, + State = #vqstate { prefetcher = Prefetcher, q1 = Q1, q2 = Q2, + gamma = #gamma { count = GammaCount }, + q3 = Q3, q4 = Q4, ram_msg_count = RamMsgCount }) -> Fun = case DrainOrStop of drain -> fun rabbit_queue_prefetcher:drain/1; @@ -693,16 +675,27 @@ drain_prefetcher(DrainOrStop, {empty, Betas} -> %% drain or drain_and_stop {queue:join(Betas, Q3), Q4, undefined, 0}; {finished, Alphas} -> %% just drain - {Q3, Alphas, undefined, queue:len(Alphas)}; + {Q3, queue:join(Q4, Alphas), undefined, queue:len(Alphas)}; {continuing, Alphas} -> %% just drain - {Q3, Alphas, Prefetcher, queue:len(Alphas)}; + {Q3, queue:join(Q4, Alphas), Prefetcher, queue:len(Alphas)}; {Alphas, Betas} -> %% just drain_and_stop {queue:join(Betas, Q3), queue:join(Q4, Alphas), undefined, queue:len(Alphas)} end, - maybe_push_q1_to_betas( - State #vqstate { prefetcher = Prefetcher1, q3 = Q3a, q4 = Q4a, - ram_msg_count = RamMsgCount + RamMsgCountAdj }). + State1 = State #vqstate { prefetcher = Prefetcher1, q3 = Q3a, q4 = Q4a, + ram_msg_count = RamMsgCount + RamMsgCountAdj }, + %% don't join up with q1/q2 unless the prefetcher has stopped + State2 = case GammaCount == 0 andalso Prefetcher1 == undefined of + true -> case queue:is_empty(Q3a) andalso queue:is_empty(Q2) of + true -> + State1 #vqstate { q1 = queue:new(), + q4 = queue:join(Q4a, Q1) }; + false -> + State1 #vqstate { q3 = queue:join(Q3a, Q2) } + end; + false -> State1 + end, + maybe_push_q1_to_betas(State2). reduce_memory_use(State = #vqstate { ram_msg_count = RamMsgCount, target_ram_msg_count = TargetRamMsgCount }) @@ -796,9 +789,9 @@ ensure_binary_properties(Msg = #basic_message { content = Content }) -> store_alpha_entry(Entry = #alpha {}, State = #vqstate { q1 = Q1, q2 = Q2, gamma = #gamma { count = GammaCount }, - q3 = Q3, q4 = Q4 }) -> - case queue:is_empty(Q2) andalso GammaCount == 0 andalso queue:is_empty(Q3) - of + q3 = Q3, q4 = Q4, prefetcher = Prefetcher }) -> + case queue:is_empty(Q2) andalso GammaCount == 0 andalso + queue:is_empty(Q3) andalso Prefetcher == undefined of true -> State #vqstate { q4 = queue:in(Entry, Q4) }; false -> |
