summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--include/rabbit_queue.hrl51
-rw-r--r--src/rabbit_queue_prefetcher.erl17
-rw-r--r--src/rabbit_tests.erl194
-rw-r--r--src/rabbit_variable_queue.erl53
4 files changed, 264 insertions, 51 deletions
diff --git a/include/rabbit_queue.hrl b/include/rabbit_queue.hrl
new file mode 100644
index 0000000000..5833b05683
--- /dev/null
+++ b/include/rabbit_queue.hrl
@@ -0,0 +1,51 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2009 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-record(alpha,
+ { msg,
+ seq_id,
+ is_delivered,
+ msg_on_disk,
+ index_on_disk
+ }).
+
+-record(beta,
+ { msg_id,
+ seq_id,
+ is_persistent,
+ is_delivered,
+ index_on_disk
+ }).
+
+-record(gamma,
+ { seq_id,
+ count
+ }).
diff --git a/src/rabbit_queue_prefetcher.erl b/src/rabbit_queue_prefetcher.erl
index fd407c9dd1..f5e717f55b 100644
--- a/src/rabbit_queue_prefetcher.erl
+++ b/src/rabbit_queue_prefetcher.erl
@@ -41,6 +41,7 @@
-export([publish/2, drain/1, drain_and_stop/1, stop/1]).
-include("rabbit.hrl").
+-include("rabbit_queue.hrl").
-define(HIBERNATE_AFTER_MIN, 1000).
-define(DESIRED_HIBERNATE, 10000).
@@ -52,22 +53,6 @@
peruse_cb
}).
--record(alpha,
- { msg,
- seq_id,
- is_delivered,
- msg_on_disk,
- index_on_disk
- }).
-
--record(beta,
- { msg_id,
- seq_id,
- is_persistent,
- is_delivered,
- index_on_disk
- }).
-
%%----------------------------------------------------------------------------
%% Novel
%%----------------------------------------------------------------------------
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 13d3cd1b82..6414347632 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -1139,19 +1139,203 @@ variable_queue_fetch(Count, IsPersistent, IsDelivered, Len, VQ) ->
assert_prop(List, Prop, Value) ->
Value = proplists:get_value(Prop, List).
-test_variable_queue() ->
- SegmentSize = rabbit_queue_index:segment_size(),
+fresh_variable_queue() ->
stop_msg_store(),
ok = empty_test_queue(),
- VQ0 = rabbit_variable_queue:init(test_queue()),
- S0 = rabbit_variable_queue:status(VQ0),
+ VQ = rabbit_variable_queue:init(test_queue()),
+ S0 = rabbit_variable_queue:status(VQ),
assert_prop(S0, len, 0),
assert_prop(S0, prefetching, false),
+ assert_prop(S0, q1, 0),
+ assert_prop(S0, q2, 0),
+ assert_prop(S0, gamma, {gamma, undefined, 0}),
+ assert_prop(S0, q3, 0),
+ assert_prop(S0, q4, 0),
+ VQ.
+
+test_variable_queue() ->
+ passed = test_variable_queue_prefetching_and_gammas_to_betas(),
+ passed = test_variable_queue_prefetching_during_publish(0),
+ passed = test_variable_queue_prefetching_during_publish(5000),
+ passed = test_variable_queue_prefetch_evicts_q1(),
+ passed.
+
+test_variable_queue_prefetch_evicts_q1() ->
+ SegmentSize = rabbit_queue_index:segment_size(),
+ VQ0 = fresh_variable_queue(),
+ VQ1 = rabbit_variable_queue:set_queue_ram_duration_target(0, VQ0),
+ assert_prop(rabbit_variable_queue:status(VQ1), target_ram_msg_count, 0),
+ Len1 = 2*SegmentSize,
+ {_SeqIds, VQ2} = variable_queue_publish(true, Len1, VQ1),
+ %% one segment will be in q3, the other in gamma. We want to fetch
+ %% all of q3 so that gamma is then moved into q3, emptying gamma
+
+ VQ3 = rabbit_variable_queue:remeasure_egress_rate(VQ2),
+ Start = now(),
+ {VQ4, AckTags} = variable_queue_fetch(SegmentSize, true, false, Len1, VQ3),
+ End = now(),
+ VQ5 = rabbit_variable_queue:ack(AckTags, VQ4),
+ S5 = rabbit_variable_queue:status(VQ5),
+ assert_prop(S5, q4, 0),
+ assert_prop(S5, q3, SegmentSize),
+ assert_prop(S5, gamma, {gamma, undefined, 0}),
+ assert_prop(S5, len, SegmentSize),
+ assert_prop(S5, prefetching, false),
+
+ VQ6 = rabbit_variable_queue:remeasure_egress_rate(VQ5),
+ %% half the seconds taken to fetch one segment
+ Duration = timer:now_diff(End, Start) / 2000000,
+ VQ7 = rabbit_variable_queue:set_queue_ram_duration_target(Duration, VQ6),
+ S7 = rabbit_variable_queue:status(VQ7),
+ assert_prop(S7, q4, 0),
+ Q3 = proplists:get_value(q3, S7),
+ true = Q3 > 0, %% not prefetching everything
+ assert_prop(S7, gamma, {gamma, undefined, 0}),
+ assert_prop(S7, len, SegmentSize),
+ assert_prop(S7, prefetching, true),
+
+ %% now publish a segment, this'll go half in q1, half in q3, in
+ %% theory.
+ {_SeqIds1, VQ8} = variable_queue_publish(true, SegmentSize, VQ7),
+ S8 = rabbit_variable_queue:status(VQ8),
+ assert_prop(S8, q4, 0),
+ assert_prop(S8, q2, 0),
+ assert_prop(S8, len, Len1),
+ assert_prop(S8, prefetching, true),
+ Q3a = proplists:get_value(q3, S8),
+ Q3a_new = Q3a - Q3,
+ Q1a = proplists:get_value(q1, S8),
+ true = (Q3a_new + Q1a == SegmentSize) andalso Q1a < SegmentSize,
+
+ %% wait a bit, to let the prefetcher do its thing
+ timer:sleep(2000),
+ %% fetch a msg. The prefetcher *should* have finished, but can't
+ %% guarantee it.
+ Len2 = Len1-1,
+ {{_Msg, false, AckTag, Len2}, VQ9} = rabbit_variable_queue:fetch(VQ8),
+ S9 = rabbit_variable_queue:status(VQ9),
+ case proplists:get_value(prefetching, S9) of
+ true ->
+ %% bits of q1 could have moved into q3, and the prefetcher
+ %% won't have returned any betas for q3. So q3 can not
+ %% have shrunk.
+ Q3b = proplists:get_value(q3, S9),
+ Q1b = proplists:get_value(q1, S9),
+ true = (Q1a + Q3a) == (Q1b + Q3b) andalso Q3b >= Q3a;
+ false ->
+ %% there should be content in q4 and q3 (we only did 1
+ %% fetch. This is not sufficient to kill the prefetcher
+ %% through draining it when it's empty, thus if it's not
+ %% running, it must have finished, not been killed, thus
+ %% q4 will not be empty), and q1 should have gone into q3.
+ Q1b = proplists:get_value(q1, S9),
+ Q3b = proplists:get_value(q3, S9),
+ Q4b = proplists:get_value(q4, S9),
+ NotPrefetched = Q3b - (SegmentSize - Q1b),
+ SegmentSize = NotPrefetched + Q4b + 1 %% we fetched one
+ end,
+
+ %% just for the fun of it, set duration to 0. This should push
+ %% everything back into gamma, except the eldest (partial) segment
+ %% in q3
+ VQ10 = rabbit_variable_queue:set_queue_ram_duration_target(0, VQ9),
+ S10 = rabbit_variable_queue:status(VQ10),
+ assert_prop(S10, len, Len2),
+ assert_prop(S10, prefetching, false),
+ assert_prop(S10, q1, 0),
+ assert_prop(S10, q2, 0),
+ assert_prop(S10, gamma, {gamma, Len1, SegmentSize}),
+ assert_prop(S10, q3, (Len2 - SegmentSize)),
+ assert_prop(S10, q4, 0),
+
+ {VQ11, AckTags1} = variable_queue_fetch(Len2, true, false, Len2, VQ10),
+ VQ12 = rabbit_variable_queue:ack([AckTag|AckTags1], VQ11),
+ {empty, VQ13} = rabbit_variable_queue:fetch(VQ12),
+ rabbit_variable_queue:terminate(VQ13),
+
+ passed.
+
+test_variable_queue_prefetching_during_publish(PrefetchDelay) ->
+ SegmentSize = rabbit_queue_index:segment_size(),
+ VQ0 = fresh_variable_queue(),
+ VQ1 = rabbit_variable_queue:set_queue_ram_duration_target(0, VQ0),
+ assert_prop(rabbit_variable_queue:status(VQ1), target_ram_msg_count, 0),
+
+ Len1 = 2*SegmentSize,
+ {_SeqIds, VQ2} = variable_queue_publish(true, Len1, VQ1),
+ %% one segment will be in q3, the other in gamma. We want to fetch
+ %% all of q3 so that gamma is then moved into q3, emptying gamma
+
+ VQ3 = rabbit_variable_queue:remeasure_egress_rate(VQ2),
+ {VQ4, AckTags} = variable_queue_fetch(SegmentSize, true, false, Len1, VQ3),
+ VQ5 = rabbit_variable_queue:ack(AckTags, VQ4),
+ S5 = rabbit_variable_queue:status(VQ5),
+ assert_prop(S5, q4, 0),
+ assert_prop(S5, q3, SegmentSize),
+ assert_prop(S5, gamma, {gamma, undefined, 0}),
+ assert_prop(S5, len, SegmentSize),
+ assert_prop(S5, prefetching, false),
+
+ %% we assume that we can fetch at > 1 msg a second
+ VQ6 = rabbit_variable_queue:remeasure_egress_rate(VQ5),
+ VQ7 = rabbit_variable_queue:set_queue_ram_duration_target(Len1, VQ6),
+ S7 = rabbit_variable_queue:status(VQ7),
+ assert_prop(S7, q4, 0),
+ assert_prop(S7, q3, 0),
+ assert_prop(S7, gamma, {gamma, undefined, 0}),
+ assert_prop(S7, len, SegmentSize),
+ assert_prop(S7, prefetching, true),
+
+ timer:sleep(PrefetchDelay),
+
+ {_SeqIds1, VQ8} = variable_queue_publish(true, SegmentSize, VQ7),
+ S8 = rabbit_variable_queue:status(VQ8),
+ assert_prop(S8, q4, 0),
+ assert_prop(S8, q2, 0),
+ assert_prop(S8, q1, SegmentSize),
+ assert_prop(S8, len, Len1),
+ assert_prop(S8, prefetching, true),
+
+ {VQ9, AckTags1} =
+ variable_queue_fetch(SegmentSize-1, true, false, Len1, VQ8),
+ VQ10 = rabbit_variable_queue:ack(AckTags1, VQ9),
+ %% can't guarantee the prefetcher has stopped here. If it is still
+ %% running, then we must have SegmentSize is q1. If it's not
+ %% running, and it completed, then we'll find SegmentSize + 1 in
+ %% q4 (q1 will have been joined to q4), otherwise, we'll find
+ %% SegmentSize in q1 and 1 in q3 and q4 empty.
+ S10 = rabbit_variable_queue:status(VQ10),
+ assert_prop(S10, q2, 0),
+ assert_prop(S10, len, (SegmentSize+1)),
+ case proplists:get_value(prefetching, S10) of
+ true -> assert_prop(S10, q1, SegmentSize),
+ assert_prop(S10, q3, 0),
+ assert_prop(S10, q4, 0);
+ false -> case proplists:get_value(q3, S10) of
+ 0 -> assert_prop(S10, q4, SegmentSize+1),
+ assert_prop(S10, q1, 0);
+ 1 -> assert_prop(S10, q4, 0),
+ assert_prop(S10, q1, SegmentSize)
+ end
+ end,
+
+ {VQ11, AckTags2} =
+ variable_queue_fetch(SegmentSize+1, true, false, SegmentSize+1, VQ10),
+ VQ12 = rabbit_variable_queue:ack(AckTags2, VQ11),
+
+ {empty, VQ13} = rabbit_variable_queue:fetch(VQ12),
+ rabbit_variable_queue:terminate(VQ13),
+
+ passed.
+
+test_variable_queue_prefetching_and_gammas_to_betas() ->
+ SegmentSize = rabbit_queue_index:segment_size(),
+ VQ0 = fresh_variable_queue(),
VQ1 = rabbit_variable_queue:set_queue_ram_duration_target(10, VQ0),
assert_prop(rabbit_variable_queue:status(VQ1), target_ram_msg_count, 0),
- {SeqIds, VQ2} = variable_queue_publish(false, 3 * SegmentSize, VQ1),
+ {_SeqIds, VQ2} = variable_queue_publish(false, 3 * SegmentSize, VQ1),
S2 = rabbit_variable_queue:status(VQ2),
assert_prop(S2, gamma, {gamma, SegmentSize, 2*SegmentSize}),
assert_prop(S2, q3, SegmentSize),
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 2624a9fb80..d7b9dafbff 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -61,28 +61,8 @@
on_sync
}).
--record(alpha,
- { msg,
- seq_id,
- is_delivered,
- msg_on_disk,
- index_on_disk
- }).
-
--record(beta,
- { msg_id,
- seq_id,
- is_persistent,
- is_delivered,
- index_on_disk
- }).
-
--record(gamma,
- { seq_id,
- count
- }).
-
-include("rabbit.hrl").
+-include("rabbit_queue.hrl").
%%----------------------------------------------------------------------------
@@ -682,7 +662,9 @@ read_index_segment(SeqId, IndexState) ->
drain_prefetcher(_DrainOrStop, State = #vqstate { prefetcher = undefined }) ->
State;
drain_prefetcher(DrainOrStop,
- State = #vqstate { prefetcher = Prefetcher, q3 = Q3, q4 = Q4,
+ State = #vqstate { prefetcher = Prefetcher, q1 = Q1, q2 = Q2,
+ gamma = #gamma { count = GammaCount },
+ q3 = Q3, q4 = Q4,
ram_msg_count = RamMsgCount }) ->
Fun = case DrainOrStop of
drain -> fun rabbit_queue_prefetcher:drain/1;
@@ -693,16 +675,27 @@ drain_prefetcher(DrainOrStop,
{empty, Betas} -> %% drain or drain_and_stop
{queue:join(Betas, Q3), Q4, undefined, 0};
{finished, Alphas} -> %% just drain
- {Q3, Alphas, undefined, queue:len(Alphas)};
+ {Q3, queue:join(Q4, Alphas), undefined, queue:len(Alphas)};
{continuing, Alphas} -> %% just drain
- {Q3, Alphas, Prefetcher, queue:len(Alphas)};
+ {Q3, queue:join(Q4, Alphas), Prefetcher, queue:len(Alphas)};
{Alphas, Betas} -> %% just drain_and_stop
{queue:join(Betas, Q3), queue:join(Q4, Alphas), undefined,
queue:len(Alphas)}
end,
- maybe_push_q1_to_betas(
- State #vqstate { prefetcher = Prefetcher1, q3 = Q3a, q4 = Q4a,
- ram_msg_count = RamMsgCount + RamMsgCountAdj }).
+ State1 = State #vqstate { prefetcher = Prefetcher1, q3 = Q3a, q4 = Q4a,
+ ram_msg_count = RamMsgCount + RamMsgCountAdj },
+ %% don't join up with q1/q2 unless the prefetcher has stopped
+ State2 = case GammaCount == 0 andalso Prefetcher1 == undefined of
+ true -> case queue:is_empty(Q3a) andalso queue:is_empty(Q2) of
+ true ->
+ State1 #vqstate { q1 = queue:new(),
+ q4 = queue:join(Q4a, Q1) };
+ false ->
+ State1 #vqstate { q3 = queue:join(Q3a, Q2) }
+ end;
+ false -> State1
+ end,
+ maybe_push_q1_to_betas(State2).
reduce_memory_use(State = #vqstate { ram_msg_count = RamMsgCount,
target_ram_msg_count = TargetRamMsgCount })
@@ -796,9 +789,9 @@ ensure_binary_properties(Msg = #basic_message { content = Content }) ->
store_alpha_entry(Entry = #alpha {}, State =
#vqstate { q1 = Q1, q2 = Q2,
gamma = #gamma { count = GammaCount },
- q3 = Q3, q4 = Q4 }) ->
- case queue:is_empty(Q2) andalso GammaCount == 0 andalso queue:is_empty(Q3)
- of
+ q3 = Q3, q4 = Q4, prefetcher = Prefetcher }) ->
+ case queue:is_empty(Q2) andalso GammaCount == 0 andalso
+ queue:is_empty(Q3) andalso Prefetcher == undefined of
true ->
State #vqstate { q4 = queue:in(Entry, Q4) };
false ->