diff options
| -rw-r--r-- | src/rabbit_queue_index.erl | 110 |
1 files changed, 43 insertions, 67 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 02d0d8ad41..858ade2629 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -155,11 +155,13 @@ -define(PUB, {_Guid, _IsPersistent}). +-define(READ_MODE, [binary, raw, read, {read_ahead, ?SEGMENT_TOTAL_SIZE}]). + %%---------------------------------------------------------------------------- -record(qistate, { dir, segments, journal_handle, dirty_count }). --record(segment, { unacked, handle, journal_entries, path, num }). +-record(segment, { unacked, journal_entries, path, num }). -include("rabbit.hrl"). @@ -170,7 +172,6 @@ -type(hdl() :: ('undefined' | any())). -type(segment() :: ('undefined' | #segment { unacked :: non_neg_integer(), - handle :: hdl(), journal_entries :: array(), path :: file_path(), num :: non_neg_integer() @@ -296,9 +297,9 @@ read(Start, End, State = #qistate { segments = Segments, true -> EndRelSeq; false -> ?SEGMENT_ENTRY_COUNT end, - Segment = segment_find_or_new(StartSeg, Dir, Segments), - {SegEntries, _UnackedCount, Segment1} = load_segment(false, Segment), - #segment { journal_entries = JEntries } = Segment1, + Segment = #segment { journal_entries = JEntries } = + segment_find_or_new(StartSeg, Dir, Segments), + {SegEntries, _UnackedCount} = load_segment(false, Segment), {SegEntries1, _UnackedCountDelta} = segment_plus_journal(SegEntries, JEntries), {array:sparse_foldr( @@ -310,7 +311,7 @@ read(Start, End, State = #qistate { segments = Segments, Acc end, [], SegEntries1), Again, - State #qistate { segments = segment_store(Segment1, Segments) }}. + State #qistate { segments = segment_store(Segment, Segments) }}. next_segment_boundary(SeqId) -> {Seg, _RelSeq} = seq_id_to_seg_and_rel_seq_id(SeqId), @@ -448,28 +449,23 @@ terminate(State = #qistate { journal_handle = JournalHdl, end, SegmentCounts = segment_fold( - fun (Seg, #segment { handle = Hdl, unacked = UnackedCount }, - SegmentCountsAcc) -> - ok = case Hdl of - undefined -> ok; - _ -> file_handle_cache:close(Hdl) - end, + fun (Seg, #segment { unacked = UnackedCount }, SegmentCountsAcc) -> [{Seg, UnackedCount} | SegmentCountsAcc] end, [], Segments), {SegmentCounts, State #qistate { journal_handle = undefined, segments = undefined }}. -recover_segment(ContainsCheckFun, CleanShutdown, Segment) -> - {SegEntries, UnackedCount, Segment1} = load_segment(false, Segment), - #segment { journal_entries = JEntries } = Segment1, +recover_segment(ContainsCheckFun, CleanShutdown, + Segment = #segment { journal_entries = JEntries }) -> + {SegEntries, UnackedCount} = load_segment(false, Segment), {SegEntries1, UnackedCountDelta} = segment_plus_journal(SegEntries, JEntries), array:sparse_foldl( - fun (RelSeq, {{Guid, _IsPersistent}, Del, no_ack}, Segment2) -> + fun (RelSeq, {{Guid, _IsPersistent}, Del, no_ack}, Segment1) -> recover_message(ContainsCheckFun(Guid), CleanShutdown, - Del, RelSeq, Segment2) + Del, RelSeq, Segment1) end, - Segment1 #segment { unacked = UnackedCount + UnackedCountDelta }, + Segment #segment { unacked = UnackedCount + UnackedCountDelta }, SegEntries1). recover_message( true, true, _Del, _RelSeq, Segment) -> @@ -578,8 +574,11 @@ maybe_flush_journal(State) -> flush_journal(State = #qistate { segments = Segments }) -> Segments1 = segment_fold( - fun (_Seg, #segment { unacked = 0 } = Segment, SegmentsN) -> - ok = delete_segment(Segment), + fun (_Seg, #segment { unacked = 0, path = Path }, SegmentsN) -> + case filelib:is_file(Path) of + true -> ok = file:delete(Path); + false -> ok + end, SegmentsN; (_Seg, #segment {} = Segment, SegmentsN) -> segment_store(append_journal_to_segment(Segment), SegmentsN) @@ -589,21 +588,21 @@ flush_journal(State = #qistate { segments = Segments }) -> ok = file_handle_cache:clear(JournalHdl), State1 #qistate { dirty_count = 0 }. -append_journal_to_segment(#segment { journal_entries = JEntries } = Segment) -> +append_journal_to_segment(#segment { journal_entries = JEntries, + path = Path } = Segment) -> case array:sparse_size(JEntries) of 0 -> Segment; - _ -> {Hdl, Segment1} = get_segment_handle(Segment), + _ -> {ok, Hdl} = file_handle_cache:open(Path, [write | ?READ_MODE], + [{write_buffer, infinity}]), array:sparse_foldl(fun write_entry_to_segment/3, Hdl, JEntries), - ok = file_handle_cache:sync(Hdl), - Segment1 #segment { journal_entries = array_new() } + file_handle_cache:close(Hdl), + Segment #segment { journal_entries = array_new() } end. get_journal_handle(State = #qistate { journal_handle = undefined, dir = Dir }) -> Path = filename:join(Dir, ?JOURNAL_FILENAME), - {ok, Hdl} = file_handle_cache:open(Path, - [binary, raw, read, write, - {read_ahead, ?SEGMENT_TOTAL_SIZE}], + {ok, Hdl} = file_handle_cache:open(Path, [write | ?READ_MODE], [{write_buffer, infinity}]), {Hdl, State #qistate { journal_handle = Hdl }}; get_journal_handle(State = #qistate { journal_handle = Hdl }) -> @@ -627,14 +626,13 @@ recover_journal(State) -> %% We want to keep ack'd entries in so that we can %% remove them if duplicates are in the journal. The %% counts here are purely from the segment itself. - {SegEntries, UnackedCountInSeg, Segment1} = - load_segment(true, Segment), + {SegEntries, UnackedCountInSeg} = load_segment(true, Segment), {JEntries1, UnackedCountDuplicates} = journal_minus_segment(JEntries, SegEntries), - Segment1 #segment { journal_entries = JEntries1, - unacked = (UnackedCountInJournal + - UnackedCountInSeg - - UnackedCountDuplicates) } + Segment #segment { journal_entries = JEntries1, + unacked = (UnackedCountInJournal + + UnackedCountInSeg - + UnackedCountDuplicates) } end, Segments), State1 #qistate { segments = Segments1 }. @@ -693,31 +691,13 @@ all_segment_nums(#qistate { dir = Dir, segments = Segments }) -> end, sets:from_list(segment_fetch_keys(Segments)), filelib:wildcard("*" ++ ?SEGMENT_EXTENSION, Dir)))). -delete_segment(#segment { handle = undefined }) -> - ok; -delete_segment(#segment { handle = Hdl }) -> - ok = file_handle_cache:delete(Hdl). - -get_segment_handle(Segment = #segment { handle = undefined, path = Path }) -> - {ok, Hdl} = file_handle_cache:open(Path, - [binary, raw, read, write, - {read_ahead, ?SEGMENT_TOTAL_SIZE}], - [{write_buffer, infinity}]), - {Hdl, Segment #segment { handle = Hdl }}; -get_segment_handle(Segment = #segment { handle = Hdl }) -> - {Hdl, Segment}. - -segment_new(Seg, Dir) -> - #segment { unacked = 0, - handle = undefined, - journal_entries = array_new(), - path = seg_num_to_path(Dir, Seg), - num = Seg }. - segment_find_or_new(Seg, Dir, Segments) -> case segment_find(Seg, Segments) of - error -> segment_new(Seg, Dir); - {ok, Segment} -> Segment + {ok, Segment} -> Segment; + error -> #segment { unacked = 0, + journal_entries = array_new(), + path = seg_num_to_path(Dir, Seg), + num = Seg } end. segment_find(Seg, {_Segments, [Segment = #segment { num = Seg } |_]}) -> @@ -793,18 +773,14 @@ write_entry_to_segment(RelSeq, {Pub, Del, Ack}, Hdl) -> %% Does not do any combining with the journal at all. The PubCount %% that comes back is the number of publishes in the segment. The %% number of unacked msgs is PubCount - AckCount. -load_segment(KeepAcked, Segment = #segment { path = Path, handle = SegHdl }) -> - SegmentExists = case SegHdl of - undefined -> filelib:is_file(Path); - _ -> true - end, - case SegmentExists of - false -> {array_new(), 0, Segment}; - true -> {Hdl, Segment1} = get_segment_handle(Segment), +load_segment(KeepAcked, #segment { path = Path }) -> + case filelib:is_file(Path) of + false -> {array_new(), 0}; + true -> {ok, Hdl} = file_handle_cache:open(Path, ?READ_MODE, []), {ok, 0} = file_handle_cache:position(Hdl, bof), - {SegEntries, UnackedCount} = - load_segment_entries(KeepAcked, Hdl, array_new(), 0), - {SegEntries, UnackedCount, Segment1} + Res = load_segment_entries(KeepAcked, Hdl, array_new(), 0), + file_handle_cache:close(Hdl), + Res end. load_segment_entries(KeepAcked, Hdl, SegEntries, UnackedCount) -> |
