diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-12-02 13:04:46 +0000 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-12-02 13:04:46 +0000 |
| commit | 6f18886520e8e7356c1efd84a2e1cc6aa66624f6 (patch) | |
| tree | 62653bdeb2ed9071fdb87c3fc0e23f749a7ee0f4 | |
| parent | f8b9b92c6a33c7da267fe3b734194274c78458ea (diff) | |
| download | rabbitmq-server-git-6f18886520e8e7356c1efd84a2e1cc6aa66624f6.tar.gz | |
Lots of good progress on qi3. The code almost looks pretty in places
| -rw-r--r-- | src/rabbit_queue_index3.erl | 413 |
1 files changed, 377 insertions, 36 deletions
-module(rabbit_queue_index3).

-define(CLEAN_FILENAME, "clean.dot").

%% ---- Journal details ----

-define(MAX_JOURNAL_ENTRY_COUNT, 32768).
-define(JOURNAL_FILENAME, "journal.jif").

%% Journal entry prefixes are 2-bit values written into a
%% ?JPREFIX_BITS-wide bit-syntax field. They must be base-2 literals:
%% the previous decimal definitions 10 and 11 could never match a
%% 2-bit field (range 0..3), which made the ?DEL_JPREFIX/?ACK_JPREFIX
%% arms in load_journal_entries unreachable.
-define(PUB_PERSIST_JPREFIX, 2#00).
-define(PUB_TRANS_JPREFIX,   2#01).
-define(DEL_JPREFIX,         2#10).
-define(ACK_JPREFIX,         2#11).
-define(JPREFIX_BITS, 2).
-define(SEQ_BYTES, 8).
-define(SEQ_BITS, ((?SEQ_BYTES * 8) - ?JPREFIX_BITS)).

%% ---- Segment details ----

-define(SEGMENT_EXTENSION, ".idx").

-define(REL_SEQ_BITS, 14).
%% 14 bits rounded up to the next whole byte boundary: 16 bits.
-define(REL_SEQ_BITS_BYTE_ALIGNED, (?REL_SEQ_BITS + 8 - (?REL_SEQ_BITS rem 8))).
-define(SEGMENT_ENTRY_COUNT, 16384). %% trunc(math:pow(2,?REL_SEQ_BITS))).

%% seq only is binary 00 followed by 14 bits of rel seq id
%% (range: 0 - 16383)
-define(REL_SEQ_ONLY_PREFIX, 2#00).
-define(REL_SEQ_ONLY_PREFIX_BITS, 2).
-define(REL_SEQ_ONLY_ENTRY_LENGTH_BYTES, 2).

%% publish record is binary 1 followed by a bit for is_persistent,
%% then 14 bits of rel seq id, and 128 bits of md5sum msg id
-define(PUBLISH_PREFIX, 1).
-define(PUBLISH_PREFIX_BITS, 1).

-define(MSG_ID_BYTES, 16). %% md5sum is 128 bit or 16 bytes
-define(MSG_ID_BITS, (?MSG_ID_BYTES * 8)).
%% 16 bytes for md5sum + 2 for seq, bits and prefix. Parenthesised so
%% the expansion stays one term wherever the macro is substituted.
-define(PUBLISH_RECORD_LENGTH_BYTES, (?MSG_ID_BYTES + 2)).

%% 1 publish, 1 deliver, 1 ack per msg
-define(SEGMENT_TOTAL_SIZE, (?SEGMENT_ENTRY_COUNT *
                             (?PUBLISH_RECORD_LENGTH_BYTES +
                              (2 * ?REL_SEQ_ONLY_ENTRY_LENGTH_BYTES)))).

%%----------------------------------------------------------------------------

%% Per-queue index state: queue directory, per-segment info keyed by
%% segment number, the lazily-opened journal file handle, and a count
%% of journal entries not yet flushed to segment files.
-record(qistate,
        { dir,
          segments,
          journal_handle,
          dirty_count
        }).

%% Per-segment info: publish/ack counts, lazily-opened file handle,
%% in-memory journal entries for this segment, file path and number.
-record(segment,
        { pubs,
          acks,
          handle,
          journal_entries,
          path,
          num
        }).

-include("rabbit.hrl").
%%----------------------------------------------------------------------------

%% Close the journal handle and every open segment handle, write the
%% clean-shutdown marker into the queue directory, and return a state
%% that holds no open handles.
terminate(State = #qistate { segments = Segments, journal_handle = JournalHdl,
                             dir = Dir }) ->
    ok = case JournalHdl of
             undefined -> ok;
             _         -> file_handle_cache:close(JournalHdl)
         end,
    ok = dict:fold(
           fun (_SegNum, #segment { handle = undefined }, ok) ->
                   ok;
               (_SegNum, #segment { handle = Hdl }, ok) ->
                   file_handle_cache:close(Hdl)
           end, ok, Segments),
    store_clean_shutdown(Dir),
    State #qistate { journal_handle = undefined, segments = dict:new() }.

%% As terminate/1, but additionally deletes the queue's directory and
%% everything inside it.
terminate_and_erase(State) ->
    State1 = terminate(State),
    ok = delete_queue_directory(State1 #qistate.dir),
    State1.

%%----------------------------------------------------------------------------
%% Minors
%%----------------------------------------------------------------------------

%% Sort a list into descending order.
rev_sort(List) ->
    lists:sort(fun (A, B) -> B < A end, List).

%% Split an absolute seq id into {SegmentNumber, RelativeSeqId}.
seq_id_to_seg_and_rel_seq_id(SeqId) ->
    { SeqId div ?SEGMENT_ENTRY_COUNT, SeqId rem ?SEGMENT_ENTRY_COUNT }.

%% Inverse of seq_id_to_seg_and_rel_seq_id/1.
reconstruct_seq_id(SegNum, RelSeq) ->
    (SegNum * ?SEGMENT_ENTRY_COUNT) + RelSeq.

%% Path of the segment file for SegNum under Dir.
seg_num_to_path(Dir, SegNum) ->
    filename:join(Dir, integer_to_list(SegNum) ++ ?SEGMENT_EXTENSION).

%% Close (if open) and remove a segment's file on disk.
delete_segment(#segment { handle = undefined }) ->
    ok;
delete_segment(#segment { handle = Hdl, path = Path }) ->
    ok = file_handle_cache:close(Hdl),
    ok = file:delete(Path),
    ok.

%% True iff the clean-shutdown marker file existed. Note the marker is
%% consumed: detecting it deletes the file as a side effect.
detect_clean_shutdown(Dir) ->
    case file:delete(filename:join(Dir, ?CLEAN_FILENAME)) of
        ok              -> true;
        {error, enoent} -> false
    end.

%% Create an empty clean-shutdown marker file in Dir.
store_clean_shutdown(Dir) ->
    {ok, Hdl} = file_handle_cache:open(filename:join(Dir, ?CLEAN_FILENAME),
                                       [write, raw, binary],
                                       [{write_buffer, unbuffered}]),
    ok = file_handle_cache:close(Hdl).

%% Encode a queue resource name as a filesystem-safe directory name:
%% the term's external representation rendered as one base-36 integer.
queue_name_to_dir_name(Name = #resource { kind = queue }) ->
    Bin = term_to_binary(Name),
    Size = 8 * size(Bin),
    <<Num:Size>> = Bin,
    lists:flatten(io_lib:format("~.36B", [Num])).
%% Directory under which all queue index directories live.
queues_dir() ->
    filename:join(rabbit_mnesia:dir(), "queues").

%% Delete every file in Dir and then Dir itself. Any failing delete
%% crashes via the fold's accumulator match (let it crash).
delete_queue_directory(Dir) ->
    {ok, Entries} = file:list_dir(Dir),
    ok = lists:foldl(fun (Entry, ok) ->
                             file:delete(filename:join(Dir, Entry))
                     end, ok, Entries),
    ok = file:del_dir(Dir).

%% Return {Hdl, Segment}, opening the segment file on first use.
get_segment_handle(Segment = #segment { handle = undefined, path = Path }) ->
    {ok, Hdl} = file_handle_cache:open(Path,
                                       [binary, raw, read, write,
                                        {read_ahead, ?SEGMENT_TOTAL_SIZE}],
                                       [{write_buffer, infinity}]),
    {Hdl, Segment #segment { handle = Hdl }};
get_segment_handle(Segment = #segment { handle = Hdl }) ->
    {Hdl, Segment}.

%% Look up the #segment{} for Seg; if unknown, build a fresh empty one
%% (not yet stored into the state - callers use store_segment/2).
find_segment(Seg, #qistate { segments = Segments, dir = Dir }) ->
    case dict:find(Seg, Segments) of
        {ok, Segment = #segment{}} ->
            Segment;
        error ->
            #segment { pubs            = 0,
                       acks            = 0,
                       handle          = undefined,
                       journal_entries = dict:new(),
                       path            = seg_num_to_path(Dir, Seg),
                       num             = Seg }
    end.

%% Put Segment (back) into the state's segment dictionary, keyed by
%% its segment number.
store_segment(Segment = #segment { num = Seg },
              State = #qistate { segments = Segments }) ->
    State #qistate { segments = dict:store(Seg, Segment, Segments) }.

%% Return {Hdl, State}, opening the journal file on first use.
get_journal_handle(State =
                       #qistate { journal_handle = undefined, dir = Dir }) ->
    Path = filename:join(Dir, ?JOURNAL_FILENAME),
    {ok, Hdl} = file_handle_cache:open(Path,
                                       [binary, raw, read, write,
                                        {read_ahead, ?SEGMENT_TOTAL_SIZE}],
                                       [{write_buffer, infinity}]),
    {Hdl, State #qistate { journal_handle = Hdl }};
get_journal_handle(State = #qistate { journal_handle = Hdl }) ->
    {Hdl, State}.
%%----------------------------------------------------------------------------
%% Majors
%%----------------------------------------------------------------------------

%% Loading segments

%% Does not do any combining with the journal at all.
%% Read one segment file into a dict keyed by relative seq id,
%% returning {SegDict, PubCount, AckCount, State1}. KeepAcks is passed
%% through to deliver_or_ack_msg and controls whether fully-acked
%% entries are kept (marked) or dropped from the dict. Returns empty
%% results without touching disk when the segment file does not exist.
load_segment(Seg, KeepAcks, State) ->
    Segment = #segment { path = Path, handle = SegHdl } =
        find_segment(Seg, State),
    %% An unopened handle does not imply a missing file - check disk.
    SegmentExists = case SegHdl of
                        undefined -> filelib:is_file(Path);
                        _ -> true
                    end,
    case SegmentExists of
        false ->
            {dict:new(), 0, 0, State};
        true ->
            {Hdl, Segment1} = get_segment_handle(Segment),
            {ok, 0} = file_handle_cache:position(Hdl, bof),
            {SegDict, PubCount, AckCount} =
                load_segment_entries(KeepAcks, Hdl, dict:new(), 0, 0),
            {SegDict, PubCount, AckCount, store_segment(Segment1, State)}
    end.

%% Tail-recursive scan of a segment file. Each entry starts with a
%% one-byte tag: a ?REL_SEQ_ONLY_PREFIX entry is a deliver-or-ack for
%% an existing publish; a ?PUBLISH_PREFIX entry carries the persist
%% flag, rel seq id and 16-byte msg id. Stops at EOF or read error.
load_segment_entries(KeepAcks, Hdl, SegDict, PubCount, AckCount) ->
    case file_handle_cache:read(Hdl, 1) of
        {ok, <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS,
               MSB:(8-?REL_SEQ_ONLY_PREFIX_BITS)>>} ->
            {ok, LSB} = file_handle_cache:read(
                          Hdl, ?REL_SEQ_ONLY_ENTRY_LENGTH_BYTES - 1),
            %% Reassemble the 14-bit rel seq id from the tag byte's
            %% low bits plus the following byte (16 bits total).
            <<RelSeq:?REL_SEQ_BITS_BYTE_ALIGNED>> = <<MSB, LSB/binary>>,
            {AckCount1, SegDict1} =
                deliver_or_ack_msg(KeepAcks, RelSeq, AckCount, SegDict),
            load_segment_entries(KeepAcks, Hdl, SegDict1, PubCount, AckCount1);
        {ok, <<?PUBLISH_PREFIX:?PUBLISH_PREFIX_BITS,
               IsPersistentNum:1, MSB:(7-?PUBLISH_PREFIX_BITS)>>} ->
            %% because we specify /binary, and binaries are complete
            %% bytes, the size spec is in bytes, not bits.
            {ok, <<LSB:1/binary, MsgId:?MSG_ID_BYTES/binary>>} =
                file_handle_cache:read(
                  Hdl, ?PUBLISH_RECORD_LENGTH_BYTES - 1),
            <<RelSeq:?REL_SEQ_BITS_BYTE_ALIGNED>> = <<MSB, LSB/binary>>,
            SegDict1 =
                dict:store(RelSeq,
                           {{MsgId, 1 == IsPersistentNum}, no_del, no_ack},
                           SegDict),
            load_segment_entries(KeepAcks, Hdl, SegDict1, PubCount+1, AckCount);
        _ErrOrEoF ->
            {SegDict, PubCount, AckCount}
    end.
%% Apply a deliver-or-ack record read from a segment file to SegDict.
%% The first such record for a rel seq marks delivery; the second
%% marks an ack. On ack, AckCount is always bumped; the entry is kept
%% (marked ack) when KeepAcks is true and erased otherwise.
deliver_or_ack_msg(KeepAcks, RelSeq, AckCount, SegDict) ->
    case dict:find(RelSeq, SegDict) of
        {ok, {PubRecord, no_del, no_ack}} ->
            {AckCount, dict:store(RelSeq, {PubRecord, del, no_ack}, SegDict)};
        {ok, {PubRecord, del, no_ack}} when KeepAcks ->
            {AckCount + 1, dict:store(RelSeq, {PubRecord, del, ack}, SegDict)};
        {ok, {_PubRecord, del, no_ack}} ->
            %% KeepAcks is false here: drop fully-acked entries.
            %% (Previously this clause also carried 'when KeepAcks',
            %% making it unreachable - the clause above always matched
            %% first - and the not-KeepAcks path crashed with
            %% case_clause.)
            {AckCount + 1, dict:erase(RelSeq, SegDict)}
    end.

%% Loading Journal. This isn't idempotent and will mess up the counts
%% if you call it more than once on the same state. Assumes the counts
%% are 0 to start with.

load_journal(State) ->
    {JournalHdl, State1} = get_journal_handle(State),
    {ok, 0} = file_handle_cache:position(JournalHdl, 0),
    %% Read every journal entry into the per-segment journal dicts,
    %% then reconcile each segment against what its file already
    %% holds. (Previously this re-matched the already-bound State1 -
    %% a guaranteed badmatch - and read from the pre-open State; both
    %% fixed by binding State2 from load_journal_entries(State1).)
    State2 = #qistate { segments = Segments } = load_journal_entries(State1),
    dict:fold(
      fun (Seg, #segment { journal_entries = JEntries,
                           pubs = PubCountInJournal,
                           acks = AckCountInJournal }, StateN) ->
              %% We want to keep acks in so that we can remove them if
              %% duplicates are in the journal. The counts here are
              %% purely from the segment itself.
              {SegDict, PubCount, AckCount, StateN1} =
                  load_segment(Seg, true, StateN),
              %% Removed counts here are the number of pubs and acks
              %% that are duplicates - i.e. found in both the segment
              %% and journal.
              {JEntries1, PubsRemoved, AcksRemoved} =
                  journal_minus_segment(JEntries, SegDict),
              %% find_segment/2 returns just the #segment{} (as every
              %% other caller assumes); previously this expected a
              %% {Segment, State} pair, which could never match.
              Segment1 = find_segment(Seg, StateN1),
              PubCount1 = PubCount + PubCountInJournal - PubsRemoved,
              AckCount1 = AckCount + AckCountInJournal - AcksRemoved,
              store_segment(Segment1 #segment { journal_entries = JEntries1,
                                                pubs = PubCount1,
                                                acks = AckCount1 }, StateN1)
      end, State2, Segments).
%% Read 8-byte journal entries from the journal handle's current
%% position until EOF or a short/failed read, folding each into State
%% via add_to_journal/3. Publish entries are followed by a 16-byte
%% msg id; dels and acks are the bare 8 bytes.
load_journal_entries(State = #qistate { journal_handle = Hdl }) ->
    case file_handle_cache:read(Hdl, ?SEQ_BYTES) of
        {ok, <<Prefix:?JPREFIX_BITS, SeqId:?SEQ_BITS>>} ->
            case Prefix of
                %% NOTE(review): these two arms are only reachable if
                %% ?DEL_JPREFIX / ?ACK_JPREFIX expand to genuine 2-bit
                %% values (0..3) - verify the macro definitions use
                %% base-2 (2#10 / 2#11), not decimal 10 / 11.
                ?DEL_JPREFIX ->
                    load_journal_entries(add_to_journal(SeqId, del, State));
                ?ACK_JPREFIX ->
                    load_journal_entries(add_to_journal(SeqId, ack, State));
                _ ->
                    case file_handle_cache:read(Hdl, ?MSG_ID_BYTES) of
                        {ok, <<MsgIdNum:?MSG_ID_BITS>>} ->
                            %% work around for binary data
                            %% fragmentation. See
                            %% rabbit_msg_file:read_next/2
                            <<MsgId:?MSG_ID_BYTES/binary>> =
                                <<MsgIdNum:?MSG_ID_BITS>>,
                            Publish = {MsgId,
                                       case Prefix of
                                           ?PUB_PERSIST_JPREFIX -> true;
                                           ?PUB_TRANS_JPREFIX -> false
                                       end},
                            load_journal_entries(
                              add_to_journal(SeqId, Publish, State));
                        _ErrOrEoF -> %% err, we've lost at least a publish
                            State
                    end
            end;
        _ErrOrEoF -> State
    end.

%% add_to_journal/3 on a #qistate{}: route the Action (del | ack |
%% {MsgId, IsPersistent}) to the owning segment's in-memory journal
%% dict, and bump that segment's ack or pub counter (dels are not
%% counted).
add_to_journal(SeqId, Action, State = #qistate {}) ->
    {Seg, RelSeq} = seq_id_to_seg_and_rel_seq_id(SeqId),
    Segment = #segment { journal_entries = SegJDict,
                         pubs = PubCount, acks = AckCount } =
        find_segment(Seg, State),
    SegJDict1 = add_to_journal(RelSeq, Action, SegJDict),
    Segment1 = Segment #segment { journal_entries = SegJDict1 },
    Segment2 =
        case Action of
            del -> Segment1;
            ack -> Segment1 #segment { acks = AckCount + 1 };
            {_MsgId, _IsPersistent} -> Segment1 #segment { pubs = PubCount + 1 }
        end,
    store_segment(Segment2, State);

%% This is a more relaxed version of deliver_or_ack_msg because we can
%% have dels or acks in the journal without the corresponding
%% pub. Also, always want to keep acks. Things must occur in the right
%% order though.
+add_to_journal(RelSeq, Action, SegJDict) -> + case dict:find(RelSeq, SegJDict) of + {ok, {PubRecord, no_del, no_ack}} when Action == del -> + dict:store(RelSeq, {PubRecord, del, no_ack}, SegJDict); + {ok, {PubRecord, DelRecord, no_ack}} when Action == ack -> + dict:store(RelSeq, {PubRecord, DelRecord, ack}, SegJDict); + error when Action == del -> + dict:store(RelSeq, {no_pub, del, no_ack}, SegJDict); + error when Action == ack -> + dict:store(RelSeq, {no_pub, no_del, ack}, SegJDict); + error -> + {_MsgId, _IsPersistent} = Action, %% ASSERTION + dict:store(RelSeq, {Action, no_del, no_ack}, SegJDict) + end. + %% Combine what we have just read from a segment file with what we're %% holding for that segment in memory. There must be no %% duplicates. Used when providing segment entries to the variable @@ -55,10 +386,10 @@ journal_plus_segment(Obj = {{_MsgId, _IsPersistent}, del, no_ack}, not_found, RelSeq, OutDict) -> dict:store(RelSeq, Obj, OutDict); -journal_plus_segment(Obj = {{_MsgId, _IsPersistent}, del, ack}, +journal_plus_segment({{_MsgId, _IsPersistent}, del, ack}, not_found, RelSeq, OutDict) -> - dict:store(RelSeq, Obj, OutDict); + dict:erase(RelSeq, OutDict); journal_plus_segment({no_pub, del, no_ack}, {PubRecord = {_MsgId, _IsPersistent}, no_del, no_ack}, @@ -79,81 +410,91 @@ journal_plus_segment({no_pub, no_del, ack}, %% duplicates of entries found in the segment itself. Used on start up %% to clean up the journal. journal_minus_segment(JEntries, SegDict) -> - dict:fold(fun (RelSeq, JObj, JEntriesOut) -> + dict:fold(fun (RelSeq, JObj, {JEntriesOut, PubsRemoved, AcksRemoved}) -> SegEntry = case dict:find(RelSeq, SegDict) of error -> not_found; {ok, SObj = {_, _, _}} -> SObj end, - journal_minus_segment(JObj, SegEntry, RelSeq, JEntriesOut) - end, dict:new(), JEntries). + journal_minus_segment(JObj, SegEntry, RelSeq, JEntriesOut, + PubsRemoved, AcksRemoved) + end, {dict:new(), 0, 0}, JEntries). 
%% Here, OutDict is a fresh journal that we are filling with the
%% entries worth keeping. Subtract from one journal entry (JObj)
%% whatever the on-disk segment already records (SegEntry), returning
%% the (possibly updated) OutDict plus the duplicate counters.
%% PubsRemoved / AcksRemoved are only incremented when a publish or an
%% ack appears in both the journal and the segment.

%% Identical in journal and segment; must contain at least the publish.
journal_minus_segment(Obj, Obj = {{_MsgId, _IsPersistent}, _Del, no_ack},
                      _RelSeq, OutDict, PubsRemoved, AcksRemoved) ->
    {OutDict, PubsRemoved + 1, AcksRemoved};
journal_minus_segment(Obj, Obj = {{_MsgId, _IsPersistent}, _Del, ack},
                      _RelSeq, OutDict, PubsRemoved, AcksRemoved) ->
    {OutDict, PubsRemoved + 1, AcksRemoved + 1};

%% Journal has only the publish
journal_minus_segment(Obj = {{_MsgId, _IsPersistent}, no_del, no_ack},
                      not_found,
                      RelSeq, OutDict, PubsRemoved, AcksRemoved) ->
    {dict:store(RelSeq, Obj, OutDict), PubsRemoved, AcksRemoved};

%% Journal has only the deliver
journal_minus_segment(Obj = {no_pub, del, no_ack},
                      {{_MsgId, _IsPersistent}, no_del, no_ack},
                      RelSeq, OutDict, PubsRemoved, AcksRemoved) ->
    {dict:store(RelSeq, Obj, OutDict), PubsRemoved, AcksRemoved};
journal_minus_segment({no_pub, del, no_ack},
                      {{_MsgId, _IsPersistent}, del, no_ack},
                      _RelSeq, OutDict, PubsRemoved, AcksRemoved) ->
    {OutDict, PubsRemoved, AcksRemoved};

%% Journal has only the ack
journal_minus_segment(Obj = {no_pub, no_del, ack},
                      {{_MsgId, _IsPersistent}, del, no_ack},
                      RelSeq, OutDict, PubsRemoved, AcksRemoved) ->
    {dict:store(RelSeq, Obj, OutDict), PubsRemoved, AcksRemoved};
journal_minus_segment({no_pub, no_del, ack},
                      {{_MsgId, _IsPersistent}, del, ack},
                      _RelSeq, OutDict, PubsRemoved, AcksRemoved) ->
    {OutDict, PubsRemoved, AcksRemoved};

%% Journal has publish and deliver
journal_minus_segment(Obj = {{_MsgId, _IsPersistent}, del, no_ack},
                      not_found,
                      RelSeq, OutDict, PubsRemoved, AcksRemoved) ->
    {dict:store(RelSeq, Obj, OutDict), PubsRemoved, AcksRemoved};
journal_minus_segment({PubRecord, del, no_ack},
                      {PubRecord = {_MsgId, _IsPersistent}, no_del, no_ack},
                      RelSeq, OutDict, PubsRemoved, AcksRemoved) ->
    %% Segment already has the publish: keep only the deliver.
    {dict:store(RelSeq, {no_pub, del, no_ack}, OutDict),
     PubsRemoved + 1, AcksRemoved};

%% Journal has deliver and ack
journal_minus_segment(Obj = {no_pub, del, ack},
                      {{_MsgId, _IsPersistent}, no_del, no_ack},
                      RelSeq, OutDict, PubsRemoved, AcksRemoved) ->
    {dict:store(RelSeq, Obj, OutDict), PubsRemoved, AcksRemoved};
journal_minus_segment({no_pub, del, ack},
                      {{_MsgId, _IsPersistent}, del, no_ack},
                      RelSeq, OutDict, PubsRemoved, AcksRemoved) ->
    %% Segment already has the deliver: keep only the ack.
    {dict:store(RelSeq, {no_pub, no_del, ack}, OutDict),
     PubsRemoved, AcksRemoved};
journal_minus_segment({no_pub, del, ack},
                      {{_MsgId, _IsPersistent}, del, ack},
                      _RelSeq, OutDict, PubsRemoved, AcksRemoved) ->
    {OutDict, PubsRemoved, AcksRemoved + 1};

%% Journal has publish, deliver and ack
journal_minus_segment({{_MsgId, _IsPersistent}, del, ack},
                      not_found,
                      _RelSeq, OutDict, PubsRemoved, AcksRemoved) ->
    %% Pub and ack cancel out entirely; nothing was duplicated.
    {OutDict, PubsRemoved, AcksRemoved};
journal_minus_segment({PubRecord, del, ack},
                      {PubRecord = {_MsgId, _IsPersistent}, no_del, no_ack},
                      RelSeq, OutDict, PubsRemoved, AcksRemoved) ->
    {dict:store(RelSeq, {no_pub, del, ack}, OutDict),
     PubsRemoved + 1, AcksRemoved};
journal_minus_segment({PubRecord, del, ack},
                      {PubRecord = {_MsgId, _IsPersistent}, del, no_ack},
                      RelSeq, OutDict, PubsRemoved, AcksRemoved) ->
    {dict:store(RelSeq, {no_pub, no_del, ack}, OutDict),
     PubsRemoved + 1, AcksRemoved}.
