summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-10-02 17:53:05 +0100
committerMatthew Sackman <matthew@lshift.net>2009-10-02 17:53:05 +0100
commit96dcf1e348c19b16c0bb11e89ec46a4fa84038fd (patch)
tree018d79db237ab67b92fed1977c8393e99b62c6e5
parente933eecec7be89f0542213cd6ad0424880c078f4 (diff)
downloadrabbitmq-server-git-96dcf1e348c19b16c0bb11e89ec46a4fa84038fd.tar.gz
Added documentation. Also discovered that opening a file in append mode seems to be slower than opening it read/write and seeking to the end. However, further tuning delayed until after the file handle cache appears
-rw-r--r--src/rabbit_queue_index.erl46
1 files changed, 44 insertions, 2 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index d887a7700c..3158a1b3a7 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -34,6 +34,43 @@
-export([init/1, write_published/4, write_delivered/2, write_acks/2,
flush_journal/1, read_segment_entries/2]).
+%%----------------------------------------------------------------------------
+%% The queue disk index
+%%
+%% The queue disk index operates over an ack journal, and a number of
+%% segment files. Each segment is the same size, both in max number of
+%% entries, and max file size, owing to fixed sized records.
+%%
+%% Publishes and delivery notes are written directly to the segment
+%% files. The segment is found by dividing the sequence id by the
+%% max number of entries per segment. Only the relative sequence
+%% within the segment is recorded as the sequence id within a segment
+%% file (i.e. sequence id modulo max number of entries per
+%% segment). This keeps entries as small as possible. Publishes and
+%% deliveries are only ever going to be received in contiguous
+%% ascending order, with publishes following the tail of the queue and
+%% deliveries following the head of the queue.
+%%
+%% Acks are written to a bounded journal and are also held in memory,
+%% in a dict with the segment file as the key. When the journal gets
+%% too big, or flush_journal is called, the journal is (possibly
+%% incrementally) flushed out to the segment files. As acks can be
+%% received from any delivered message in any order, this journal
+%% reduces seeking, and batches writes to the segment files, keeping
+%% performance high. The flush_journal/1 function returns a boolean
+%% indicating whether there is more flushing work that can be
+%% done. This means that the process can call this whenever it has an
+%% empty mailbox; only a small amount of work is done, allowing the
+%% process to respond quickly to new messages if they arrive, or to
+%% call flush_journal/1 several times until the result indicates there
+%% is no more flushing to be done.
+%%
+%% On startup, the ack journal is read along with all the segment
+%% files, and the ack journal is fully flushed out to the segment
+%% files. Care is taken to ensure that no message can be ack'd twice.
+%%
+%%----------------------------------------------------------------------------
+
-define(MAX_ACK_JOURNAL_ENTRY_COUNT, 32768).
-define(ACK_JOURNAL_FILENAME, "ack_journal.jif").
-define(SEQ_BYTES, 8).
@@ -101,6 +138,9 @@
-spec(write_delivered/2 :: (seq_id(), qistate()) -> qistate()).
-spec(write_acks/2 :: ([seq_id()], qistate()) -> qistate()).
-spec(flush_journal/1 :: (qistate()) -> {boolean(), qistate()}).
+-spec(read_segment_entries/2 :: (seq_id(), qistate()) ->
+ {[{'index_entry', seq_id(), msg_id(), boolean(), boolean(),
+ 'on_disk'}], qistate()}).
-endif.
@@ -223,7 +263,8 @@ get_file_handle_for_seg(SegNum, State = #qistate { cur_seg_num = CurSegNum }) ->
State1 = #qistate { dir = Dir } =
close_file_handle_for_seg(CurSegNum, State),
{ok, Hdl} = file:open(seg_num_to_path(Dir, SegNum),
- [binary, raw, append, delayed_write]),
+ [binary, raw, write, delayed_write, read]),
+ {ok, _} = file:position(Hdl, {eof, 0}),
{Hdl, State1 #qistate { cur_seg_num = SegNum, cur_seg_hdl = Hdl}}.
bool_to_int(true ) -> 1;
@@ -368,7 +409,8 @@ append_acks_to_segment(SegPath, AckCount, Acks)
?SEGMENT_ENTRIES_COUNT;
append_acks_to_segment(SegPath, AckCount, Acks)
when length(Acks) + AckCount < ?SEGMENT_ENTRIES_COUNT ->
- {ok, Hdl} = file:open(SegPath, [raw, binary, delayed_write, append]),
+ {ok, Hdl} = file:open(SegPath, [raw, binary, delayed_write, write, read]),
+ {ok, _} = file:position(Hdl, {eof, 0}),
AckCount1 =
lists:foldl(
fun (RelSeq, AckCount2) ->