| author    | Michael Klishin &lt;michael@novemberain.com&gt;, 2015-08-28 15:26:55 +0300 |
|-----------|--------------------------------------------------------------------|
| committer | Michael Klishin &lt;michael@novemberain.com&gt;, 2015-08-28 15:26:55 +0300 |
| commit    | 1085e243a6df514f675b452aa96ffa23b6632ac3                           |
| tree      | 1808bbf73ddc9b829deedd3c947322d224fe6a09                           |
| parent    | 06807a4ee5c4989f36d1577164a22652e71bd628                           |
| parent    | 38324567449b190d0c5c2d76c76daf291de38c6d                           |
| download  | rabbitmq-server-git-1085e243a6df514f675b452aa96ffa23b6632ac3.tar.gz |
Merge pull request #283 from rabbitmq/rabbitmq-server-227
Converts journal entries in place.
| mode | file | lines changed |
|------|------|---------------|
| -rw-r--r-- | src/rabbit_queue_index.erl | 135 |

1 file changed, 87 insertions(+), 48 deletions(-)
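In outline, the change trades flush-time work for write-time work: every journal update now also renders the entry into its on-disk segment form in a parallel array (`entries_to_segment`), so `append_journal_to_segment/1` can write that array out directly instead of re-rendering every entry with `array:sparse_foldr/3`. A minimal, self-contained sketch of the pattern follows; the module name and the `store/3`, `flush/1` and `render/1` functions are illustrative stand-ins, not code from this commit.

```erlang
-module(rendered_cache).
-export([new/0, store/3, flush/1]).

%% Same fixed sparse-array shape the queue index uses (the size is
%% illustrative; the real code uses ?SEGMENT_ENTRY_COUNT).
-define(SIZE, 16384).

new() ->
    {array:new([{default, undefined}, fixed, {size, ?SIZE}]),  %% entries
     array:new([{default, []},       fixed, {size, ?SIZE}])}.  %% rendered

%% Each write renders in place, keeping both arrays in lockstep.
store(Idx, Entry, {Entries, Rendered}) ->
    {array:set(Idx, Entry, Entries),
     array:set(Idx, render(Entry), Rendered)}.

%% Flush is now a plain dump: unset slots default to the empty
%% iolist, so they contribute nothing to the output.
flush({_Entries, Rendered}) ->
    iolist_to_binary(array:to_list(Rendered)).

%% Illustrative renderer; the real entry_to_segment/3 emits
%% segment-format binaries.
render(Entry) ->
    term_to_binary(Entry).
```

The commit applies exactly this shape: `add_to_journal/3` calls `entry_to_segment/3` on every set, and `append_journal_to_segment/1` appends `lists:reverse(array:to_list(EToSeg))`, reversed to preserve the ordering the old reverse-assembling fold produced.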
```diff
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 9bd917ee96..adec2cf2e5 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -179,7 +179,8 @@
                   max_journal_entries, on_sync, on_sync_msg,
                   unconfirmed, unconfirmed_msg}).
 
--record(segment, {num, path, journal_entries, unacked}).
+-record(segment, {num, path, journal_entries,
+                  entries_to_segment, unacked}).
 
 -include("rabbit.hrl").
 
@@ -194,10 +195,11 @@
 -type(hdl() :: ('undefined' | any())).
 -type(segment() :: ('undefined' |
-                    #segment { num             :: non_neg_integer(),
-                               path            :: file:filename(),
-                               journal_entries :: array:array(),
-                               unacked         :: non_neg_integer()
+                    #segment { num                :: non_neg_integer(),
+                               path               :: file:filename(),
+                               journal_entries    :: array:array(),
+                               entries_to_segment :: array:array(),
+                               unacked            :: non_neg_integer()
                              })).
 -type(seq_id() :: integer()).
 -type(seg_dict() :: {dict:dict(), [segment()]}).
@@ -650,30 +652,46 @@ add_to_journal(SeqId, Action, State = #qistate { dirty_count = DCount,
 add_to_journal(RelSeq, Action,
                Segment = #segment { journal_entries = JEntries,
+                                    entries_to_segment = EToSeg,
                                     unacked = UnackedCount }) ->
+
+    {Fun, Entry} = action_to_entry(RelSeq, Action, JEntries),
+
+    {JEntries1, EToSeg1} =
+        case Fun of
+            set ->
+                {array:set(RelSeq, Entry, JEntries),
+                 array:set(RelSeq, entry_to_segment(RelSeq, Entry, []),
+                           EToSeg)};
+            reset ->
+                {array:reset(RelSeq, JEntries),
+                 array:reset(RelSeq, EToSeg)}
+        end,
+
     Segment #segment {
-      journal_entries = add_to_journal(RelSeq, Action, JEntries),
+      journal_entries = JEntries1,
+      entries_to_segment = EToSeg1,
       unacked = UnackedCount + case Action of
                                    ?PUB -> +1;
                                    del  ->  0;
                                    ack  -> -1
-                               end};
+                               end}.
 
-add_to_journal(RelSeq, Action, JEntries) ->
+action_to_entry(RelSeq, Action, JEntries) ->
     case array:get(RelSeq, JEntries) of
         undefined ->
-            array:set(RelSeq,
-                      case Action of
-                          ?PUB -> {Action, no_del, no_ack};
-                          del  -> {no_pub,    del, no_ack};
-                          ack  -> {no_pub, no_del,    ack}
-                      end, JEntries);
+            {set,
+             case Action of
+                 ?PUB -> {Action, no_del, no_ack};
+                 del  -> {no_pub,    del, no_ack};
+                 ack  -> {no_pub, no_del,    ack}
+             end};
         ({Pub, no_del, no_ack}) when Action == del ->
-            array:set(RelSeq, {Pub, del, no_ack}, JEntries);
+            {set, {Pub, del, no_ack}};
         ({no_pub, del, no_ack}) when Action == ack ->
-            array:set(RelSeq, {no_pub, del, ack}, JEntries);
+            {set, {no_pub, del, ack}};
         ({?PUB, del, no_ack}) when Action == ack ->
-            array:reset(RelSeq, JEntries)
+            {reset, none}
     end.
 
 maybe_flush_journal(State) ->
@@ -704,18 +722,23 @@ flush_journal(State = #qistate { segments = Segments }) ->
     notify_sync(State1 #qistate { dirty_count = 0 }).
 
 append_journal_to_segment(#segment { journal_entries = JEntries,
+                                     entries_to_segment = EToSeg,
                                      path = Path } = Segment) ->
     case array:sparse_size(JEntries) of
         0 -> Segment;
-        _ -> Seg = array:sparse_foldr(
-                     fun entry_to_segment/3, [], JEntries),
-             file_handle_cache_stats:update(queue_index_write),
-
-             {ok, Hdl} = file_handle_cache:open(Path, ?WRITE_MODE,
-                                                [{write_buffer, infinity}]),
-             file_handle_cache:append(Hdl, Seg),
-             ok = file_handle_cache:close(Hdl),
-             Segment #segment { journal_entries = array_new() }
+        _ ->
+            file_handle_cache_stats:update(queue_index_write),
+
+            {ok, Hdl} = file_handle_cache:open(Path, ?WRITE_MODE,
+                                               [{write_buffer, infinity}]),
+            %% the file_handle_cache also does a list reverse, so this
+            %% might not be required here, but before we were doing a
+            %% sparse_foldr, a lists:reverse/1 seems to be the correct
+            %% thing to do for now.
+            file_handle_cache:append(Hdl,
+                                     lists:reverse(array:to_list(EToSeg))),
+            ok = file_handle_cache:close(Hdl),
+            Segment #segment { journal_entries = array_new(),
+                               entries_to_segment = array_new([]) }
     end.
 
 get_journal_handle(State = #qistate { journal_handle = undefined,
@@ -748,14 +771,16 @@ recover_journal(State) ->
     Segments1 =
         segment_map(
           fun (Segment = #segment { journal_entries = JEntries,
+                                    entries_to_segment = EToSeg,
                                     unacked = UnackedCountInJournal }) ->
                   %% We want to keep ack'd entries in so that we can
                   %% remove them if duplicates are in the journal. The
                   %% counts here are purely from the segment itself.
                   {SegEntries, UnackedCountInSeg} = load_segment(true, Segment),
-                  {JEntries1, UnackedCountDuplicates} =
-                      journal_minus_segment(JEntries, SegEntries),
+                  {JEntries1, EToSeg1, UnackedCountDuplicates} =
+                      journal_minus_segment(JEntries, EToSeg, SegEntries),
                   Segment #segment { journal_entries = JEntries1,
+                                     entries_to_segment = EToSeg1,
                                      unacked = (UnackedCountInJournal +
                                                 UnackedCountInSeg -
                                                 UnackedCountDuplicates) }
@@ -842,10 +867,11 @@ segment_find_or_new(Seg, Dir, Segments) ->
         {ok, Segment} -> Segment;
         error -> SegName = integer_to_list(Seg) ++ ?SEGMENT_EXTENSION,
                  Path = filename:join(Dir, SegName),
-                 #segment { num             = Seg,
-                            path            = Path,
-                            journal_entries = array_new(),
-                            unacked         = 0 }
+                 #segment { num                = Seg,
+                            path               = Path,
+                            journal_entries    = array_new(),
+                            entries_to_segment = array_new([]),
+                            unacked            = 0 }
     end.
 
 segment_find(Seg, {_Segments, [Segment = #segment { num = Seg } |_]}) ->
@@ -885,20 +911,20 @@ segment_nums({Segments, CachedSegments}) ->
 segments_new() ->
     {dict:new(), []}.
 
-entry_to_segment(_RelSeq, {?PUB, del, ack}, Buf) ->
-    Buf;
-entry_to_segment(RelSeq, {Pub, Del, Ack}, Buf) ->
+entry_to_segment(_RelSeq, {?PUB, del, ack}, Initial) ->
+    Initial;
+entry_to_segment(RelSeq, {Pub, Del, Ack}, Initial) ->
     %% NB: we are assembling the segment in reverse order here, so
     %% del/ack comes first.
     Buf1 = case {Del, Ack} of
                {no_del, no_ack} ->
-                   Buf;
+                   Initial;
                _ ->
                    Binary = <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS,
                               RelSeq:?REL_SEQ_BITS>>,
                    case {Del, Ack} of
-                       {del, ack} -> [[Binary, Binary] | Buf];
-                       _          -> [Binary | Buf]
+                       {del, ack} -> [[Binary, Binary] | Initial];
+                       _          -> [Binary | Initial]
                    end
            end,
     case Pub of
@@ -987,7 +1013,10 @@ add_segment_relseq_entry(KeepAcked, RelSeq, {SegEntries, Unacked}) ->
     end.
 
 array_new() ->
-    array:new([{default, undefined}, fixed, {size, ?SEGMENT_ENTRY_COUNT}]).
+    array_new(undefined).
+
+array_new(Default) ->
+    array:new([{default, Default}, fixed, {size, ?SEGMENT_ENTRY_COUNT}]).
 
 bool_to_int(true ) -> 1;
 bool_to_int(false) -> 0.
@@ -1033,19 +1062,29 @@ segment_plus_journal1({?PUB, del, no_ack}, {no_pub, no_del, ack}) ->
 %% Remove from the journal entries for a segment, items that are
 %% duplicates of entries found in the segment itself. Used on start up
 %% to clean up the journal.
-journal_minus_segment(JEntries, SegEntries) ->
+%%
+%% We need to update the entries_to_segment since they are just a
+%% cache of what's on the journal.
+journal_minus_segment(JEntries, EToSeg, SegEntries) ->
     array:sparse_foldl(
-      fun (RelSeq, JObj, {JEntriesOut, UnackedRemoved}) ->
+      fun (RelSeq, JObj, {JEntriesOut, EToSegOut, UnackedRemoved}) ->
              SegEntry = array:get(RelSeq, SegEntries),
              {Obj, UnackedRemovedDelta} =
                  journal_minus_segment1(JObj, SegEntry),
-             {case Obj of
-                  keep      -> JEntriesOut;
-                  undefined -> array:reset(RelSeq, JEntriesOut);
-                  _         -> array:set(RelSeq, Obj, JEntriesOut)
-              end,
-              UnackedRemoved + UnackedRemovedDelta}
-      end, {JEntries, 0}, JEntries).
+             {JEntriesOut1, EToSegOut1} =
+                 case Obj of
+                     keep ->
+                         {JEntriesOut, EToSegOut};
+                     undefined ->
+                         {array:reset(RelSeq, JEntriesOut),
+                          array:reset(RelSeq, EToSegOut)};
+                     _ ->
+                         {array:set(RelSeq, Obj, JEntriesOut),
+                          array:set(RelSeq, entry_to_segment(RelSeq, Obj, []),
+                                    EToSegOut)}
+                 end,
+             {JEntriesOut1, EToSegOut1, UnackedRemoved + UnackedRemovedDelta}
+      end, {JEntries, EToSeg, 0}, JEntries).
 
 %% Here, the result is a tuple with the first element containing the
 %% item we are adding to or modifying in the (initially fresh) journal
```
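The other half of the refactor is that the old recursive `add_to_journal/3` array clause becomes `action_to_entry/3`, which returns an instruction, `{set, Entry}` or `{reset, none}`, that the caller applies to both arrays at once, keeping `journal_entries` and `entries_to_segment` in lockstep. The transitions it encodes: a fresh slot records the single action; `del` marks a published entry delivered; `ack` marks a delivered entry acknowledged; and a fully published, delivered and acknowledged entry cancels out of the journal entirely. A hedged, standalone restatement of that state machine, with the atom `pub` standing in for the `?PUB` macro:

```erlang
-module(journal_fsm).
-export([apply_action/2]).

%% {set, NewEntry}: write NewEntry into both journal_entries and
%% entries_to_segment. {reset, none}: clear the slot in both, since a
%% pub+del+ack triple needs no segment record at all.
apply_action(Action, undefined) ->
    {set, case Action of
              pub -> {pub, no_del, no_ack};
              del -> {no_pub, del, no_ack};
              ack -> {no_pub, no_del, ack}
          end};
apply_action(del, {Pub, no_del, no_ack}) ->
    {set, {Pub, del, no_ack}};
apply_action(ack, {no_pub, del, no_ack}) ->
    {set, {no_pub, del, ack}};
apply_action(ack, {pub, del, no_ack}) ->
    {reset, none}.
```

For example, acking a delivered publish, `apply_action(ack, {pub, del, no_ack})`, returns `{reset, none}`, which is why `add_to_journal/3` can drop that slot from both arrays rather than writing anything to the segment.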

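Finally, the `entry_to_segment/3` rename from `Buf` to `Initial` reflects its new role: it is now called with `[]` once per entry to pre-render a single slot, rather than serving as the accumulator of a fold over the whole journal. The del/ack record it prepends is a small bit-syntax binary; the sketch below uses assumed field widths (the real `?REL_SEQ_ONLY_PREFIX` and `?REL_SEQ_BITS` macros are defined in rabbit_queue_index.erl, so treat these values as placeholders):

```erlang
-module(seg_record).
-export([rel_seq_only/1]).

%% Assumed widths: a 2-bit record-type prefix plus a 14-bit relative
%% sequence number (matching 16384 slots per segment).
-define(REL_SEQ_ONLY_PREFIX, 0).
-define(REL_SEQ_ONLY_PREFIX_BITS, 2).
-define(REL_SEQ_BITS, 14).

%% One del-or-ack record for the entry at RelSeq; {del, ack} entries
%% simply emit this record twice, as the diff's [[Binary, Binary] | ...]
%% branch shows.
rel_seq_only(RelSeq) ->
    <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS,
      RelSeq:?REL_SEQ_BITS>>.
```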