summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-07-21 12:23:42 +0100
committerMatthew Sackman <matthew@lshift.net>2009-07-21 12:23:42 +0100
commitfbb198bfdebf3cecd7469821d61a7261cc8f3ca2 (patch)
tree47bfd2d4503d6bf8609a1914f66eef5dda76120f /src
parent83d676cb1221e86bcaec55e3291262b7fc88627f (diff)
downloadrabbitmq-server-git-fbb198bfdebf3cecd7469821d61a7261cc8f3ca2.tar.gz
bare non-functioning skeleton of prefetcher. Essay written on design of prefetcher and its limitations.
Diffstat (limited to 'src')
-rw-r--r--src/gen_server2.erl2
-rw-r--r--src/rabbit_mixed_queue.erl2
-rw-r--r--src/rabbit_queue_prefetcher.erl204
3 files changed, 206 insertions, 2 deletions
diff --git a/src/gen_server2.erl b/src/gen_server2.erl
index e46f2645bd..6d8d2ff6e1 100644
--- a/src/gen_server2.erl
+++ b/src/gen_server2.erl
@@ -153,7 +153,7 @@
%%%=========================================================================
-ifdef(use_specs).
--spec behaviour_info(atom()) -> 'undefined' | [{atom(), arity()}].
+-spec behaviour_info(atom()) -> 'undefined' | [{atom(), any()}].
-endif.
behaviour_info(callbacks) ->
diff --git a/src/rabbit_mixed_queue.erl b/src/rabbit_mixed_queue.erl
index 3c2f99e670..fedc0e523f 100644
--- a/src/rabbit_mixed_queue.erl
+++ b/src/rabbit_mixed_queue.erl
@@ -164,7 +164,7 @@ send_messages_to_disk(IsDurable, Q, Queue, PublishCount, RequeueCount,
false ->
Commit1 = flush_requeue_to_disk_queue
(Q, RequeueCount, Commit),
- ok = rabbit_disk_queue:tx_publish(Msg),
+ ok = rabbit_disk_queue:tx_publish(Msg), %% TODO - this is resetting the delivered flag to false! (well, actually, in the commit, but nevertheless, it's wrong)
case PublishCount == ?TO_DISK_MAX_FLUSH_SIZE of
true ->
ok = flush_messages_to_disk_queue(Q, Commit1),
diff --git a/src/rabbit_queue_prefetcher.erl b/src/rabbit_queue_prefetcher.erl
new file mode 100644
index 0000000000..79624f3de5
--- /dev/null
+++ b/src/rabbit_queue_prefetcher.erl
@@ -0,0 +1,204 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developers of the Original Code are LShift Ltd,
+%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
+%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
+%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
+%% Technologies LLC, and Rabbit Technologies Ltd.
+%%
+%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
+%% Ltd. Portions created by Cohesive Financial Technologies LLC are
+%% Copyright (C) 2007-2009 Cohesive Financial Technologies
+%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
+%% (C) 2007-2009 Rabbit Technologies Ltd.
+%%
+%% All Rights Reserved.
+%%
+%% Contributor(s): ______________________________________.
+%%
+
+-module(rabbit_queue_prefetcher).
+
+-behaviour(gen_server2).
+
+-export([start_link/0]).
+
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3]).
+
+-define(HIBERNATE_AFTER_MIN, 1000).
+
+-record(pstate,
+ { msg_buf,
+ buf_length,
+ target_count,
+ fetched_count,
+ queue
+ }).
+
+%% The design of the prefetcher is based on the following:
+%%
+%% a) It must issue low-priority (-ve) requests to the disk queue for
+%% the next message.
+%% b) If the prefetcher is empty and the amqqueue_process
+%% (mixed_queue) asks it for a message, it must exit immediately,
+%% telling the mixed_queue that it is empty so that the mixed_queue
+%% can then take the more efficient path and communicate with the
+%% disk_queue directly
+%% c) No message can accidentally be delivered twice, or lost
+%% d) The prefetcher must only cause load when the disk_queue is
+%% otherwise idle, and must not worsen performance in a loaded
+%% situation.
+%%
+%% As such, it's a little tricky. It must never issue a call to the
+%% disk_queue - if it did, then that could potentially block, thus
+%% causing pain to the mixed_queue that needs fast answers as to
+%% whether the prefetcher has prefetched content or not. It behaves as
+%% follows:
+%%
+%% 1) disk_queue:prefetch(Q)
+%% This is a low priority cast
+%%
+%% 2) The disk_queue may pick up the cast, at which point it'll read
+%% the next message invoke prefetcher:publish(Msg). Normal priority
+%% cast. Note that in the mean time, the mixed_queue could have
+%% come along, found the prefetcher empty, asked it to exit. This
+%% means the effective "reply" from the disk_queue will go no
+%% where. As a result, the disk_queue must perform no modification
+%% to the status of the message *or the queue* - do not mark the
+%% message delivered, and do not advance the queue. If it did
+%% advance the queue and the msg was then lost, then the queue
+%% would have lost a msg that the mixed_queue would not pick up.
+%%
+%% 3) The prefetcher hopefully receives the cast from
+%% prefetcher:publish(Msg). It then adds to its internal queue and
+%% calls disk_queue:set_delivered_and_advance(Q) which is a normal
+%% priority cast. This cannot be low-priority because if it was,
+%% the mixed_queue could come along, drain the prefetcher, thus
+%% catching the msg just sent by the disk_queue and then call
+%% disk_queue:deliver(Q) which is normal priority call, which could
+%% overtake the low-priority
+%% disk_queue:set_delivered_and_advance(Q) cast and thus result in
+%% the same msg being delivered by the queue twice.
+%%
+%% 4) The disk_queue receives the set_delivered_and_advance(Q) cast,
+%% marks the msg at the head of the queue Q as delivered, and advances
+%% the Q to the next msg.
+%%
+%% 5) If the prefetcher has not met its target then it goes back to
+%% 1). Otherwise it just sits and waits for the mixed_queue to
+%% drain it.
+%%
+%% Now at some point, the mixed_queue will come along and will call
+%% prefetcher:drain(). Normal priority call. The prefetcher then
+%% replies with its internal queue and the length of that queue. If
+%% the prefetch target was reached, the prefetcher stops normally at
+%% this point. If it hasn't been reached, then the prefetcher
+%% continues to hang around (it almost certainly has issued a
+%% disk_queue:prefetch(Q) cast and is waiting for a reply from the
+%% disk_queue).
+%%
+%% If the mixed_queue calls prefetcher:drain() and the prefetcher's
+%% internal queue is empty then the prefetcher replies with 'empty',
+%% and it exits. This informs the mixed_queue that it should from now
+%% on talk directly with the disk_queue and not via the
+%% prefetcher. This is more efficient and the mixed_queue will use
+%% normal priority blocking calls to the disk_queue and thus get
+%% better service that way. When exiting in this way, two situations
+%% could occur:
+%%
+%% 1) The prefetcher has issued a disk_queue:prefetch(Q) which has not
+%% yet been picked up by the disk_queue. This msg won't go away and
+%% the disk_queue will eventually find it. However, when it does,
+%% it'll simply read the next message from the queue (which could now
+%% be empty), possibly populate the cache (no harm done) and try and
+%% call prefetcher:publish(Msg) which will go no where. However, the
+%% state of the queue and the state of the message has not been
+%% altered so the mixed_queue will be able to fetch this message as if
+%% it had never been prefetched.
+%%
+%% 2) The disk_queue has already picked up the disk_queue:prefetch(Q)
+%% low priority message and has read the message and replied, by
+%% calling prefetcher:publish(Msg). In fact, it's possible that
+%% message is directly behind the call from mixed_queue to
+%% prefetcher:drain(). Same reasoning as in 1) applies - neither the
+%% queue's nor the message's state have been altered, so the
+%% mixed_queue can absolutely go and fetch the message again.
+%%
+%% The only point at which the queue is advanced and the message
+%% marked as delivered is when the prefetcher calls
+%% disk_queue:set_delivered_and_advance(Q). At this point the message
+%% has been received by the prefetcher and so we guarantee it will be
+%% passed to the mixed_queue when the mixed_queue tries to drain the
+%% prefetcher. We must therefore ensure that this msg can't also be
+%% delivered to the mixed_queue directly by the disk_queue through the
+%% mixed_queue calling disk_queue:deliver(Q) which is why the
+%% disk_queue:set_delivered_and_advance(Q) cast must be normal
+%% priority (or at least match the priority of disk_queue:deliver(Q)).
+%%
+%% Finally, the prefetcher is only created when the mixed_queue is
+%% operating in mixed mode and it sees that the next N messages are
+%% all on disk. During this phase, the mixed_queue can be asked to go
+%% back to disk_only mode. When this happens, it calls
+%% prefetcher:drain_and_stop() which behaves like two consecutive
+%% calls to drain() - i.e. replies with all prefetched messages and
+%% causes the prefetcher to exit.
+%%
+%% Note there is a flaw here in that we end up marking messages which
+%% have come through the prefetcher as delivered even if they don't
+%% get delivered (e.g. prefetcher fetches them, then broker
+%% dies). However, the alternative is that the mixed_queue must do a
+%% call to the disk_queue when it effectively passes them out to the
+%% rabbit_writer. This would hurt performance, and even at that stage,
+%% we have no guarantee that the message will really go out of the
+%% socket. What we do still have is that messages which have the
+%% redelivered bit set false really are guaranteed to have not been
+%% delivered already. Well, almost: if the disk_queue has a large back
+%% log of messages then the prefetcher invocation of
+%% disk_queue:set_delivered_and_advance(Q) may not be acted upon
+%% before a crash. However, given that the prefetching is operating in
+%% lock-step with the disk_queue, this means that at most, 1 (one)
+%% message can fail to have its delivered flag raised. The alternative
+%% is that disk_queue:set_delivered_and_advance(Q) could be made into
+%% a call. However, if the disk_queue is heavily loaded, this can
+%% block the prefetcher for some time, which in turn can block the
+%% mixed_queue when it wants to drain the prefetcher.
+
+start_link(Queue, Count) ->
+ gen_server2:start_link(?MODULE, [Queue, Count], []).
+
+init([Q, Count]) ->
+ State = #pstate { msg_buf = queue:new(),
+ buf_length = 0,
+ target_count = Count,
+ fetched_count = 0,
+ queue = Q
+ },
+ {ok, State, {binary, ?HIBERNATE_AFTER_MIN}}.
+
+handle_call(_Msg, _From, State) ->
+ {reply, confused, State}.
+
+handle_cast(_Msg, State) ->
+ {noreply, State}.
+
+handle_info(timeout, State) ->
+ {noreply, State, hibernate}.
+
+terminate(_Reason, _State) ->
+ ok.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.