summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-11-16 17:40:15 +0000
committerMatthew Sackman <matthew@lshift.net>2009-11-16 17:40:15 +0000
commit148dc35396c672c12513b5a8269684ce6df06408 (patch)
tree59fbdfa5db60ac5624a5876d10d65363336c2c0d
parent5a7d9cee4faa420bd1cbdb7677a9d1fc8c528ddc (diff)
downloadrabbitmq-server-git-148dc35396c672c12513b5a8269684ce6df06408.tar.gz
stripped out the prefetcher. Note that I've not fixed up the tests yet, so they won't pass
-rw-r--r--src/rabbit_amqqueue_process.erl5
-rw-r--r--src/rabbit_msg_store.erl12
-rw-r--r--src/rabbit_queue_prefetcher.erl295
-rw-r--r--src/rabbit_tests.erl45
-rw-r--r--src/rabbit_variable_queue.erl121
5 files changed, 35 insertions, 443 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index b0c1ccacb8..3adf97ff85 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -922,7 +922,6 @@ handle_info(Info, State) ->
{stop, {unhandled_info, Info}, State}.
handle_pre_hibernate(State = #q{ variable_queue_state = VQS }) ->
- VQS1 = rabbit_variable_queue:maybe_start_prefetcher(VQS),
- VQS2 = rabbit_variable_queue:full_flush_journal(VQS1),
+ VQS1 = rabbit_variable_queue:full_flush_journal(VQS),
{hibernate, stop_egress_rate_timer(
- State#q{ variable_queue_state = VQS2 })}.
+ State#q{ variable_queue_state = VQS1 })}.
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index 591435ba01..b42574c0f7 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -33,8 +33,8 @@
-behaviour(gen_server2).
--export([start_link/3, write/2, read/1, peruse/2, contains/1, remove/1,
- release/1, sync/2]).
+-export([start_link/3, write/2, read/1, contains/1, remove/1, release/1,
+ sync/2]).
-export([sync/0]). %% internal
@@ -62,8 +62,6 @@
{'ok', pid()} | 'ignore' | {'error', any()}).
-spec(write/2 :: (msg_id(), msg()) -> 'ok').
-spec(read/1 :: (msg_id()) -> {'ok', msg()} | 'not_found').
--spec(peruse/2 :: (msg_id(), fun (({'ok', msg()} | 'not_found') -> 'ok')) ->
- 'ok').
-spec(contains/1 :: (msg_id()) -> boolean()).
-spec(remove/1 :: ([msg_id()]) -> 'ok').
-spec(release/1 :: ([msg_id()]) -> 'ok').
@@ -233,7 +231,6 @@ start_link(Dir, MsgRefDeltaGen, MsgRefDeltaGenInit) ->
write(MsgId, Msg) -> gen_server2:cast(?SERVER, {write, MsgId, Msg}).
read(MsgId) -> gen_server2:call(?SERVER, {read, MsgId}, infinity).
-peruse(MsgId, Fun) -> gen_server2:pcast(?SERVER, -1, {peruse, MsgId, Fun}).
contains(MsgId) -> gen_server2:call(?SERVER, {contains, MsgId}, infinity).
remove(MsgIds) -> gen_server2:cast(?SERVER, {remove, MsgIds}).
release(MsgIds) -> gen_server2:cast(?SERVER, {release, MsgIds}).
@@ -334,11 +331,6 @@ handle_cast({write, MsgId, Msg},
noreply(State)
end;
-handle_cast({peruse, MsgId, Fun}, State) ->
- {Result, State1} = internal_read_message(MsgId, State),
- Fun(Result),
- noreply(State1);
-
handle_cast({remove, MsgIds}, State = #msstate { current_file = CurFile }) ->
noreply(
compact(sets:to_list(
diff --git a/src/rabbit_queue_prefetcher.erl b/src/rabbit_queue_prefetcher.erl
deleted file mode 100644
index f5e717f55b..0000000000
--- a/src/rabbit_queue_prefetcher.erl
+++ /dev/null
@@ -1,295 +0,0 @@
-%% The contents of this file are subject to the Mozilla Public License
-%% Version 1.1 (the "License"); you may not use this file except in
-%% compliance with the License. You may obtain a copy of the License at
-%% http://www.mozilla.org/MPL/
-%%
-%% Software distributed under the License is distributed on an "AS IS"
-%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
-%% License for the specific language governing rights and limitations
-%% under the License.
-%%
-%% The Original Code is RabbitMQ.
-%%
-%% The Initial Developers of the Original Code are LShift Ltd,
-%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
-%%
-%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
-%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
-%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
-%% Technologies LLC, and Rabbit Technologies Ltd.
-%%
-%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
-%% Ltd. Portions created by Cohesive Financial Technologies LLC are
-%% Copyright (C) 2007-2009 Cohesive Financial Technologies
-%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
-%% (C) 2007-2009 Rabbit Technologies Ltd.
-%%
-%% All Rights Reserved.
-%%
-%% Contributor(s): ______________________________________.
-%%
-
--module(rabbit_queue_prefetcher).
-
--behaviour(gen_server2).
-
--export([start_link/1]).
-
--export([init/1, handle_call/3, handle_cast/2, handle_info/2,
- terminate/2, code_change/3]).
-
--export([publish/2, drain/1, drain_and_stop/1, stop/1]).
-
--include("rabbit.hrl").
--include("rabbit_queue.hrl").
-
--define(HIBERNATE_AFTER_MIN, 1000).
--define(DESIRED_HIBERNATE, 10000).
-
--record(pstate,
- { alphas,
- betas,
- queue_mref,
- peruse_cb
- }).
-
-%%----------------------------------------------------------------------------
-%% Novel
-%%----------------------------------------------------------------------------
-
-%% The design of the prefetcher is based on the following:
-%%
-%% a) It must issue low-priority (-ve) requests to the disk queue for
-%% the next message.
-%% b) If the prefetcher is empty and the amqqueue_process
-%% (mixed_queue) asks it for a message, it must exit immediately,
-%% telling the mixed_queue that it is empty so that the mixed_queue
-%% can then take the more efficient path and communicate with the
-%% disk_queue directly
-%% c) No message can accidentally be delivered twice, or lost
-%% d) The prefetcher must only cause load when the disk_queue is
-%% otherwise idle, and must not worsen performance in a loaded
-%% situation.
-%%
-%% As such, it's a little tricky. It must never issue a call to the
-%% disk_queue - if it did, then that could potentially block, thus
-%% causing pain to the mixed_queue that needs fast answers as to
-%% whether the prefetcher has prefetched content or not. It behaves as
-%% follows:
-%%
-%% 1) disk_queue:prefetch(Q)
-%% This is a low priority cast
-%%
-%% 2) The disk_queue may pick up the cast, at which point it'll read
-%% the next message and invoke prefetcher:publish(Msg) - normal
-%% priority cast. Note that in the mean time, the mixed_queue could
-%% have come along, found the prefetcher empty, asked it to
-%% exit. This means the effective "reply" from the disk_queue will
-%% go no where. As a result, the disk_queue should not advance the
-%% queue. However, it does mark the messages as delivered. The
-%% reasoning is that if it didn't, there would be the possibility
-%% that the message was delivered without it being marked as such
-%% on disk. We must maintain the property that a message which is
-%% marked as non-redelivered really hasn't been delivered anywhere
-%% before. The downside is that should the prefetcher not receive
-%% this message, the queue will then fetch the message from the
-%% disk_queue directly, and this message will have its delivered
-%% bit set. The queue will not be advanced though - if it did
-%% advance the queue and the msg was then lost, then the queue
-%% would have lost a msg that the mixed_queue would not pick up.
-%%
-%% 3) The prefetcher hopefully receives the call from
-%% prefetcher:publish(Msg). It replies immediately, and then adds
-%% to its internal queue. A cast is not sufficient as a pseudo
-%% "reply" here because the mixed_queue could come along, drain the
-%% prefetcher, thus catching the msg just sent by the disk_queue
-%% and then call disk_queue:fetch(Q) which is normal priority call,
-%% which could overtake a reply cast from the prefetcher to the
-%% disk queue, resulting in the same message being delivered
-%% twice. Thus when the disk_queue calls prefetcher:publish(Msg),
-%% it is briefly blocked. However, a) the prefetcher replies
-%% immediately, and b) the prefetcher should never have more than
-%% two items in its mailbox anyway (one from the queue process /
-%% mixed_queue and one from the disk_queue), so this should not
-%% cause a problem to the disk_queue.
-%%
-%% 4) The disk_queue receives the reply, and advances the Q to the
-%% next msg.
-%%
-%% 5) If the prefetcher has not met its target then it goes back to
-%% 1). Otherwise it just sits and waits for the mixed_queue to
-%% drain it.
-%%
-%% Now at some point, the mixed_queue will come along and will call
-%% prefetcher:drain() - normal priority call. The prefetcher then
-%% replies with its internal queue and a flag saying if the prefetcher
-%% has finished or is continuing; if the prefetch target was reached,
-%% the prefetcher stops normally at this point. If it hasn't been
-%% reached, then the prefetcher continues to hang around (it almost
-%% certainly has issued a disk_queue:prefetch(Q) cast and is waiting
-%% for a reply from the disk_queue).
-%%
-%% If the mixed_queue calls prefetcher:drain() and the prefetcher's
-%% internal queue is empty then the prefetcher replies with 'empty',
-%% and it exits. This informs the mixed_queue that it should from now
-%% on talk directly with the disk_queue and not via the
-%% prefetcher. This is more efficient and the mixed_queue will use
-%% normal priority blocking calls to the disk_queue and thus get
-%% better service.
-%%
-%% The prefetcher may at this point have issued a
-%% disk_queue:prefetch(Q) cast which has not yet been picked up by the
-%% disk_queue. This msg won't go away and the disk_queue will
-%% eventually find it. However, when it does, it'll simply read the
-%% next message from the queue (which could now be empty), possibly
-%% populate the cache (no harm done), mark the message as delivered
-%% (oh well, not a spec violation, and better than the alternative)
-%% and try and call prefetcher:publish(Msg) which will result in an
-%% error, which the disk_queue catches, as the publish call is to a
-%% non-existant process. However, the state of the queue has not been
-%% altered so the mixed_queue will be able to fetch this message as if
-%% it had never been prefetched.
-%%
-%% The only point at which the queue is advanced is when the
-%% prefetcher replies to the publish call. At this point the message
-%% has been received by the prefetcher and so we guarantee it will be
-%% passed to the mixed_queue when the mixed_queue tries to drain the
-%% prefetcher. We must therefore ensure that this msg can't also be
-%% delivered to the mixed_queue directly by the disk_queue through the
-%% mixed_queue calling disk_queue:fetch(Q) which is why the
-%% prefetcher:publish function is a call and not a cast, thus blocking
-%% the disk_queue.
-%%
-%% Finally, the prefetcher is only created when the mixed_queue is
-%% operating in mixed mode and it sees that the next N messages are
-%% all on disk, and the queue process is about to hibernate. During
-%% this phase, the mixed_queue can be asked to go back to disk_only
-%% mode. When this happens, it calls prefetcher:drain_and_stop() which
-%% behaves like two consecutive calls to drain() - i.e. replies with
-%% all prefetched messages and causes the prefetcher to exit.
-%%
-%% Note there is a flaw here in that we end up marking messages which
-%% have come through the prefetcher as delivered even if they don't
-%% get delivered (e.g. prefetcher fetches them, then broker
-%% dies). However, the alternative is that the mixed_queue must do a
-%% call to the disk_queue when it effectively passes them out to the
-%% rabbit_writer. This would hurt performance, and even at that stage,
-%% we have no guarantee that the message will really go out of the
-%% socket. What we do still have is that messages which have the
-%% redelivered bit set false really are guaranteed to have not been
-%% delivered already.
-
-%%----------------------------------------------------------------------------
-
--ifdef(use_specs).
-
--spec(start_link/1 :: (queue()) ->
- ({'ok', pid()} | 'ignore' | {'error', any()})).
--spec(publish/2 :: (pid(), (message()| 'not_found')) -> 'ok').
--spec(drain/1 :: (pid()) -> ({('finished' | 'continuing' | 'empty'), queue()})).
--spec(drain_and_stop/1 :: (pid()) -> ({('empty' | queue()), queue()})).
--spec(stop/1 :: (pid()) -> 'ok').
-
--endif.
-
-%%----------------------------------------------------------------------------
-
-start_link(Betas) ->
- false = queue:is_empty(Betas), %% ASSERTION
- gen_server2:start_link(?MODULE, [Betas, self()], []).
-
-publish(Prefetcher, Obj = #basic_message {}) ->
- gen_server2:call(Prefetcher, {publish, Obj}, infinity);
-publish(Prefetcher, not_found) ->
- gen_server2:call(Prefetcher, publish_empty, infinity).
-
-drain(Prefetcher) ->
- gen_server2:call(Prefetcher, drain, infinity).
-
-drain_and_stop(Prefetcher) ->
- gen_server2:call(Prefetcher, drain_and_stop, infinity).
-
-stop(Prefetcher) ->
- gen_server2:call(Prefetcher, stop, infinity).
-
-%%----------------------------------------------------------------------------
-
-init([Betas, QPid]) when is_pid(QPid) ->
- %% link isn't enough because the signal will not appear if the
- %% queue exits normally. Thus have to use monitor.
- MRef = erlang:monitor(process, QPid),
- Self = self(),
- CB = fun (Result) ->
- rabbit_misc:with_exit_handler(
- fun () -> ok end,
- fun () -> case Result of
- {ok, Msg} -> publish(Self, Msg);
- not_found -> publish(Self, not_found)
- end
- end)
- end,
- State = #pstate { alphas = queue:new(),
- betas = Betas,
- queue_mref = MRef,
- peruse_cb = CB
- },
- {ok, prefetch(State), infinity, {backoff, ?HIBERNATE_AFTER_MIN,
- ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
-
-handle_call({publish, Msg = #basic_message { guid = MsgId,
- is_persistent = IsPersistent }},
- DiskQueue, State = #pstate { alphas = Alphas, betas = Betas }) ->
- gen_server2:reply(DiskQueue, ok),
- {{value, #beta { msg_id = MsgId, seq_id = SeqId,
- is_persistent = IsPersistent,
- is_delivered = IsDelivered,
- index_on_disk = IndexOnDisk}}, Betas1} = queue:out(Betas),
- Alphas1 = queue:in(#alpha { msg = Msg, seq_id = SeqId,
- is_delivered = IsDelivered, msg_on_disk = true,
- index_on_disk = IndexOnDisk }, Alphas),
- State1 = State #pstate { alphas = Alphas1, betas = Betas1 },
- {Timeout, State2} = case queue:is_empty(Betas1) of
- true -> {hibernate, State1};
- false -> {infinity, prefetch(State1)}
- end,
- {noreply, State2, Timeout};
-handle_call(publish_empty, _From, State) ->
- %% Very odd. This could happen if the queue is deleted or purged
- %% and the mixed queue fails to shut us down.
- {reply, ok, State, hibernate};
-handle_call(drain, _From, State = #pstate { alphas = Alphas, betas = Betas }) ->
- case {queue:is_empty(Betas), queue:is_empty(Alphas)} of
- {true , _ } -> {stop, normal, {finished, Alphas}, State};
- {false, true } -> {stop, normal, {empty, Betas}, State};
- {false, false} -> {reply, {continuing, Alphas},
- State #pstate { alphas = queue:new() }}
- end;
-handle_call(drain_and_stop, _From, State = #pstate { alphas = Alphas,
- betas = Betas }) ->
- Res = case queue:is_empty(Alphas) of
- true -> {empty, Betas};
- false -> {Alphas, Betas}
- end,
- {stop, normal, Res, State};
-handle_call(stop, _From, State) ->
- {stop, normal, ok, State}.
-
-handle_cast(Msg, State) ->
- exit({unexpected_message_cast_to_prefetcher, Msg, State}).
-
-handle_info({'DOWN', MRef, process, _Pid, _Reason},
- State = #pstate { queue_mref = MRef }) ->
- %% this is the amqqueue_process going down, so we should go down
- %% too
- {stop, normal, State}.
-
-terminate(_Reason, _State) ->
- ok.
-
-code_change(_OldVsn, State, _Extra) ->
- {ok, State}.
-
-prefetch(State = #pstate { betas = Betas, peruse_cb = CB }) ->
- {{value, #beta { msg_id = MsgId }}, _Betas1} = queue:out(Betas),
- ok = rabbit_msg_store:peruse(MsgId, CB),
- State.
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 5b453b627a..bfeb397cc1 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -931,22 +931,6 @@ test_msg_store() ->
ok = rabbit_msg_store:release(MsgIds2ndHalf),
%% read the second half again, just for fun (aka code coverage)
ok = msg_store_read(MsgIds2ndHalf),
- %% read the second half via peruse
- lists:foldl(
- fun (MsgId, ok) ->
- rabbit_msg_store:peruse(MsgId,
- fun ({ok, MsgId1}) when MsgId1 == MsgId ->
- Self ! {peruse, MsgId1}
- end),
- receive
- {peruse, MsgId} ->
- ok
- after
- 10000 ->
- io:format("Failed to receive response via peruse~n"),
- throw(timeout)
- end
- end, ok, MsgIds2ndHalf),
%% stop and restart, preserving every other msg in 2nd half
ok = stop_msg_store(),
ok = start_msg_store(fun ([]) -> finished;
@@ -1477,30 +1461,29 @@ test_variable_queue_prefetching_and_gammas_to_betas() ->
assert_prop(S11, q2, 0),
assert_prop(S11, q1, 0),
- VQ12 = rabbit_variable_queue:maybe_start_prefetcher(VQ11),
- S12 = rabbit_variable_queue:status(VQ12),
+ S12 = rabbit_variable_queue:status(VQ11),
assert_prop(S12, prefetching, (Len4 - Prefetched) > 0),
timer:sleep(2000),
%% we have to fetch all of q4 before the prefetcher will be drained
- {VQ13, AckTags1} =
- variable_queue_fetch(Prefetched, false, false, Len4, VQ12),
- {VQ16, Acks} =
+ {VQ12, AckTags1} =
+ variable_queue_fetch(Prefetched, false, false, Len4, VQ11),
+ {VQ15, Acks} =
case Len4 == Prefetched of
true ->
- {VQ13, [AckTag2, AckTag1, AckTag, AckTags1]};
+ {VQ12, [AckTag2, AckTag1, AckTag, AckTags1]};
false ->
Len5 = Len4 - Prefetched - 1,
- {{_Msg3, false, AckTag3, Len5}, VQ14} =
- rabbit_variable_queue:fetch(VQ13),
- assert_prop(rabbit_variable_queue:status(VQ14),
+ {{_Msg3, false, AckTag3, Len5}, VQ13} =
+ rabbit_variable_queue:fetch(VQ12),
+ assert_prop(rabbit_variable_queue:status(VQ13),
prefetching, false),
- {VQ15, AckTags2} =
- variable_queue_fetch(Len5, false, false, Len5, VQ14),
- {VQ15, [AckTag3, AckTag2, AckTag1, AckTag, AckTags1, AckTags2]}
+ {VQ14, AckTags2} =
+ variable_queue_fetch(Len5, false, false, Len5, VQ13),
+ {VQ14, [AckTag3, AckTag2, AckTag1, AckTag, AckTags1, AckTags2]}
end,
- VQ17 = rabbit_variable_queue:ack(lists:flatten(Acks), VQ16),
+ VQ16 = rabbit_variable_queue:ack(lists:flatten(Acks), VQ15),
- {empty, VQ18} = rabbit_variable_queue:fetch(VQ17),
+ {empty, VQ17} = rabbit_variable_queue:fetch(VQ16),
- rabbit_variable_queue:terminate(VQ18),
+ rabbit_variable_queue:terminate(VQ17),
passed.
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index de9c08a343..c89cdfd587 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -33,10 +33,10 @@
-export([init/1, terminate/1, publish/2, publish_delivered/2,
set_queue_ram_duration_target/2, remeasure_egress_rate/1,
- ram_duration/1, fetch/1, ack/2, len/1, is_empty/1,
- maybe_start_prefetcher/1, purge/1, delete/1, requeue/2, tx_publish/2,
- tx_rollback/2, tx_commit/4, tx_commit_from_msg_store/4,
- tx_commit_from_vq/1, needs_sync/1, full_flush_journal/1, status/1]).
+ ram_duration/1, fetch/1, ack/2, len/1, is_empty/1, purge/1, delete/1,
+ requeue/2, tx_publish/2, tx_rollback/2, tx_commit/4,
+ tx_commit_from_msg_store/4, tx_commit_from_vq/1, needs_sync/1,
+ full_flush_journal/1, status/1]).
%%----------------------------------------------------------------------------
@@ -56,7 +56,6 @@
egress_rate,
avg_egress_rate,
egress_rate_timestamp,
- prefetcher,
len,
on_sync
}).
@@ -116,7 +115,6 @@
egress_rate :: float(),
avg_egress_rate :: float(),
egress_rate_timestamp :: {integer(), integer(), integer()},
- prefetcher :: ('undefined' | pid()),
len :: non_neg_integer(),
on_sync :: {[ack()], [msg_id()], [{pid(), any()}]}
}).
@@ -137,7 +135,6 @@
-spec(ack/2 :: ([ack()], vqstate()) -> vqstate()).
-spec(len/1 :: (vqstate()) -> non_neg_integer()).
-spec(is_empty/1 :: (vqstate()) -> boolean()).
--spec(maybe_start_prefetcher/1 :: (vqstate()) -> vqstate()).
-spec(purge/1 :: (vqstate()) -> {non_neg_integer(), vqstate()}).
-spec(delete/1 :: (vqstate()) -> vqstate()).
-spec(requeue/2 :: ([{basic_message(), ack()}], vqstate()) -> vqstate()).
@@ -181,7 +178,6 @@ init(QueueName) ->
egress_rate = 0,
avg_egress_rate = 0,
egress_rate_timestamp = now(),
- prefetcher = undefined,
len = GammaCount,
on_sync = {[], [], []}
},
@@ -221,13 +217,10 @@ set_queue_ram_duration_target(
end,
State1 = State #vqstate { target_ram_msg_count = TargetRamMsgCount1,
duration_target = DurationTarget },
- if TargetRamMsgCount == TargetRamMsgCount1 ->
- State1;
- TargetRamMsgCount1 == undefined orelse
- TargetRamMsgCount < TargetRamMsgCount1 ->
- maybe_start_prefetcher(State1);
- true ->
- reduce_memory_use(State1)
+ case TargetRamMsgCount1 == undefined orelse
+ TargetRamMsgCount1 >= TargetRamMsgCount of
+ true -> State1;
+ false -> reduce_memory_use(State1)
end.
remeasure_egress_rate(State = #vqstate { egress_rate = OldEgressRate,
@@ -258,14 +251,11 @@ ram_duration(#vqstate { avg_egress_rate = AvgEgressRate,
end.
fetch(State =
- #vqstate { q4 = Q4, ram_msg_count = RamMsgCount,
- out_counter = OutCount, prefetcher = Prefetcher,
+ #vqstate { q4 = Q4, ram_msg_count = RamMsgCount, out_counter = OutCount,
index_state = IndexState, len = Len }) ->
case queue:out(Q4) of
- {empty, _Q4} when Prefetcher == undefined ->
- fetch_from_q3_or_gamma(State);
{empty, _Q4} ->
- fetch(drain_prefetcher(drain, State));
+ fetch_from_q3_or_gamma(State);
{{value,
#alpha { msg = Msg = #basic_message { guid = MsgId,
is_persistent = IsPersistent },
@@ -333,45 +323,12 @@ len(#vqstate { len = Len }) ->
is_empty(State) ->
0 == len(State).
-maybe_start_prefetcher(State = #vqstate { target_ram_msg_count = 0 }) ->
- State;
-maybe_start_prefetcher(State = #vqstate { prefetcher = undefined }) ->
- %% ensure we have as much index in RAM as we can
- State1 = #vqstate { ram_msg_count = RamMsgCount,
- target_ram_msg_count = TargetRamMsgCount,
- q1 = Q1, q3 = Q3 } = maybe_gammas_to_betas(State),
- case queue:is_empty(Q3) of
- true -> %% nothing to do
- State1;
- false ->
- %% prefetched content takes priority over q1
- AvailableSpace =
- case TargetRamMsgCount of
- undefined -> queue:len(Q3);
- _ -> (TargetRamMsgCount - RamMsgCount) + queue:len(Q1)
- end,
- PrefetchCount = lists:min([queue:len(Q3), AvailableSpace]),
- case PrefetchCount =< 0 of
- true -> State1;
- false ->
- {PrefetchQueue, Q3a} = queue:split(PrefetchCount, Q3),
- {ok, Prefetcher} =
- rabbit_queue_prefetcher:start_link(PrefetchQueue),
- State1 #vqstate { q3 = Q3a, prefetcher = Prefetcher }
- end
- end;
-maybe_start_prefetcher(State) ->
- State.
-
-purge(State = #vqstate { prefetcher = undefined, q4 = Q4,
- index_state = IndexState, len = Len }) ->
+purge(State = #vqstate { q4 = Q4, index_state = IndexState, len = Len }) ->
{Q4Count, IndexState1} = remove_queue_entries(Q4, IndexState),
{Len, State1} =
purge1(Q4Count, State #vqstate { index_state = IndexState1,
q4 = queue:new() }),
- {Len, State1 #vqstate { len = 0 }};
-purge(State) ->
- purge(drain_prefetcher(stop, State)).
+ {Len, State1 #vqstate { len = 0 }}.
%% the only difference between purge and delete is that delete also
%% needs to delete everything that's been delivered and not ack'd.
@@ -490,7 +447,7 @@ full_flush_journal(State = #vqstate { index_state = IndexState }) ->
status(#vqstate { q1 = Q1, q2 = Q2, gamma = Gamma, q3 = Q3, q4 = Q4,
len = Len, on_sync = {_, _, From},
target_ram_msg_count = TargetRamMsgCount,
- ram_msg_count = RamMsgCount, prefetcher = Prefetcher,
+ ram_msg_count = RamMsgCount,
avg_egress_rate = AvgEgressRate }) ->
[ {q1, queue:len(Q1)},
{q2, queue:len(Q2)},
@@ -501,8 +458,7 @@ status(#vqstate { q1 = Q1, q2 = Q2, gamma = Gamma, q3 = Q3, q4 = Q4,
{outstanding_txns, length(From)},
{target_ram_msg_count, TargetRamMsgCount},
{ram_msg_count, RamMsgCount},
- {avg_egress_rate, AvgEgressRate},
- {prefetching, Prefetcher /= undefined} ].
+ {avg_egress_rate, AvgEgressRate} ].
%%----------------------------------------------------------------------------
%% Minor helpers
@@ -660,56 +616,13 @@ fetch_from_q3_or_gamma(State = #vqstate {
fetch(State2)
end.
-drain_prefetcher(_DrainOrStop, State = #vqstate { prefetcher = undefined }) ->
- State;
-drain_prefetcher(DrainOrStop,
- State = #vqstate { prefetcher = Prefetcher, q1 = Q1, q2 = Q2,
- gamma = #gamma { count = GammaCount },
- q3 = Q3, q4 = Q4,
- ram_msg_count = RamMsgCount }) ->
- Fun = case DrainOrStop of
- drain -> fun rabbit_queue_prefetcher:drain/1;
- stop -> fun rabbit_queue_prefetcher:drain_and_stop/1
- end,
- {Q3a, Q4a, Prefetcher1, RamMsgCountAdj} =
- case Fun(Prefetcher) of
- {empty, Betas} -> %% drain or drain_and_stop
- {queue:join(Betas, Q3), Q4, undefined, 0};
- {finished, Alphas} -> %% just drain
- {Q3, queue:join(Q4, Alphas), undefined, queue:len(Alphas)};
- {continuing, Alphas} -> %% just drain
- {Q3, queue:join(Q4, Alphas), Prefetcher, queue:len(Alphas)};
- {Alphas, Betas} -> %% just drain_and_stop
- {queue:join(Betas, Q3), queue:join(Q4, Alphas), undefined,
- queue:len(Alphas)}
- end,
- State1 = State #vqstate { prefetcher = Prefetcher1, q3 = Q3a, q4 = Q4a,
- ram_msg_count = RamMsgCount + RamMsgCountAdj },
- %% don't join up with q1/q2 unless the prefetcher has stopped
- State2 = case GammaCount == 0 andalso Prefetcher1 == undefined of
- true -> case queue:is_empty(Q3a) andalso queue:is_empty(Q2) of
- true ->
- State1 #vqstate { q1 = queue:new(),
- q4 = queue:join(Q4a, Q1) };
- false ->
- State1 #vqstate { q3 = queue:join(Q3a, Q2) }
- end;
- false -> State1
- end,
- maybe_push_q1_to_betas(State2).
-
reduce_memory_use(State = #vqstate { ram_msg_count = RamMsgCount,
target_ram_msg_count = TargetRamMsgCount })
when TargetRamMsgCount == undefined orelse TargetRamMsgCount >= RamMsgCount ->
State;
reduce_memory_use(State =
#vqstate { target_ram_msg_count = TargetRamMsgCount }) ->
- %% strictly, it's not necessary to stop the prefetcher this early,
- %% but because of its potential effect on q1 and the
- %% ram_msg_count, it's just much simpler to stop it sooner and
- %% relaunch when we next hibernate.
- State1 = maybe_push_q4_to_betas(maybe_push_q1_to_betas(
- drain_prefetcher(stop, State))),
+ State1 = maybe_push_q4_to_betas(maybe_push_q1_to_betas(State)),
case TargetRamMsgCount of
0 -> push_betas_to_gammas(State1);
_ -> State1
@@ -810,9 +723,9 @@ publish(neither, Msg = #basic_message { guid = MsgId,
store_alpha_entry(Entry = #alpha {}, State =
#vqstate { q1 = Q1, q2 = Q2,
gamma = #gamma { count = GammaCount },
- q3 = Q3, q4 = Q4, prefetcher = Prefetcher }) ->
+ q3 = Q3, q4 = Q4 }) ->
case queue:is_empty(Q2) andalso GammaCount == 0 andalso
- queue:is_empty(Q3) andalso Prefetcher == undefined of
+ queue:is_empty(Q3) of
true ->
State #vqstate { q4 = queue:in(Entry, Q4) };
false ->