diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-12-02 14:56:32 +0000 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-12-02 14:56:32 +0000 |
| commit | acc6bbb9f6b29f194ddd0e60087489147ea5234d (patch) | |
| tree | 60aa90e7901c527f51c67b5237ccd58fac967ae5 | |
| parent | 6f18886520e8e7356c1efd84a2e1cc6aa66624f6 (diff) | |
| download | rabbitmq-server-git-acc6bbb9f6b29f194ddd0e60087489147ea5234d.tar.gz | |
More good progress on qi3. The code still almost looks pretty in places. Its prettiness is not diminishing.
| -rw-r--r-- | src/rabbit_queue_index3.erl | 182 |
1 files changed, 175 insertions, 7 deletions
diff --git a/src/rabbit_queue_index3.erl b/src/rabbit_queue_index3.erl index eeb38dd28a..01a7c748eb 100644 --- a/src/rabbit_queue_index3.erl +++ b/src/rabbit_queue_index3.erl @@ -31,9 +31,14 @@ -module(rabbit_queue_index3). +-export([init/1, terminate/1, terminate_and_erase/1, write_published/4, + write_delivered/2, write_acks/2, sync_seq_ids/3, flush_journal/1, + read_segment_entries/2, next_segment_boundary/1, segment_size/0, + find_lowest_seq_id_seg_and_next_seq_id/1, start_msg_store/1]). -define(CLEAN_FILENAME, "clean.dot"). +%%---------------------------------------------------------------------------- %% ---- Journal details ---- -define(MAX_JOURNAL_ENTRY_COUNT, 32768). @@ -98,6 +103,61 @@ %%---------------------------------------------------------------------------- +init(Name) -> + State = blank_state(Name), + %% 1. Load the journal completely. This will also load segments + %% which have entries in the journal and remove duplicates. + %% The counts will correctly reflect the combination of the + %% segment and the journal. + State1 = load_journal(State), + %% 2. Flush the journal. This makes life easier for everyone, as + %% it means there won't be any publishes in the journal alone. + State2 = #qistate { dir = Dir } = flush_journal(State1), + %% 3. Load each segment in turn and filter out messages that are + %% not in the msg_store, by adding acks to the journal. These + %% acks only go to the RAM journal as it doesn't matter if we + %% lose them. Also mark delivered if not clean shutdown. + AllSegs = all_segment_nums(Dir), + CleanShutdown = detect_clean_shutdown(Dir), + %% We know the journal is empty here, so we don't need to combine + %% with the journal, and we don't need to worry about messages + %% that have been acked. + State3 = + lists:foldl( + fun (Seg, StateN) -> + {SegDict, _PubCount, _AckCount, StateN1} = + load_segment(Seg, false, StateN), + dict:fold( + fun (RelSeq, {{MsgId, _IsPersistent}, Del, no_ack}, + StateM) -> + SeqId = reconstruct_seq_id(Seg, RelSeq), + InMsgStore = rabbit_msg_store:contains(MsgId), + case {InMsgStore, CleanShutdown} of + {true, true} -> + StateM; + {true, false} when Del == del -> + StateM; + {true, false} -> + add_to_journal(SeqId, del, StateM); + {false, _} when Del == del -> + add_to_journal(SeqId, ack, StateM); + {false, _} -> + add_to_journal( + SeqId, ack, + add_to_journal(SeqId, del, StateM)) + end + end, StateN1, SegDict) + end, State2, AllSegs), + %% 4. Go through all segments and calculate the number of unacked + %% messages we have. + Count = lists:foldl( + fun (Seg, CountAcc) -> + #segment { pubs = PubCount, acks = AckCount } = + find_segment(Seg, State3), + CountAcc + PubCount - AckCount + end, 0, AllSegs), + {Count, State3}. + terminate(State = #qistate { segments = Segments, journal_handle = JournalHdl, dir = Dir }) -> ok = case JournalHdl of @@ -118,21 +178,100 @@ terminate_and_erase(State) -> ok = delete_queue_directory(State1 #qistate.dir), State1. +flush_journal(State = #qistate { dirty_count = 0 }) -> + State; +flush_journal(State = #qistate { segments = Segments }) -> + dict:fold( + fun (_Seg, #segment { journal_entries = JEntries, pubs = PubCount, + acks = AckCount } = Segment, StateN) -> + case dict:is_empty(JEntries) of + true -> store_segment(Segment, StateN); + false when AckCount == PubCount -> + ok = delete_segment(Segment); + false -> + {Hdl, Segment1} = get_segment_handle(Segment), + dict:fold(fun write_entry_to_segment/3, + Hdl, JEntries), + ok = file_handle_cache:sync(Hdl), + store_segment( + Segment1 #segment { journal_entries = dict:new() }, + StateN) + end + end, State, Segments). + +read_segment_entries(InitSeqId, State) -> + {Seg, 0} = seq_id_to_seg_and_rel_seq_id(InitSeqId), + {SegDict, _PubCount, _AckCount, State1} = + load_segment(Seg, false, State), + #segment { journal_entries = JEntries } = find_segment(Seg, State1), + SegDict1 = journal_plus_segment(JEntries, SegDict), + %% deliberately sort the list desc, because foldl will reverse it + RelSeqs = rev_sort(dict:fetch_keys(SegDict1)), + {lists:foldl(fun (RelSeq, Acc) -> + {{MsgId, IsPersistent}, IsDelivered, no_ack} = + dict:fetch(RelSeq, SegDict1), + [ {MsgId, reconstruct_seq_id(Seg, RelSeq), + IsPersistent, IsDelivered} | Acc ] + end, [], RelSeqs), + State1}. + +next_segment_boundary(SeqId) -> + {Seg, _RelSeq} = seq_id_to_seg_and_rel_seq_id(SeqId), + reconstruct_seq_id(Seg + 1, 0). + +segment_size() -> + ?SEGMENT_ENTRY_COUNT. + +find_lowest_seq_id_seg_and_next_seq_id(State = #qistate { dir = Dir }) -> + SegNums = all_segment_nums(Dir), + %% We don't want the lowest seq_id, merely the seq_id of the start + %% of the lowest segment. That seq_id may not actually exist, but + %% that's fine. The important thing is that the segment exists and + %% the seq_id reported is on a segment boundary. + + %% We also don't really care about the max seq_id. Just start the + %% next segment: it makes life much easier. + + %% SegNums is sorted, ascending. + {LowSeqIdSeg, NextSeqId} = + case SegNums of + [] -> {0, 0}; + [MinSeg|_] -> {reconstruct_seq_id(MinSeg, 0), + reconstruct_seq_id(lists:last(SegNums), 0)} + end, + {LowSeqIdSeg, NextSeqId, State}. + %%---------------------------------------------------------------------------- %% Minors %%---------------------------------------------------------------------------- +all_segment_nums(Dir) -> + lists:sort( + [list_to_integer( + lists:takewhile(fun(C) -> $0 =< C andalso C =< $9 end, SegName)) + || SegName <- filelib:wildcard("*" ++ ?SEGMENT_EXTENSION, Dir)]). + +blank_state(QueueName) -> + StrName = queue_name_to_dir_name(QueueName), + Dir = filename:join(queues_dir(), StrName), + ok = filelib:ensure_dir(filename:join(Dir, "nothing")), + #qistate { dir = Dir, + segments = dict:new(), + journal_handle = undefined, + dirty_count = 0 + }. + rev_sort(List) -> lists:sort(fun (A, B) -> B < A end, List). seq_id_to_seg_and_rel_seq_id(SeqId) -> { SeqId div ?SEGMENT_ENTRY_COUNT, SeqId rem ?SEGMENT_ENTRY_COUNT }. -reconstruct_seq_id(SegNum, RelSeq) -> - (SegNum * ?SEGMENT_ENTRY_COUNT) + RelSeq. +reconstruct_seq_id(Seg, RelSeq) -> + (Seg * ?SEGMENT_ENTRY_COUNT) + RelSeq. -seg_num_to_path(Dir, SegNum) -> - SegName = integer_to_list(SegNum), +seg_num_to_path(Dir, Seg) -> + SegName = integer_to_list(Seg), filename:join(Dir, SegName ++ ?SEGMENT_EXTENSION). delete_segment(#segment { handle = undefined }) -> @@ -206,13 +345,42 @@ get_journal_handle(State = get_journal_handle(State = #qistate { journal_handle = Hdl }) -> {Hdl, State}. +bool_to_int(true ) -> 1; +bool_to_int(false) -> 0. + +write_entry_to_segment(RelSeq, {Publish, Del, Ack}, Hdl) -> + ok = case Publish of + no_pub -> + ok; + {MsgId, IsPersistent} -> + file_handle_cache:append( + Hdl, [<<?PUBLISH_PREFIX:?PUBLISH_PREFIX_BITS, + (bool_to_int(IsPersistent)):1, + RelSeq:?REL_SEQ_BITS>>, MsgId]) + end, + ok = case {Del, Ack} of + {no_del, no_ack} -> ok; + _ -> Binary = <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, + RelSeq:?REL_SEQ_BITS>>, + Data = case {Del, Ack} of + {del, ack} -> [Binary, Binary]; + _ -> Binary + end, + file_handle_cache:append(Hdl, Data) + end, + Hdl. + %%---------------------------------------------------------------------------- %% Majors %%---------------------------------------------------------------------------- %% Loading segments -%% Does not do any combining with the journal at all +%% Does not do any combining with the journal at all. The PubCount +%% that comes back is the number of publishes in the segment. The +%% number of unacked msgs is PubCount - AckCount. If KeepAcks is +%% false, then dict:size(SegDict) == PubCount - AckCount. If KeepAcks +%% is true, then dict:size(SegDict) == PubCount. load_segment(Seg, KeepAcks, State) -> Segment = #segment { path = Path, handle = SegHdl } = find_segment(Seg, State), @@ -328,7 +496,7 @@ load_journal_entries(State = #qistate { journal_handle = Hdl }) -> _ErrOrEoF -> State end. -add_to_journal(SeqId, Action, State = #qistate {}) -> +add_to_journal(SeqId, Action, State = #qistate { dirty_count = DCount }) -> {Seg, RelSeq} = seq_id_to_seg_and_rel_seq_id(SeqId), Segment = #segment { journal_entries = SegJDict, pubs = PubCount, acks = AckCount } = @@ -341,7 +509,7 @@ add_to_journal(SeqId, Action, State = #qistate {}) -> ack -> Segment1 #segment { acks = AckCount + 1 }; {_MsgId, _IsPersistent} -> Segment1 #segment { pubs = PubCount + 1 } end, - store_segment(Segment2, State); + store_segment(Segment2, State #qistate { dirty_count = DCount + 1 }); %% This is a more relaxed version of deliver_or_ack_msg because we can %% have dels or acks in the journal without the corresponding |
