summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichael Klishin <michael@novemberain.com>2015-08-28 15:26:55 +0300
committerMichael Klishin <michael@novemberain.com>2015-08-28 15:26:55 +0300
commit1085e243a6df514f675b452aa96ffa23b6632ac3 (patch)
tree1808bbf73ddc9b829deedd3c947322d224fe6a09
parent06807a4ee5c4989f36d1577164a22652e71bd628 (diff)
parent38324567449b190d0c5c2d76c76daf291de38c6d (diff)
downloadrabbitmq-server-git-1085e243a6df514f675b452aa96ffa23b6632ac3.tar.gz
Merge pull request #283 from rabbitmq/rabbitmq-server-227
Converts journal entries in place.
-rw-r--r--src/rabbit_queue_index.erl135
1 files changed, 87 insertions, 48 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 9bd917ee96..adec2cf2e5 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -179,7 +179,8 @@
max_journal_entries, on_sync, on_sync_msg,
unconfirmed, unconfirmed_msg}).
--record(segment, {num, path, journal_entries, unacked}).
+-record(segment, {num, path, journal_entries,
+ entries_to_segment, unacked}).
-include("rabbit.hrl").
@@ -194,10 +195,11 @@
-type(hdl() :: ('undefined' | any())).
-type(segment() :: ('undefined' |
- #segment { num :: non_neg_integer(),
- path :: file:filename(),
- journal_entries :: array:array(),
- unacked :: non_neg_integer()
+ #segment { num :: non_neg_integer(),
+ path :: file:filename(),
+ journal_entries :: array:array(),
+ entries_to_segment :: array:array(),
+ unacked :: non_neg_integer()
})).
-type(seq_id() :: integer()).
-type(seg_dict() :: {dict:dict(), [segment()]}).
@@ -650,30 +652,46 @@ add_to_journal(SeqId, Action, State = #qistate { dirty_count = DCount,
add_to_journal(RelSeq, Action,
Segment = #segment { journal_entries = JEntries,
+ entries_to_segment = EToSeg,
unacked = UnackedCount }) ->
+
+ {Fun, Entry} = action_to_entry(RelSeq, Action, JEntries),
+
+ {JEntries1, EToSeg1} =
+ case Fun of
+ set ->
+ {array:set(RelSeq, Entry, JEntries),
+ array:set(RelSeq, entry_to_segment(RelSeq, Entry, []),
+ EToSeg)};
+ reset ->
+ {array:reset(RelSeq, JEntries),
+ array:reset(RelSeq, EToSeg)}
+ end,
+
Segment #segment {
- journal_entries = add_to_journal(RelSeq, Action, JEntries),
+ journal_entries = JEntries1,
+ entries_to_segment = EToSeg1,
unacked = UnackedCount + case Action of
?PUB -> +1;
del -> 0;
ack -> -1
- end};
+ end}.
-add_to_journal(RelSeq, Action, JEntries) ->
+action_to_entry(RelSeq, Action, JEntries) ->
case array:get(RelSeq, JEntries) of
undefined ->
- array:set(RelSeq,
- case Action of
- ?PUB -> {Action, no_del, no_ack};
- del -> {no_pub, del, no_ack};
- ack -> {no_pub, no_del, ack}
- end, JEntries);
+ {set,
+ case Action of
+ ?PUB -> {Action, no_del, no_ack};
+ del -> {no_pub, del, no_ack};
+ ack -> {no_pub, no_del, ack}
+ end};
({Pub, no_del, no_ack}) when Action == del ->
- array:set(RelSeq, {Pub, del, no_ack}, JEntries);
+ {set, {Pub, del, no_ack}};
({no_pub, del, no_ack}) when Action == ack ->
- array:set(RelSeq, {no_pub, del, ack}, JEntries);
+ {set, {no_pub, del, ack}};
({?PUB, del, no_ack}) when Action == ack ->
- array:reset(RelSeq, JEntries)
+ {reset, none}
end.
maybe_flush_journal(State) ->
@@ -704,18 +722,23 @@ flush_journal(State = #qistate { segments = Segments }) ->
notify_sync(State1 #qistate { dirty_count = 0 }).
append_journal_to_segment(#segment { journal_entries = JEntries,
+ entries_to_segment = EToSeg,
path = Path } = Segment) ->
case array:sparse_size(JEntries) of
0 -> Segment;
- _ -> Seg = array:sparse_foldr(
- fun entry_to_segment/3, [], JEntries),
- file_handle_cache_stats:update(queue_index_write),
-
- {ok, Hdl} = file_handle_cache:open(Path, ?WRITE_MODE,
- [{write_buffer, infinity}]),
- file_handle_cache:append(Hdl, Seg),
- ok = file_handle_cache:close(Hdl),
- Segment #segment { journal_entries = array_new() }
+ _ ->
+ file_handle_cache_stats:update(queue_index_write),
+
+ {ok, Hdl} = file_handle_cache:open(Path, ?WRITE_MODE,
+ [{write_buffer, infinity}]),
+ %% the file_handle_cache also does a list reverse, so this
+ %% might not be required here, but before we were doing a
+ %% sparse_foldr, a lists:reverse/1 seems to be the correct
+ %% thing to do for now.
+ file_handle_cache:append(Hdl, lists:reverse(array:to_list(EToSeg))),
+ ok = file_handle_cache:close(Hdl),
+ Segment #segment { journal_entries = array_new(),
+ entries_to_segment = array_new([]) }
end.
get_journal_handle(State = #qistate { journal_handle = undefined,
@@ -748,14 +771,16 @@ recover_journal(State) ->
Segments1 =
segment_map(
fun (Segment = #segment { journal_entries = JEntries,
+ entries_to_segment = EToSeg,
unacked = UnackedCountInJournal }) ->
%% We want to keep ack'd entries in so that we can
%% remove them if duplicates are in the journal. The
%% counts here are purely from the segment itself.
{SegEntries, UnackedCountInSeg} = load_segment(true, Segment),
- {JEntries1, UnackedCountDuplicates} =
- journal_minus_segment(JEntries, SegEntries),
+ {JEntries1, EToSeg1, UnackedCountDuplicates} =
+ journal_minus_segment(JEntries, EToSeg, SegEntries),
Segment #segment { journal_entries = JEntries1,
+ entries_to_segment = EToSeg1,
unacked = (UnackedCountInJournal +
UnackedCountInSeg -
UnackedCountDuplicates) }
@@ -842,10 +867,11 @@ segment_find_or_new(Seg, Dir, Segments) ->
{ok, Segment} -> Segment;
error -> SegName = integer_to_list(Seg) ++ ?SEGMENT_EXTENSION,
Path = filename:join(Dir, SegName),
- #segment { num = Seg,
- path = Path,
- journal_entries = array_new(),
- unacked = 0 }
+ #segment { num = Seg,
+ path = Path,
+ journal_entries = array_new(),
+ entries_to_segment = array_new([]),
+ unacked = 0 }
end.
segment_find(Seg, {_Segments, [Segment = #segment { num = Seg } |_]}) ->
@@ -885,20 +911,20 @@ segment_nums({Segments, CachedSegments}) ->
segments_new() ->
{dict:new(), []}.
-entry_to_segment(_RelSeq, {?PUB, del, ack}, Buf) ->
- Buf;
-entry_to_segment(RelSeq, {Pub, Del, Ack}, Buf) ->
+entry_to_segment(_RelSeq, {?PUB, del, ack}, Initial) ->
+ Initial;
+entry_to_segment(RelSeq, {Pub, Del, Ack}, Initial) ->
%% NB: we are assembling the segment in reverse order here, so
%% del/ack comes first.
Buf1 = case {Del, Ack} of
{no_del, no_ack} ->
- Buf;
+ Initial;
_ ->
Binary = <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS,
RelSeq:?REL_SEQ_BITS>>,
case {Del, Ack} of
- {del, ack} -> [[Binary, Binary] | Buf];
- _ -> [Binary | Buf]
+ {del, ack} -> [[Binary, Binary] | Initial];
+ _ -> [Binary | Initial]
end
end,
case Pub of
@@ -987,7 +1013,10 @@ add_segment_relseq_entry(KeepAcked, RelSeq, {SegEntries, Unacked}) ->
end.
array_new() ->
- array:new([{default, undefined}, fixed, {size, ?SEGMENT_ENTRY_COUNT}]).
+ array_new(undefined).
+
+array_new(Default) ->
+ array:new([{default, Default}, fixed, {size, ?SEGMENT_ENTRY_COUNT}]).
bool_to_int(true ) -> 1;
bool_to_int(false) -> 0.
@@ -1033,19 +1062,29 @@ segment_plus_journal1({?PUB, del, no_ack}, {no_pub, no_del, ack}) ->
%% Remove from the journal entries for a segment, items that are
%% duplicates of entries found in the segment itself. Used on start up
%% to clean up the journal.
-journal_minus_segment(JEntries, SegEntries) ->
+%%
+%% We need to update the entries_to_segment since they are just a
+%% cache of what's on the journal.
+journal_minus_segment(JEntries, EToSeg, SegEntries) ->
array:sparse_foldl(
- fun (RelSeq, JObj, {JEntriesOut, UnackedRemoved}) ->
+ fun (RelSeq, JObj, {JEntriesOut, EToSegOut, UnackedRemoved}) ->
SegEntry = array:get(RelSeq, SegEntries),
{Obj, UnackedRemovedDelta} =
journal_minus_segment1(JObj, SegEntry),
- {case Obj of
- keep -> JEntriesOut;
- undefined -> array:reset(RelSeq, JEntriesOut);
- _ -> array:set(RelSeq, Obj, JEntriesOut)
- end,
- UnackedRemoved + UnackedRemovedDelta}
- end, {JEntries, 0}, JEntries).
+ {JEntriesOut1, EToSegOut1} =
+ case Obj of
+ keep ->
+ {JEntriesOut, EToSegOut};
+ undefined ->
+ {array:reset(RelSeq, JEntriesOut),
+ array:reset(RelSeq, EToSegOut)};
+ _ ->
+ {array:set(RelSeq, Obj, JEntriesOut),
+ array:set(RelSeq, entry_to_segment(RelSeq, Obj, []),
+ EToSegOut)}
+ end,
+ {JEntriesOut1, EToSegOut1, UnackedRemoved + UnackedRemovedDelta}
+ end, {JEntries, EToSeg, 0}, JEntries).
%% Here, the result is a tuple with the first element containing the
%% item we are adding to or modifying in the (initially fresh) journal