diff options
| author | Alvaro Videla <videlalvaro@gmail.com> | 2015-08-21 16:42:11 +0200 |
|---|---|---|
| committer | Alvaro Videla <videlalvaro@gmail.com> | 2015-08-24 19:41:15 +0200 |
| commit | 38324567449b190d0c5c2d76c76daf291de38c6d (patch) | |
| tree | 000e1756f06790eb062d967a5de07acd8f4b045e | |
| parent | b72d210ff7b7c4c82708e6882694199108f55364 (diff) | |
| download | rabbitmq-server-git-38324567449b190d0c5c2d76c76daf291de38c6d.tar.gz | |
Converts journal entries in place.
Instead of folding over the whole journal when we flush it to disk, we
convert each journal entry to its final on-disk format as it is added,
caching the result so the flush avoids folding over up to 65k journal
entries. The trade-off is that this approach uses more memory.
Fixes #227
| -rw-r--r-- | src/rabbit_queue_index.erl | 135 |
1 file changed, 87 insertions(+), 48 deletions(-)
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 0c7d7c230a..24fc629433 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -178,7 +178,8 @@ max_journal_entries, on_sync, on_sync_msg, unconfirmed, unconfirmed_msg}). --record(segment, {num, path, journal_entries, unacked}). +-record(segment, {num, path, journal_entries, + entries_to_segment, unacked}). -include("rabbit.hrl"). @@ -193,10 +194,11 @@ -type(hdl() :: ('undefined' | any())). -type(segment() :: ('undefined' | - #segment { num :: non_neg_integer(), - path :: file:filename(), - journal_entries :: array:array(), - unacked :: non_neg_integer() + #segment { num :: non_neg_integer(), + path :: file:filename(), + journal_entries :: array:array(), + entries_to_segment :: array:array(), + unacked :: non_neg_integer() })). -type(seq_id() :: integer()). -type(seg_dict() :: {dict:dict(), [segment()]}). @@ -649,30 +651,46 @@ add_to_journal(SeqId, Action, State = #qistate { dirty_count = DCount, add_to_journal(RelSeq, Action, Segment = #segment { journal_entries = JEntries, + entries_to_segment = EToSeg, unacked = UnackedCount }) -> + + {Fun, Entry} = action_to_entry(RelSeq, Action, JEntries), + + {JEntries1, EToSeg1} = + case Fun of + set -> + {array:set(RelSeq, Entry, JEntries), + array:set(RelSeq, entry_to_segment(RelSeq, Entry, []), + EToSeg)}; + reset -> + {array:reset(RelSeq, JEntries), + array:reset(RelSeq, EToSeg)} + end, + Segment #segment { - journal_entries = add_to_journal(RelSeq, Action, JEntries), + journal_entries = JEntries1, + entries_to_segment = EToSeg1, unacked = UnackedCount + case Action of ?PUB -> +1; del -> 0; ack -> -1 - end}; + end}. 
-add_to_journal(RelSeq, Action, JEntries) -> +action_to_entry(RelSeq, Action, JEntries) -> case array:get(RelSeq, JEntries) of undefined -> - array:set(RelSeq, - case Action of - ?PUB -> {Action, no_del, no_ack}; - del -> {no_pub, del, no_ack}; - ack -> {no_pub, no_del, ack} - end, JEntries); + {set, + case Action of + ?PUB -> {Action, no_del, no_ack}; + del -> {no_pub, del, no_ack}; + ack -> {no_pub, no_del, ack} + end}; ({Pub, no_del, no_ack}) when Action == del -> - array:set(RelSeq, {Pub, del, no_ack}, JEntries); + {set, {Pub, del, no_ack}}; ({no_pub, del, no_ack}) when Action == ack -> - array:set(RelSeq, {no_pub, del, ack}, JEntries); + {set, {no_pub, del, ack}}; ({?PUB, del, no_ack}) when Action == ack -> - array:reset(RelSeq, JEntries) + {reset, none} end. maybe_flush_journal(State) -> @@ -703,18 +721,23 @@ flush_journal(State = #qistate { segments = Segments }) -> notify_sync(State1 #qistate { dirty_count = 0 }). append_journal_to_segment(#segment { journal_entries = JEntries, + entries_to_segment = EToSeg, path = Path } = Segment) -> case array:sparse_size(JEntries) of 0 -> Segment; - _ -> Seg = array:sparse_foldr( - fun entry_to_segment/3, [], JEntries), - file_handle_cache_stats:update(queue_index_write), - - {ok, Hdl} = file_handle_cache:open(Path, ?WRITE_MODE, - [{write_buffer, infinity}]), - file_handle_cache:append(Hdl, Seg), - ok = file_handle_cache:close(Hdl), - Segment #segment { journal_entries = array_new() } + _ -> + file_handle_cache_stats:update(queue_index_write), + + {ok, Hdl} = file_handle_cache:open(Path, ?WRITE_MODE, + [{write_buffer, infinity}]), + %% the file_handle_cache also does a list reverse, so this + %% might not be required here, but before we were doing a + %% sparse_foldr, a lists:reverse/1 seems to be the correct + %% thing to do for now. 
+ file_handle_cache:append(Hdl, lists:reverse(array:to_list(EToSeg))), + ok = file_handle_cache:close(Hdl), + Segment #segment { journal_entries = array_new(), + entries_to_segment = array_new([]) } end. get_journal_handle(State = #qistate { journal_handle = undefined, @@ -747,14 +770,16 @@ recover_journal(State) -> Segments1 = segment_map( fun (Segment = #segment { journal_entries = JEntries, + entries_to_segment = EToSeg, unacked = UnackedCountInJournal }) -> %% We want to keep ack'd entries in so that we can %% remove them if duplicates are in the journal. The %% counts here are purely from the segment itself. {SegEntries, UnackedCountInSeg} = load_segment(true, Segment), - {JEntries1, UnackedCountDuplicates} = - journal_minus_segment(JEntries, SegEntries), + {JEntries1, EToSeg1, UnackedCountDuplicates} = + journal_minus_segment(JEntries, EToSeg, SegEntries), Segment #segment { journal_entries = JEntries1, + entries_to_segment = EToSeg1, unacked = (UnackedCountInJournal + UnackedCountInSeg - UnackedCountDuplicates) } @@ -841,10 +866,11 @@ segment_find_or_new(Seg, Dir, Segments) -> {ok, Segment} -> Segment; error -> SegName = integer_to_list(Seg) ++ ?SEGMENT_EXTENSION, Path = filename:join(Dir, SegName), - #segment { num = Seg, - path = Path, - journal_entries = array_new(), - unacked = 0 } + #segment { num = Seg, + path = Path, + journal_entries = array_new(), + entries_to_segment = array_new([]), + unacked = 0 } end. segment_find(Seg, {_Segments, [Segment = #segment { num = Seg } |_]}) -> @@ -884,20 +910,20 @@ segment_nums({Segments, CachedSegments}) -> segments_new() -> {dict:new(), []}. -entry_to_segment(_RelSeq, {?PUB, del, ack}, Buf) -> - Buf; -entry_to_segment(RelSeq, {Pub, Del, Ack}, Buf) -> +entry_to_segment(_RelSeq, {?PUB, del, ack}, Initial) -> + Initial; +entry_to_segment(RelSeq, {Pub, Del, Ack}, Initial) -> %% NB: we are assembling the segment in reverse order here, so %% del/ack comes first. 
Buf1 = case {Del, Ack} of {no_del, no_ack} -> - Buf; + Initial; _ -> Binary = <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, RelSeq:?REL_SEQ_BITS>>, case {Del, Ack} of - {del, ack} -> [[Binary, Binary] | Buf]; - _ -> [Binary | Buf] + {del, ack} -> [[Binary, Binary] | Initial]; + _ -> [Binary | Initial] end end, case Pub of @@ -986,7 +1012,10 @@ add_segment_relseq_entry(KeepAcked, RelSeq, {SegEntries, Unacked}) -> end. array_new() -> - array:new([{default, undefined}, fixed, {size, ?SEGMENT_ENTRY_COUNT}]). + array_new(undefined). + +array_new(Default) -> + array:new([{default, Default}, fixed, {size, ?SEGMENT_ENTRY_COUNT}]). bool_to_int(true ) -> 1; bool_to_int(false) -> 0. @@ -1032,19 +1061,29 @@ segment_plus_journal1({?PUB, del, no_ack}, {no_pub, no_del, ack}) -> %% Remove from the journal entries for a segment, items that are %% duplicates of entries found in the segment itself. Used on start up %% to clean up the journal. -journal_minus_segment(JEntries, SegEntries) -> +%% +%% We need to update the entries_to_segment since they are just a +%% cache of what's on the journal. +journal_minus_segment(JEntries, EToSeg, SegEntries) -> array:sparse_foldl( - fun (RelSeq, JObj, {JEntriesOut, UnackedRemoved}) -> + fun (RelSeq, JObj, {JEntriesOut, EToSegOut, UnackedRemoved}) -> SegEntry = array:get(RelSeq, SegEntries), {Obj, UnackedRemovedDelta} = journal_minus_segment1(JObj, SegEntry), - {case Obj of - keep -> JEntriesOut; - undefined -> array:reset(RelSeq, JEntriesOut); - _ -> array:set(RelSeq, Obj, JEntriesOut) - end, - UnackedRemoved + UnackedRemovedDelta} - end, {JEntries, 0}, JEntries). 
+ {JEntriesOut1, EToSegOut1} = + case Obj of + keep -> + {JEntriesOut, EToSegOut}; + undefined -> + {array:reset(RelSeq, JEntriesOut), + array:reset(RelSeq, EToSegOut)}; + _ -> + {array:set(RelSeq, Obj, JEntriesOut), + array:set(RelSeq, entry_to_segment(RelSeq, Obj, []), + EToSegOut)} + end, + {JEntriesOut1, EToSegOut1, UnackedRemoved + UnackedRemovedDelta} + end, {JEntries, EToSeg, 0}, JEntries). %% Here, the result is a tuple with the first element containing the %% item we are adding to or modifying in the (initially fresh) journal |
