diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-10-02 17:53:05 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-10-02 17:53:05 +0100 |
| commit | 96dcf1e348c19b16c0bb11e89ec46a4fa84038fd (patch) | |
| tree | 018d79db237ab67b92fed1977c8393e99b62c6e5 | |
| parent | e933eecec7be89f0542213cd6ad0424880c078f4 (diff) | |
| download | rabbitmq-server-git-96dcf1e348c19b16c0bb11e89ec46a4fa84038fd.tar.gz | |
Added documentation. Also discovered that opening a file in append mode seems to be slower than opening it read/write and seeking to the end. However, further tuning delayed until after the file handle cache appears
| -rw-r--r-- | src/rabbit_queue_index.erl | 46 |
1 files changed, 44 insertions, 2 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index d887a7700c..3158a1b3a7 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -34,6 +34,43 @@ -export([init/1, write_published/4, write_delivered/2, write_acks/2, flush_journal/1, read_segment_entries/2]). +%%---------------------------------------------------------------------------- +%% The queue disk index +%% +%% The queue disk index operates over an ack journal, and a number of +%% segment files. Each segment is the same size, both in max number of +%% entries, and max file size, owing to fixed sized records. +%% +%% Publishes and delivery notes are written directly to the segment +%% files. The segment is found by dividing the sequence id by the the +%% max number of entries per segment. Only the relative sequeuence +%% within the segment is recorded as the sequence id within a segment +%% file (i.e. sequeuence id modulo max number of entries per +%% segment). This is keeps entries as small as possible. Publishes and +%% deliveries are only ever going to be received in contiguous +%% ascending order, with publishes following the tail of the queue and +%% deliveries following the head of the queue. +%% +%% Acks are written to a bounded journal and are also held in memory, +%% in a dict with the segment file as the key. When the journal gets +%% too big, or flush_journal is called, the journal is (possibly +%% incrementally) flushed out to the segment files. As acks can be +%% received from any delivered message in any order, this journal +%% reduces seeking, and batches writes to the segment files, keeping +%% performance high. The flush_journal/1 function returns a boolean +%% indicating whether there is more flushing work that can be +%% done. This means that the process can call this whenever it has an +%% empty mailbox, only a small amount of work is done, allowing the +%% process to respond quickly to new messages if they arrive, or to +%% call flush_journal/1 several times until the result indicates there +%% is no more flushing to be done. +%% +%% On startup, the ack journal is read along with all the segment +%% files, and the ack journal is fully flushed out to the segment +%% files. Care is taken to ensure that no message can be ack'd twice. +%% +%%---------------------------------------------------------------------------- + -define(MAX_ACK_JOURNAL_ENTRY_COUNT, 32768). -define(ACK_JOURNAL_FILENAME, "ack_journal.jif"). -define(SEQ_BYTES, 8). @@ -101,6 +138,9 @@ -spec(write_delivered/2 :: (seq_id(), qistate()) -> qistate()). -spec(write_acks/2 :: ([seq_id()], qistate()) -> qistate()). -spec(flush_journal/1 :: (qistate()) -> {boolean(), qistate()}). +-spec(read_segment_entries/2 :: (seq_id(), qistate()) -> + {[{'index_entry', seq_id(), msg_id(), boolean(), boolean(), + 'on_disk'}], qistate()}). -endif. @@ -223,7 +263,8 @@ get_file_handle_for_seg(SegNum, State = #qistate { cur_seg_num = CurSegNum }) -> State1 = #qistate { dir = Dir } = close_file_handle_for_seg(CurSegNum, State), {ok, Hdl} = file:open(seg_num_to_path(Dir, SegNum), - [binary, raw, append, delayed_write]), + [binary, raw, write, delayed_write, read]), + {ok, _} = file:position(Hdl, {eof, 0}), {Hdl, State1 #qistate { cur_seg_num = SegNum, cur_seg_hdl = Hdl}}. bool_to_int(true ) -> 1; @@ -368,7 +409,8 @@ append_acks_to_segment(SegPath, AckCount, Acks) ?SEGMENT_ENTRIES_COUNT; append_acks_to_segment(SegPath, AckCount, Acks) when length(Acks) + AckCount < ?SEGMENT_ENTRIES_COUNT -> - {ok, Hdl} = file:open(SegPath, [raw, binary, delayed_write, append]), + {ok, Hdl} = file:open(SegPath, [raw, binary, delayed_write, write, read]), + {ok, _} = file:position(Hdl, {eof, 0}), AckCount1 = lists:foldl( fun (RelSeq, AckCount2) -> |
