diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-12-03 15:41:16 +0000 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-12-03 15:41:16 +0000 |
| commit | f26991818879ac44c62c856d9f01e7d1ff3f07b7 (patch) | |
| tree | dc8a87a49f711aaf3c05537612f6d5f0c86d55f9 /src | |
| parent | 32f53ad4535a5a1055236bcd64c89afde4da996f (diff) | |
| download | rabbitmq-server-git-f26991818879ac44c62c856d9f01e7d1ff3f07b7.tar.gz | |
Abstracted the segment caching in the qi, and associated improvements as not all the state needs to be passed around all the time.
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_queue_index.erl | 250 |
1 files changed, 131 insertions, 119 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index adc3f74286..829b03aa18 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -87,9 +87,7 @@ { dir, segments, journal_handle, - dirty_count, - last_seg_a, - last_seg_b + dirty_count }). -record(segment, @@ -108,10 +106,19 @@ -ifdef(use_specs). -type(hdl() :: ('undefined' | any())). +-type(segment() :: ('undefined' | + #segment { pubs :: non_neg_integer(), + acks :: non_neg_integer(), + handle :: hdl(), + journal_entries :: dict(), + path :: file_path(), + num :: non_neg_integer() + })). -type(msg_id() :: binary()). -type(seq_id() :: integer()). +-type(seg_dict() :: {dict(), [segment()], file_path()}). -type(qistate() :: #qistate { dir :: file_path(), - segments :: dict(), + segments :: seg_dict(), journal_handle :: hdl(), dirty_count :: integer() }). @@ -159,11 +166,13 @@ init(Name) -> %% We know the journal is empty here, so we don't need to combine %% with the journal, and we don't need to worry about messages %% that have been acked. - State3 = + State3 = #qistate { segments = Segments } = lists:foldl( - fun (Seg, StateN) -> - {SegDict, _PubCount, _AckCount, StateN1} = - load_segment(Seg, false, StateN), + fun (Seg, StateN = #qistate { segments = SegmentsN }) -> + Segment = segment_find(Seg, SegmentsN), + {SegDict, _PubCount, _AckCount, Segment1} = + load_segment(false, Segment), + SegmentsN1 = segment_store(Segment1, SegmentsN), dict:fold( fun (RelSeq, {{MsgId, _IsPersistent}, Del, no_ack}, StateM) -> @@ -183,14 +192,14 @@ init(Name) -> SeqId, ack, add_to_journal(SeqId, del, StateM)) end - end, StateN1, SegDict) + end, StateN #qistate { segments = SegmentsN1 }, SegDict) end, State2, AllSegs), %% 4. Go through all segments and calculate the number of unacked %% messages we have. Count = lists:foldl( fun (Seg, CountAcc) -> #segment { pubs = PubCount, acks = AckCount } = - find_segment(Seg, State3), + segment_find(Seg, Segments), CountAcc + PubCount - AckCount end, 0, AllSegs), {Count, State3}. @@ -240,40 +249,40 @@ sync_seq_ids(_SeqIds, State = #qistate { journal_handle = JournalHdl }) -> flush_journal(State = #qistate { dirty_count = 0 }) -> State; -flush_journal(State) -> - State1 = #qistate { segments = Segments } = get_all_segments(State), - State2 = - dict:fold( +flush_journal(State = #qistate { segments = Segments }) -> + Segments1 = + segment_fold( fun (_Seg, #segment { journal_entries = JEntries, pubs = PubCount, - acks = AckCount } = Segment, StateN) -> + acks = AckCount } = Segment, SegmentsN) -> case PubCount > 0 andalso PubCount == AckCount of true -> - ok = delete_segment(Segment), - StateN; + segment_erase(delete_segment(Segment), SegmentsN); false -> case 0 == dict:size(JEntries) of true -> - store_segment(Segment, StateN); + SegmentsN; false -> {Hdl, Segment1} = get_segment_handle(Segment), dict:fold(fun write_entry_to_segment/3, Hdl, JEntries), ok = file_handle_cache:sync(Hdl), - store_segment( + segment_store( Segment1 #segment { journal_entries = - dict:new() }, StateN) + dict:new() }, SegmentsN) end end - end, State1 #qistate { segments = dict:new() }, Segments), - {JournalHdl, State3} = get_journal_handle(State2), + end, Segments, Segments), + {JournalHdl, State1} = + get_journal_handle(State #qistate { segments = Segments1 }), ok = file_handle_cache:clear(JournalHdl), - State3 #qistate { dirty_count = 0 }. + State1 #qistate { dirty_count = 0 }. -read_segment_entries(InitSeqId, State) -> +read_segment_entries(InitSeqId, State = #qistate { segments = Segments }) -> {Seg, 0} = seq_id_to_seg_and_rel_seq_id(InitSeqId), - {SegDict, _PubCount, _AckCount, State1} = - load_segment(Seg, false, State), - #segment { journal_entries = JEntries } = find_segment(Seg, State1), + Segment = segment_find(Seg, Segments), + {SegDict, _PubCount, _AckCount, + Segment1 = #segment { journal_entries = JEntries }} = + load_segment(false, Segment), SegDict1 = journal_plus_segment(JEntries, SegDict), %% deliberately sort the list desc, because foldl will reverse it RelSeqs = rev_sort(dict:fetch_keys(SegDict1)), @@ -283,7 +292,7 @@ read_segment_entries(InitSeqId, State) -> [ {MsgId, reconstruct_seq_id(Seg, RelSeq), IsPersistent, IsDelivered == del} | Acc ] end, [], RelSeqs), - State1}. + State #qistate { segments = segment_store(Segment1, Segments) }}. next_segment_boundary(SeqId) -> {Seg, _RelSeq} = seq_id_to_seg_and_rel_seq_id(SeqId), @@ -385,8 +394,7 @@ maybe_flush_journal(State = #qistate { dirty_count = DCount }) maybe_flush_journal(State) -> State. -all_segment_nums(State = #qistate { dir = Dir }) -> - #qistate { segments = Segments } = get_all_segments(State), +all_segment_nums(#qistate { dir = Dir, segments = Segments }) -> sets:to_list( lists:foldl( fun (SegName, Set) -> @@ -394,7 +402,7 @@ all_segment_nums(State = #qistate { dir = Dir }) -> list_to_integer( lists:takewhile(fun(C) -> $0 =< C andalso C =< $9 end, SegName)), Set) - end, sets:from_list(dict:fetch_keys(Segments)), + end, sets:from_list(segment_fetch_keys(Segments)), filelib:wildcard("*" ++ ?SEGMENT_EXTENSION, Dir))). blank_state(QueueName) -> @@ -402,11 +410,9 @@ blank_state(QueueName) -> Dir = filename:join(queues_dir(), StrName), ok = filelib:ensure_dir(filename:join(Dir, "nothing")), #qistate { dir = Dir, - segments = dict:new(), + segments = segment_new(Dir), journal_handle = undefined, - dirty_count = 0, - last_seg_a = undefined, - last_seg_b = undefined + dirty_count = 0 }. rev_sort(List) -> @@ -422,11 +428,11 @@ seg_num_to_path(Dir, Seg) -> SegName = integer_to_list(Seg), filename:join(Dir, SegName ++ ?SEGMENT_EXTENSION). -delete_segment(#segment { handle = undefined }) -> - ok; -delete_segment(#segment { handle = Hdl }) -> +delete_segment(Segment = #segment { handle = undefined }) -> + Segment; +delete_segment(Segment = #segment { handle = Hdl }) -> ok = file_handle_cache:delete(Hdl), - ok. + Segment #segment { handle = undefined }. detect_clean_shutdown(Dir) -> case file:delete(filename:join(Dir, ?CLEAN_FILENAME)) of @@ -465,11 +471,11 @@ get_segment_handle(Segment = #segment { handle = undefined, path = Path }) -> get_segment_handle(Segment = #segment { handle = Hdl }) -> {Hdl, Segment}. -find_segment(Seg, #qistate { last_seg_a = #segment { num = Seg } = Segment }) -> - Segment; -find_segment(Seg, #qistate { last_seg_b = #segment { num = Seg } = Segment }) -> - Segment; -find_segment(Seg, #qistate { segments = Segments, dir = Dir }) -> +segment_find(Seg, {_Segments, [Segment = #segment { num = Seg } |_], _Dir}) -> + Segment; %% 1 or (2, matches head) +segment_find(Seg, {_Segments, [_, Segment = #segment { num = Seg }], _Dir}) -> + Segment; %% 2, matches tail +segment_find(Seg, {Segments, _, Dir}) -> %% no match case dict:find(Seg, Segments) of {ok, Segment = #segment{}} -> Segment; error -> #segment { pubs = 0, @@ -481,46 +487,52 @@ find_segment(Seg, #qistate { segments = Segments, dir = Dir }) -> } end. -store_segment(Segment = #segment { num = Seg }, State = - #qistate { last_seg_a = #segment { num = Seg }}) -> - State #qistate { last_seg_a = Segment }; -store_segment(Segment = #segment { num = Seg }, State = - #qistate { last_seg_b = #segment { num = Seg }}) -> - State #qistate { last_seg_b = Segment }; -store_segment(Segment, State = - #qistate { last_seg_a = LastSegA, last_seg_b = LastSegB }) -> - case LastSegA of - undefined -> - State #qistate { last_seg_a = Segment }; - _ -> - case LastSegB of - undefined -> - State #qistate { last_seg_b = Segment }; - _ -> - State1 = #qistate { segments = Segments } = - State #qistate { last_seg_a = LastSegB, - last_seg_b = Segment }, - State1 #qistate { - segments = return_segment_to_dict(LastSegA, Segments) } - end - end. - -get_all_segments(State = #qistate { last_seg_a = undefined, - last_seg_b = undefined }) -> - State; -get_all_segments(State = #qistate { segments = Segments, - last_seg_a = LastSegA, - last_seg_b = LastSegB }) -> - State #qistate { last_seg_a = undefined, - last_seg_b = undefined, - segments = return_segment_to_dict( - LastSegB, - return_segment_to_dict(LastSegA, Segments)) }. - -return_segment_to_dict(undefined, Segments) -> - Segments; -return_segment_to_dict(Segment = #segment { num = Seg }, Segments) -> - dict:store(Seg, Segment, Segments). +segment_store(Segment = #segment { num = Seg }, %% 1 or (2, matches head) + {Segments, [#segment { num = Seg } | Tail], Dir}) -> + {Segments, [Segment | Tail], Dir}; +segment_store(Segment = #segment { num = Seg }, %% 2, matches tail + {Segments, [SegmentA, #segment { num = Seg }], Dir}) -> + {Segments, [SegmentA, Segment], Dir}; +segment_store(Segment = #segment { num = Seg }, + {Segments, [], Dir}) -> + {dict:erase(Seg, Segments), [Segment], Dir}; +segment_store(Segment = #segment { num = Seg }, + {Segments, [SegmentA], Dir}) -> + {dict:erase(Seg, Segments), [Segment, SegmentA], Dir}; +segment_store(Segment = #segment { num = Seg }, + {Segments, [SegmentA, SegmentB], Dir}) -> + {dict:store(SegmentB#segment.num, SegmentB, dict:erase(Seg, Segments)), + [Segment, SegmentA], Dir}. + +segment_fold(Fun, Acc, {Segments, [], _Dir}) -> + dict:fold(Fun, Acc, Segments); +segment_fold(Fun, Acc, {Segments, CachedSegments, _Dir}) -> + Acc1 = lists:foldl(fun (Segment = #segment { num = Num }, AccN) -> + Fun(Num, Segment, AccN) + end, Acc, CachedSegments), + dict:fold(Fun, Acc1, Segments). + +segment_map(Fun, {Segments, CachedSegments, Dir}) -> + {dict:map(Fun, Segments), + lists:map(fun (Segment = #segment { num = Num }) -> Fun(Num, Segment) end, + CachedSegments), Dir}. + +segment_fetch_keys({Segments, CachedSegments, _Dir}) -> + lists:map(fun (Segment) -> Segment#segment.num end, CachedSegments) ++ + dict:fetch_keys(Segments). + +segment_erase(#segment { handle = undefined, num = Num }, + {Segments, [#segment { num = Num } | Rest], Dir}) -> + {Segments, Rest, Dir}; %% 1 or (2, matches head) +segment_erase(#segment { handle = undefined, num = Num }, + {Segments, [Head, #segment { num = Num }], Dir}) -> + {Segments, [Head], Dir}; %% 2, matches tail +segment_erase(#segment { handle = undefined, num = Num }, + {Segments, CachedSegments, Dir}) -> + {dict:erase(Num, Segments), CachedSegments, Dir}. + +segment_new(Dir) -> + {dict:new(), [], Dir}. get_journal_handle(State = #qistate { journal_handle = undefined, dir = Dir }) -> @@ -562,13 +574,12 @@ write_entry_to_segment(RelSeq, {Publish, Del, Ack}, Hdl) -> terminate(StoreShutdown, State = #qistate { journal_handle = JournalHdl, - dir = Dir }) -> - State1 = #qistate { segments = Segments } = get_all_segments(State), + dir = Dir, segments = Segments }) -> ok = case JournalHdl of undefined -> ok; _ -> file_handle_cache:close(JournalHdl) end, - ok = dict:fold( + ok = segment_fold( fun (_Seg, #segment { handle = undefined }, ok) -> ok; (_Seg, #segment { handle = Hdl }, ok) -> @@ -578,7 +589,7 @@ terminate(StoreShutdown, State = true -> store_clean_shutdown(Dir); false -> ok end, - State1 #qistate { journal_handle = undefined, segments = dict:new() }. + State #qistate { journal_handle = undefined, segments = segment_new(Dir) }. %%---------------------------------------------------------------------------- %% Majors @@ -591,22 +602,21 @@ terminate(StoreShutdown, State = %% number of unacked msgs is PubCount - AckCount. If KeepAcks is %% false, then dict:size(SegDict) == PubCount - AckCount. If KeepAcks %% is true, then dict:size(SegDict) == PubCount. -load_segment(Seg, KeepAcks, State) -> - Segment = #segment { path = Path, handle = SegHdl } = - find_segment(Seg, State), +load_segment(KeepAcks, + Segment = #segment { path = Path, handle = SegHdl }) -> SegmentExists = case SegHdl of undefined -> filelib:is_file(Path); _ -> true end, case SegmentExists of false -> - {dict:new(), 0, 0, State}; + {dict:new(), 0, 0, Segment}; true -> {Hdl, Segment1} = get_segment_handle(Segment), {ok, 0} = file_handle_cache:position(Hdl, bof), {SegDict, PubCount, AckCount} = load_segment_entries(KeepAcks, Hdl, dict:new(), 0, 0), - {SegDict, PubCount, AckCount, store_segment(Segment1, State)} + {SegDict, PubCount, AckCount, Segment1} end. load_segment_entries(KeepAcks, Hdl, SegDict, PubCount, AckCount) -> @@ -653,29 +663,29 @@ deliver_or_ack_msg(KeepAcks, RelSeq, AckCount, SegDict) -> load_journal(State) -> {JournalHdl, State1} = get_journal_handle(State), {ok, 0} = file_handle_cache:position(JournalHdl, 0), - State2 = #qistate { segments = Segments } = - get_all_segments(load_journal_entries(State1)), - dict:fold( - fun (Seg, #segment { journal_entries = JEntries, - pubs = PubCountInJournal, - acks = AckCountInJournal }, StateN) -> - %% We want to keep acks in so that we can remove them if - %% duplicates are in the journal. The counts here are - %% purely from the segment itself. - {SegDict, PubCountInSeg, AckCountInSeg, StateN1} = - load_segment(Seg, true, StateN), - %% Removed counts here are the number of pubs and acks - %% that are duplicates - i.e. found in both the segment - %% and journal. - {JEntries1, PubsRemoved, AcksRemoved} = - journal_minus_segment(JEntries, SegDict), - Segment1 = find_segment(Seg, StateN1), - PubCount1 = PubCountInSeg + PubCountInJournal - PubsRemoved, - AckCount1 = AckCountInSeg + AckCountInJournal - AcksRemoved, - store_segment(Segment1 #segment { journal_entries = JEntries1, - pubs = PubCount1, - acks = AckCount1 }, StateN1) - end, State2, Segments). + State2 = #qistate { segments = Segments } = load_journal_entries(State1), + Segments1 = + segment_map( + fun (_Seg, Segment = #segment { journal_entries = JEntries, + pubs = PubCountInJournal, + acks = AckCountInJournal }) -> + %% We want to keep acks in so that we can remove + %% them if duplicates are in the journal. The counts + %% here are purely from the segment itself. + {SegDict, PubCountInSeg, AckCountInSeg, Segment1} = + load_segment(true, Segment), + %% Removed counts here are the number of pubs and + %% acks that are duplicates - i.e. found in both the + %% segment and journal. + {JEntries1, PubsRemoved, AcksRemoved} = + journal_minus_segment(JEntries, SegDict), + PubCount1 = PubCountInSeg + PubCountInJournal - PubsRemoved, + AckCount1 = AckCountInSeg + AckCountInJournal - AcksRemoved, + Segment1 #segment { journal_entries = JEntries1, + pubs = PubCount1, + acks = AckCount1 } + end, Segments), + State2 #qistate { segments = Segments1 }. load_journal_entries(State = #qistate { journal_handle = Hdl }) -> case file_handle_cache:read(Hdl, ?SEQ_BYTES) of @@ -707,11 +717,12 @@ load_journal_entries(State = #qistate { journal_handle = Hdl }) -> _ErrOrEoF -> State end. -add_to_journal(SeqId, Action, State = #qistate { dirty_count = DCount }) -> +add_to_journal(SeqId, Action, State = #qistate { dirty_count = DCount, + segments = Segments }) -> {Seg, RelSeq} = seq_id_to_seg_and_rel_seq_id(SeqId), Segment = #segment { journal_entries = SegJDict, pubs = PubCount, acks = AckCount } = - find_segment(Seg, State), + segment_find(Seg, Segments), SegJDict1 = add_to_journal(RelSeq, Action, SegJDict), Segment1 = Segment #segment { journal_entries = SegJDict1 }, Segment2 = @@ -720,7 +731,8 @@ add_to_journal(SeqId, Action, State = #qistate { dirty_count = DCount }) -> ack -> Segment1 #segment { acks = AckCount + 1 }; {_MsgId, _IsPersistent} -> Segment1 #segment { pubs = PubCount + 1 } end, - store_segment(Segment2, State #qistate { dirty_count = DCount + 1 }); + State #qistate { dirty_count = DCount + 1, + segments = segment_store(Segment2, Segments) }; %% This is a more relaxed version of deliver_or_ack_msg because we can %% have dels or acks in the journal without the corresponding |
