diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-11-16 17:40:15 +0000 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-11-16 17:40:15 +0000 |
| commit | 148dc35396c672c12513b5a8269684ce6df06408 (patch) | |
| tree | 59fbdfa5db60ac5624a5876d10d65363336c2c0d | |
| parent | 5a7d9cee4faa420bd1cbdb7677a9d1fc8c528ddc (diff) | |
| download | rabbitmq-server-git-148dc35396c672c12513b5a8269684ce6df06408.tar.gz | |
stripped out the prefetcher. Note that I've not fixed up the tests yet, so they won't pass
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 5 | ||||
| -rw-r--r-- | src/rabbit_msg_store.erl | 12 | ||||
| -rw-r--r-- | src/rabbit_queue_prefetcher.erl | 295 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 45 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 121 |
5 files changed, 35 insertions, 443 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index b0c1ccacb8..3adf97ff85 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -922,7 +922,6 @@ handle_info(Info, State) -> {stop, {unhandled_info, Info}, State}. handle_pre_hibernate(State = #q{ variable_queue_state = VQS }) -> - VQS1 = rabbit_variable_queue:maybe_start_prefetcher(VQS), - VQS2 = rabbit_variable_queue:full_flush_journal(VQS1), + VQS1 = rabbit_variable_queue:full_flush_journal(VQS), {hibernate, stop_egress_rate_timer( - State#q{ variable_queue_state = VQS2 })}. + State#q{ variable_queue_state = VQS1 })}. diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index 591435ba01..b42574c0f7 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -33,8 +33,8 @@ -behaviour(gen_server2). --export([start_link/3, write/2, read/1, peruse/2, contains/1, remove/1, - release/1, sync/2]). +-export([start_link/3, write/2, read/1, contains/1, remove/1, release/1, + sync/2]). -export([sync/0]). %% internal @@ -62,8 +62,6 @@ {'ok', pid()} | 'ignore' | {'error', any()}). -spec(write/2 :: (msg_id(), msg()) -> 'ok'). -spec(read/1 :: (msg_id()) -> {'ok', msg()} | 'not_found'). --spec(peruse/2 :: (msg_id(), fun (({'ok', msg()} | 'not_found') -> 'ok')) -> - 'ok'). -spec(contains/1 :: (msg_id()) -> boolean()). -spec(remove/1 :: ([msg_id()]) -> 'ok'). -spec(release/1 :: ([msg_id()]) -> 'ok'). @@ -233,7 +231,6 @@ start_link(Dir, MsgRefDeltaGen, MsgRefDeltaGenInit) -> write(MsgId, Msg) -> gen_server2:cast(?SERVER, {write, MsgId, Msg}). read(MsgId) -> gen_server2:call(?SERVER, {read, MsgId}, infinity). -peruse(MsgId, Fun) -> gen_server2:pcast(?SERVER, -1, {peruse, MsgId, Fun}). contains(MsgId) -> gen_server2:call(?SERVER, {contains, MsgId}, infinity). remove(MsgIds) -> gen_server2:cast(?SERVER, {remove, MsgIds}). release(MsgIds) -> gen_server2:cast(?SERVER, {release, MsgIds}). @@ -334,11 +331,6 @@ handle_cast({write, MsgId, Msg}, noreply(State) end; -handle_cast({peruse, MsgId, Fun}, State) -> - {Result, State1} = internal_read_message(MsgId, State), - Fun(Result), - noreply(State1); - handle_cast({remove, MsgIds}, State = #msstate { current_file = CurFile }) -> noreply( compact(sets:to_list( diff --git a/src/rabbit_queue_prefetcher.erl b/src/rabbit_queue_prefetcher.erl deleted file mode 100644 index f5e717f55b..0000000000 --- a/src/rabbit_queue_prefetcher.erl +++ /dev/null @@ -1,295 +0,0 @@ -%% The contents of this file are subject to the Mozilla Public License -%% Version 1.1 (the "License"); you may not use this file except in -%% compliance with the License. You may obtain a copy of the License at -%% http://www.mozilla.org/MPL/ -%% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the -%% License for the specific language governing rights and limitations -%% under the License. -%% -%% The Original Code is RabbitMQ. -%% -%% The Initial Developers of the Original Code are LShift Ltd, -%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. -%% -%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, -%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd -%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial -%% Technologies LLC, and Rabbit Technologies Ltd. -%% -%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift -%% Ltd. Portions created by Cohesive Financial Technologies LLC are -%% Copyright (C) 2007-2009 Cohesive Financial Technologies -%% LLC. Portions created by Rabbit Technologies Ltd are Copyright -%% (C) 2007-2009 Rabbit Technologies Ltd. -%% -%% All Rights Reserved. -%% -%% Contributor(s): ______________________________________. -%% - --module(rabbit_queue_prefetcher). - --behaviour(gen_server2). - --export([start_link/1]). - --export([init/1, handle_call/3, handle_cast/2, handle_info/2, - terminate/2, code_change/3]). - --export([publish/2, drain/1, drain_and_stop/1, stop/1]). - --include("rabbit.hrl"). --include("rabbit_queue.hrl"). - --define(HIBERNATE_AFTER_MIN, 1000). --define(DESIRED_HIBERNATE, 10000). - --record(pstate, - { alphas, - betas, - queue_mref, - peruse_cb - }). - -%%---------------------------------------------------------------------------- -%% Novel -%%---------------------------------------------------------------------------- - -%% The design of the prefetcher is based on the following: -%% -%% a) It must issue low-priority (-ve) requests to the disk queue for -%% the next message. -%% b) If the prefetcher is empty and the amqqueue_process -%% (mixed_queue) asks it for a message, it must exit immediately, -%% telling the mixed_queue that it is empty so that the mixed_queue -%% can then take the more efficient path and communicate with the -%% disk_queue directly -%% c) No message can accidentally be delivered twice, or lost -%% d) The prefetcher must only cause load when the disk_queue is -%% otherwise idle, and must not worsen performance in a loaded -%% situation. -%% -%% As such, it's a little tricky. It must never issue a call to the -%% disk_queue - if it did, then that could potentially block, thus -%% causing pain to the mixed_queue that needs fast answers as to -%% whether the prefetcher has prefetched content or not. It behaves as -%% follows: -%% -%% 1) disk_queue:prefetch(Q) -%% This is a low priority cast -%% -%% 2) The disk_queue may pick up the cast, at which point it'll read -%% the next message and invoke prefetcher:publish(Msg) - normal -%% priority cast. Note that in the mean time, the mixed_queue could -%% have come along, found the prefetcher empty, asked it to -%% exit. This means the effective "reply" from the disk_queue will -%% go no where. As a result, the disk_queue should not advance the -%% queue. However, it does mark the messages as delivered. The -%% reasoning is that if it didn't, there would be the possibility -%% that the message was delivered without it being marked as such -%% on disk. We must maintain the property that a message which is -%% marked as non-redelivered really hasn't been delivered anywhere -%% before. The downside is that should the prefetcher not receive -%% this message, the queue will then fetch the message from the -%% disk_queue directly, and this message will have its delivered -%% bit set. The queue will not be advanced though - if it did -%% advance the queue and the msg was then lost, then the queue -%% would have lost a msg that the mixed_queue would not pick up. -%% -%% 3) The prefetcher hopefully receives the call from -%% prefetcher:publish(Msg). It replies immediately, and then adds -%% to its internal queue. A cast is not sufficient as a pseudo -%% "reply" here because the mixed_queue could come along, drain the -%% prefetcher, thus catching the msg just sent by the disk_queue -%% and then call disk_queue:fetch(Q) which is normal priority call, -%% which could overtake a reply cast from the prefetcher to the -%% disk queue, resulting in the same message being delivered -%% twice. Thus when the disk_queue calls prefetcher:publish(Msg), -%% it is briefly blocked. However, a) the prefetcher replies -%% immediately, and b) the prefetcher should never have more than -%% two items in its mailbox anyway (one from the queue process / -%% mixed_queue and one from the disk_queue), so this should not -%% cause a problem to the disk_queue. -%% -%% 4) The disk_queue receives the reply, and advances the Q to the -%% next msg. -%% -%% 5) If the prefetcher has not met its target then it goes back to -%% 1). Otherwise it just sits and waits for the mixed_queue to -%% drain it. -%% -%% Now at some point, the mixed_queue will come along and will call -%% prefetcher:drain() - normal priority call. The prefetcher then -%% replies with its internal queue and a flag saying if the prefetcher -%% has finished or is continuing; if the prefetch target was reached, -%% the prefetcher stops normally at this point. If it hasn't been -%% reached, then the prefetcher continues to hang around (it almost -%% certainly has issued a disk_queue:prefetch(Q) cast and is waiting -%% for a reply from the disk_queue). -%% -%% If the mixed_queue calls prefetcher:drain() and the prefetcher's -%% internal queue is empty then the prefetcher replies with 'empty', -%% and it exits. This informs the mixed_queue that it should from now -%% on talk directly with the disk_queue and not via the -%% prefetcher. This is more efficient and the mixed_queue will use -%% normal priority blocking calls to the disk_queue and thus get -%% better service. -%% -%% The prefetcher may at this point have issued a -%% disk_queue:prefetch(Q) cast which has not yet been picked up by the -%% disk_queue. This msg won't go away and the disk_queue will -%% eventually find it. However, when it does, it'll simply read the -%% next message from the queue (which could now be empty), possibly -%% populate the cache (no harm done), mark the message as delivered -%% (oh well, not a spec violation, and better than the alternative) -%% and try and call prefetcher:publish(Msg) which will result in an -%% error, which the disk_queue catches, as the publish call is to a -%% non-existant process. However, the state of the queue has not been -%% altered so the mixed_queue will be able to fetch this message as if -%% it had never been prefetched. -%% -%% The only point at which the queue is advanced is when the -%% prefetcher replies to the publish call. At this point the message -%% has been received by the prefetcher and so we guarantee it will be -%% passed to the mixed_queue when the mixed_queue tries to drain the -%% prefetcher. We must therefore ensure that this msg can't also be -%% delivered to the mixed_queue directly by the disk_queue through the -%% mixed_queue calling disk_queue:fetch(Q) which is why the -%% prefetcher:publish function is a call and not a cast, thus blocking -%% the disk_queue. -%% -%% Finally, the prefetcher is only created when the mixed_queue is -%% operating in mixed mode and it sees that the next N messages are -%% all on disk, and the queue process is about to hibernate. During -%% this phase, the mixed_queue can be asked to go back to disk_only -%% mode. When this happens, it calls prefetcher:drain_and_stop() which -%% behaves like two consecutive calls to drain() - i.e. replies with -%% all prefetched messages and causes the prefetcher to exit. -%% -%% Note there is a flaw here in that we end up marking messages which -%% have come through the prefetcher as delivered even if they don't -%% get delivered (e.g. prefetcher fetches them, then broker -%% dies). However, the alternative is that the mixed_queue must do a -%% call to the disk_queue when it effectively passes them out to the -%% rabbit_writer. This would hurt performance, and even at that stage, -%% we have no guarantee that the message will really go out of the -%% socket. What we do still have is that messages which have the -%% redelivered bit set false really are guaranteed to have not been -%% delivered already. - -%%---------------------------------------------------------------------------- - --ifdef(use_specs). - --spec(start_link/1 :: (queue()) -> - ({'ok', pid()} | 'ignore' | {'error', any()})). --spec(publish/2 :: (pid(), (message()| 'not_found')) -> 'ok'). --spec(drain/1 :: (pid()) -> ({('finished' | 'continuing' | 'empty'), queue()})). --spec(drain_and_stop/1 :: (pid()) -> ({('empty' | queue()), queue()})). --spec(stop/1 :: (pid()) -> 'ok'). - --endif. - -%%---------------------------------------------------------------------------- - -start_link(Betas) -> - false = queue:is_empty(Betas), %% ASSERTION - gen_server2:start_link(?MODULE, [Betas, self()], []). - -publish(Prefetcher, Obj = #basic_message {}) -> - gen_server2:call(Prefetcher, {publish, Obj}, infinity); -publish(Prefetcher, not_found) -> - gen_server2:call(Prefetcher, publish_empty, infinity). - -drain(Prefetcher) -> - gen_server2:call(Prefetcher, drain, infinity). - -drain_and_stop(Prefetcher) -> - gen_server2:call(Prefetcher, drain_and_stop, infinity). - -stop(Prefetcher) -> - gen_server2:call(Prefetcher, stop, infinity). - -%%---------------------------------------------------------------------------- - -init([Betas, QPid]) when is_pid(QPid) -> - %% link isn't enough because the signal will not appear if the - %% queue exits normally. Thus have to use monitor. - MRef = erlang:monitor(process, QPid), - Self = self(), - CB = fun (Result) -> - rabbit_misc:with_exit_handler( - fun () -> ok end, - fun () -> case Result of - {ok, Msg} -> publish(Self, Msg); - not_found -> publish(Self, not_found) - end - end) - end, - State = #pstate { alphas = queue:new(), - betas = Betas, - queue_mref = MRef, - peruse_cb = CB - }, - {ok, prefetch(State), infinity, {backoff, ?HIBERNATE_AFTER_MIN, - ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. - -handle_call({publish, Msg = #basic_message { guid = MsgId, - is_persistent = IsPersistent }}, - DiskQueue, State = #pstate { alphas = Alphas, betas = Betas }) -> - gen_server2:reply(DiskQueue, ok), - {{value, #beta { msg_id = MsgId, seq_id = SeqId, - is_persistent = IsPersistent, - is_delivered = IsDelivered, - index_on_disk = IndexOnDisk}}, Betas1} = queue:out(Betas), - Alphas1 = queue:in(#alpha { msg = Msg, seq_id = SeqId, - is_delivered = IsDelivered, msg_on_disk = true, - index_on_disk = IndexOnDisk }, Alphas), - State1 = State #pstate { alphas = Alphas1, betas = Betas1 }, - {Timeout, State2} = case queue:is_empty(Betas1) of - true -> {hibernate, State1}; - false -> {infinity, prefetch(State1)} - end, - {noreply, State2, Timeout}; -handle_call(publish_empty, _From, State) -> - %% Very odd. This could happen if the queue is deleted or purged - %% and the mixed queue fails to shut us down. - {reply, ok, State, hibernate}; -handle_call(drain, _From, State = #pstate { alphas = Alphas, betas = Betas }) -> - case {queue:is_empty(Betas), queue:is_empty(Alphas)} of - {true , _ } -> {stop, normal, {finished, Alphas}, State}; - {false, true } -> {stop, normal, {empty, Betas}, State}; - {false, false} -> {reply, {continuing, Alphas}, - State #pstate { alphas = queue:new() }} - end; -handle_call(drain_and_stop, _From, State = #pstate { alphas = Alphas, - betas = Betas }) -> - Res = case queue:is_empty(Alphas) of - true -> {empty, Betas}; - false -> {Alphas, Betas} - end, - {stop, normal, Res, State}; -handle_call(stop, _From, State) -> - {stop, normal, ok, State}. - -handle_cast(Msg, State) -> - exit({unexpected_message_cast_to_prefetcher, Msg, State}). - -handle_info({'DOWN', MRef, process, _Pid, _Reason}, - State = #pstate { queue_mref = MRef }) -> - %% this is the amqqueue_process going down, so we should go down - %% too - {stop, normal, State}. - -terminate(_Reason, _State) -> - ok. - -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - -prefetch(State = #pstate { betas = Betas, peruse_cb = CB }) -> - {{value, #beta { msg_id = MsgId }}, _Betas1} = queue:out(Betas), - ok = rabbit_msg_store:peruse(MsgId, CB), - State. diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 5b453b627a..bfeb397cc1 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -931,22 +931,6 @@ test_msg_store() -> ok = rabbit_msg_store:release(MsgIds2ndHalf), %% read the second half again, just for fun (aka code coverage) ok = msg_store_read(MsgIds2ndHalf), - %% read the second half via peruse - lists:foldl( - fun (MsgId, ok) -> - rabbit_msg_store:peruse(MsgId, - fun ({ok, MsgId1}) when MsgId1 == MsgId -> - Self ! {peruse, MsgId1} - end), - receive - {peruse, MsgId} -> - ok - after - 10000 -> - io:format("Failed to receive response via peruse~n"), - throw(timeout) - end - end, ok, MsgIds2ndHalf), %% stop and restart, preserving every other msg in 2nd half ok = stop_msg_store(), ok = start_msg_store(fun ([]) -> finished; @@ -1477,30 +1461,29 @@ test_variable_queue_prefetching_and_gammas_to_betas() -> assert_prop(S11, q2, 0), assert_prop(S11, q1, 0), - VQ12 = rabbit_variable_queue:maybe_start_prefetcher(VQ11), - S12 = rabbit_variable_queue:status(VQ12), + S12 = rabbit_variable_queue:status(VQ11), assert_prop(S12, prefetching, (Len4 - Prefetched) > 0), timer:sleep(2000), %% we have to fetch all of q4 before the prefetcher will be drained - {VQ13, AckTags1} = - variable_queue_fetch(Prefetched, false, false, Len4, VQ12), - {VQ16, Acks} = + {VQ12, AckTags1} = + variable_queue_fetch(Prefetched, false, false, Len4, VQ11), + {VQ15, Acks} = case Len4 == Prefetched of true -> - {VQ13, [AckTag2, AckTag1, AckTag, AckTags1]}; + {VQ12, [AckTag2, AckTag1, AckTag, AckTags1]}; false -> Len5 = Len4 - Prefetched - 1, - {{_Msg3, false, AckTag3, Len5}, VQ14} = - rabbit_variable_queue:fetch(VQ13), - assert_prop(rabbit_variable_queue:status(VQ14), + {{_Msg3, false, AckTag3, Len5}, VQ13} = + rabbit_variable_queue:fetch(VQ12), + assert_prop(rabbit_variable_queue:status(VQ13), prefetching, false), - {VQ15, AckTags2} = - variable_queue_fetch(Len5, false, false, Len5, VQ14), - {VQ15, [AckTag3, AckTag2, AckTag1, AckTag, AckTags1, AckTags2]} + {VQ14, AckTags2} = + variable_queue_fetch(Len5, false, false, Len5, VQ13), + {VQ14, [AckTag3, AckTag2, AckTag1, AckTag, AckTags1, AckTags2]} end, - VQ17 = rabbit_variable_queue:ack(lists:flatten(Acks), VQ16), + VQ16 = rabbit_variable_queue:ack(lists:flatten(Acks), VQ15), - {empty, VQ18} = rabbit_variable_queue:fetch(VQ17), + {empty, VQ17} = rabbit_variable_queue:fetch(VQ16), - rabbit_variable_queue:terminate(VQ18), + rabbit_variable_queue:terminate(VQ17), passed. diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index de9c08a343..c89cdfd587 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -33,10 +33,10 @@ -export([init/1, terminate/1, publish/2, publish_delivered/2, set_queue_ram_duration_target/2, remeasure_egress_rate/1, - ram_duration/1, fetch/1, ack/2, len/1, is_empty/1, - maybe_start_prefetcher/1, purge/1, delete/1, requeue/2, tx_publish/2, - tx_rollback/2, tx_commit/4, tx_commit_from_msg_store/4, - tx_commit_from_vq/1, needs_sync/1, full_flush_journal/1, status/1]). + ram_duration/1, fetch/1, ack/2, len/1, is_empty/1, purge/1, delete/1, + requeue/2, tx_publish/2, tx_rollback/2, tx_commit/4, + tx_commit_from_msg_store/4, tx_commit_from_vq/1, needs_sync/1, + full_flush_journal/1, status/1]). %%---------------------------------------------------------------------------- @@ -56,7 +56,6 @@ egress_rate, avg_egress_rate, egress_rate_timestamp, - prefetcher, len, on_sync }). @@ -116,7 +115,6 @@ egress_rate :: float(), avg_egress_rate :: float(), egress_rate_timestamp :: {integer(), integer(), integer()}, - prefetcher :: ('undefined' | pid()), len :: non_neg_integer(), on_sync :: {[ack()], [msg_id()], [{pid(), any()}]} }). @@ -137,7 +135,6 @@ -spec(ack/2 :: ([ack()], vqstate()) -> vqstate()). -spec(len/1 :: (vqstate()) -> non_neg_integer()). -spec(is_empty/1 :: (vqstate()) -> boolean()). --spec(maybe_start_prefetcher/1 :: (vqstate()) -> vqstate()). -spec(purge/1 :: (vqstate()) -> {non_neg_integer(), vqstate()}). -spec(delete/1 :: (vqstate()) -> vqstate()). -spec(requeue/2 :: ([{basic_message(), ack()}], vqstate()) -> vqstate()). @@ -181,7 +178,6 @@ init(QueueName) -> egress_rate = 0, avg_egress_rate = 0, egress_rate_timestamp = now(), - prefetcher = undefined, len = GammaCount, on_sync = {[], [], []} }, @@ -221,13 +217,10 @@ set_queue_ram_duration_target( end, State1 = State #vqstate { target_ram_msg_count = TargetRamMsgCount1, duration_target = DurationTarget }, - if TargetRamMsgCount == TargetRamMsgCount1 -> - State1; - TargetRamMsgCount1 == undefined orelse - TargetRamMsgCount < TargetRamMsgCount1 -> - maybe_start_prefetcher(State1); - true -> - reduce_memory_use(State1) + case TargetRamMsgCount1 == undefined orelse + TargetRamMsgCount1 >= TargetRamMsgCount of + true -> State1; + false -> reduce_memory_use(State1) end. remeasure_egress_rate(State = #vqstate { egress_rate = OldEgressRate, @@ -258,14 +251,11 @@ ram_duration(#vqstate { avg_egress_rate = AvgEgressRate, end. fetch(State = - #vqstate { q4 = Q4, ram_msg_count = RamMsgCount, - out_counter = OutCount, prefetcher = Prefetcher, + #vqstate { q4 = Q4, ram_msg_count = RamMsgCount, out_counter = OutCount, index_state = IndexState, len = Len }) -> case queue:out(Q4) of - {empty, _Q4} when Prefetcher == undefined -> - fetch_from_q3_or_gamma(State); {empty, _Q4} -> - fetch(drain_prefetcher(drain, State)); + fetch_from_q3_or_gamma(State); {{value, #alpha { msg = Msg = #basic_message { guid = MsgId, is_persistent = IsPersistent }, @@ -333,45 +323,12 @@ len(#vqstate { len = Len }) -> is_empty(State) -> 0 == len(State). -maybe_start_prefetcher(State = #vqstate { target_ram_msg_count = 0 }) -> - State; -maybe_start_prefetcher(State = #vqstate { prefetcher = undefined }) -> - %% ensure we have as much index in RAM as we can - State1 = #vqstate { ram_msg_count = RamMsgCount, - target_ram_msg_count = TargetRamMsgCount, - q1 = Q1, q3 = Q3 } = maybe_gammas_to_betas(State), - case queue:is_empty(Q3) of - true -> %% nothing to do - State1; - false -> - %% prefetched content takes priority over q1 - AvailableSpace = - case TargetRamMsgCount of - undefined -> queue:len(Q3); - _ -> (TargetRamMsgCount - RamMsgCount) + queue:len(Q1) - end, - PrefetchCount = lists:min([queue:len(Q3), AvailableSpace]), - case PrefetchCount =< 0 of - true -> State1; - false -> - {PrefetchQueue, Q3a} = queue:split(PrefetchCount, Q3), - {ok, Prefetcher} = - rabbit_queue_prefetcher:start_link(PrefetchQueue), - State1 #vqstate { q3 = Q3a, prefetcher = Prefetcher } - end - end; -maybe_start_prefetcher(State) -> - State. - -purge(State = #vqstate { prefetcher = undefined, q4 = Q4, - index_state = IndexState, len = Len }) -> +purge(State = #vqstate { q4 = Q4, index_state = IndexState, len = Len }) -> {Q4Count, IndexState1} = remove_queue_entries(Q4, IndexState), {Len, State1} = purge1(Q4Count, State #vqstate { index_state = IndexState1, q4 = queue:new() }), - {Len, State1 #vqstate { len = 0 }}; -purge(State) -> - purge(drain_prefetcher(stop, State)). + {Len, State1 #vqstate { len = 0 }}. %% the only difference between purge and delete is that delete also %% needs to delete everything that's been delivered and not ack'd. @@ -490,7 +447,7 @@ full_flush_journal(State = #vqstate { index_state = IndexState }) -> status(#vqstate { q1 = Q1, q2 = Q2, gamma = Gamma, q3 = Q3, q4 = Q4, len = Len, on_sync = {_, _, From}, target_ram_msg_count = TargetRamMsgCount, - ram_msg_count = RamMsgCount, prefetcher = Prefetcher, + ram_msg_count = RamMsgCount, avg_egress_rate = AvgEgressRate }) -> [ {q1, queue:len(Q1)}, {q2, queue:len(Q2)}, @@ -501,8 +458,7 @@ status(#vqstate { q1 = Q1, q2 = Q2, gamma = Gamma, q3 = Q3, q4 = Q4, {outstanding_txns, length(From)}, {target_ram_msg_count, TargetRamMsgCount}, {ram_msg_count, RamMsgCount}, - {avg_egress_rate, AvgEgressRate}, - {prefetching, Prefetcher /= undefined} ]. + {avg_egress_rate, AvgEgressRate} ]. %%---------------------------------------------------------------------------- %% Minor helpers @@ -660,56 +616,13 @@ fetch_from_q3_or_gamma(State = #vqstate { fetch(State2) end. -drain_prefetcher(_DrainOrStop, State = #vqstate { prefetcher = undefined }) -> - State; -drain_prefetcher(DrainOrStop, - State = #vqstate { prefetcher = Prefetcher, q1 = Q1, q2 = Q2, - gamma = #gamma { count = GammaCount }, - q3 = Q3, q4 = Q4, - ram_msg_count = RamMsgCount }) -> - Fun = case DrainOrStop of - drain -> fun rabbit_queue_prefetcher:drain/1; - stop -> fun rabbit_queue_prefetcher:drain_and_stop/1 - end, - {Q3a, Q4a, Prefetcher1, RamMsgCountAdj} = - case Fun(Prefetcher) of - {empty, Betas} -> %% drain or drain_and_stop - {queue:join(Betas, Q3), Q4, undefined, 0}; - {finished, Alphas} -> %% just drain - {Q3, queue:join(Q4, Alphas), undefined, queue:len(Alphas)}; - {continuing, Alphas} -> %% just drain - {Q3, queue:join(Q4, Alphas), Prefetcher, queue:len(Alphas)}; - {Alphas, Betas} -> %% just drain_and_stop - {queue:join(Betas, Q3), queue:join(Q4, Alphas), undefined, - queue:len(Alphas)} - end, - State1 = State #vqstate { prefetcher = Prefetcher1, q3 = Q3a, q4 = Q4a, - ram_msg_count = RamMsgCount + RamMsgCountAdj }, - %% don't join up with q1/q2 unless the prefetcher has stopped - State2 = case GammaCount == 0 andalso Prefetcher1 == undefined of - true -> case queue:is_empty(Q3a) andalso queue:is_empty(Q2) of - true -> - State1 #vqstate { q1 = queue:new(), - q4 = queue:join(Q4a, Q1) }; - false -> - State1 #vqstate { q3 = queue:join(Q3a, Q2) } - end; - false -> State1 - end, - maybe_push_q1_to_betas(State2). - reduce_memory_use(State = #vqstate { ram_msg_count = RamMsgCount, target_ram_msg_count = TargetRamMsgCount }) when TargetRamMsgCount == undefined orelse TargetRamMsgCount >= RamMsgCount -> State; reduce_memory_use(State = #vqstate { target_ram_msg_count = TargetRamMsgCount }) -> - %% strictly, it's not necessary to stop the prefetcher this early, - %% but because of its potential effect on q1 and the - %% ram_msg_count, it's just much simpler to stop it sooner and - %% relaunch when we next hibernate. - State1 = maybe_push_q4_to_betas(maybe_push_q1_to_betas( - drain_prefetcher(stop, State))), + State1 = maybe_push_q4_to_betas(maybe_push_q1_to_betas(State)), case TargetRamMsgCount of 0 -> push_betas_to_gammas(State1); _ -> State1 @@ -810,9 +723,9 @@ publish(neither, Msg = #basic_message { guid = MsgId, store_alpha_entry(Entry = #alpha {}, State = #vqstate { q1 = Q1, q2 = Q2, gamma = #gamma { count = GammaCount }, - q3 = Q3, q4 = Q4, prefetcher = Prefetcher }) -> + q3 = Q3, q4 = Q4 }) -> case queue:is_empty(Q2) andalso GammaCount == 0 andalso - queue:is_empty(Q3) andalso Prefetcher == undefined of + queue:is_empty(Q3) of true -> State #vqstate { q4 = queue:in(Entry, Q4) }; false -> |
