summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@lshift.net>2010-05-18 15:40:24 +0100
committerMatthias Radestock <matthias@lshift.net>2010-05-18 15:40:24 +0100
commitfd10acefa0e9ac776bdd13b2da191bb16864dc3a (patch)
tree6fad9cddc7f5dbbe3ceef6f64939d258ecd4558a
parentd95c60baffae89d962f33fa0353e48261e6a229d (diff)
downloadrabbitmq-server-git-fd10acefa0e9ac776bdd13b2da191bb16864dc3a.tar.gz
take the journal into consideration in recover_segment
which removes the need for an initial flush
-rw-r--r--src/rabbit_queue_index.erl95
1 files changed, 51 insertions, 44 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 1f3ce0a805..93fd3759cc 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -213,28 +213,21 @@ init(Name, MsgStoreRecovered, ContainsCheckFun) ->
{error, _} -> [];
{ok, Terms1} -> Terms1
end,
- %% 1. Load the journal completely. This will also load segments
- %% which have entries in the journal and remove duplicates.
- %% The counts will correctly reflect the combination of the
- %% segment and the journal.
- State1 = load_journal(State),
- %% 2. Flush the journal. This makes life easier for everyone, as
- %% it means there won't be any publishes in the journal
- %% alone. The dirty recovery code below relies on this.
- State2 = #qistate { dir = Dir, segments = Segments } =
- flush_journal(State1),
- %% 3. Load each segment in turn and filter out messages that are
- %% not in the msg_store, by adding acks to the journal. These
- %% acks only go to the RAM journal as it doesn't matter if we
- %% lose them. Also mark delivered if not clean shutdown. Also
- %% find the number of unacked messages.
+ %% Load the journal completely. This will also load segments which
+ %% have entries in the journal and remove duplicates. The counts
+ %% will correctly reflect the combination of the segment and the
+ %% journal.
+ State1 = #qistate { dir = Dir, segments = Segments } = load_journal(State),
CleanShutdown = detect_clean_shutdown(Dir),
- %% We know the journal is empty here, so we don't need to combine
- %% with the journal, and we don't need to worry about messages
- %% that have been acked.
{Segments1, Count} =
case CleanShutdown andalso MsgStoreRecovered of
false ->
+ %% Load each segment in turn and filter out messages
+ %% that are not in the msg_store, by adding acks to
+ %% the journal. These acks only go to the RAM journal
+ %% as it doesn't matter if we lose them. Also mark
+ %% delivered if not clean shutdown. Also find the
+ %% number of unacked messages.
lists:foldl(
fun (Seg, {Segments2, CountAcc}) ->
Segment = #segment { pubs = PubCount,
@@ -244,7 +237,7 @@ init(Name, MsgStoreRecovered, ContainsCheckFun) ->
segment_find_or_new(Seg, Dir, Segments2)),
{segment_store(Segment, Segments2),
CountAcc + PubCount - AckCount}
- end, {Segments, 0}, all_segment_nums(State2));
+ end, {Segments, 0}, all_segment_nums(State1));
true ->
%% At this stage, we will only know about files that
%% were loaded during journal loading, They *will* have
@@ -266,19 +259,23 @@ init(Name, MsgStoreRecovered, ContainsCheckFun) ->
_ ->
SegmentsN
end
- end, Segments, all_segment_nums(State2)),
+ end, Segments, all_segment_nums(State1)),
%% the counts above include transient messages, which
%% would be the wrong thing to return
undefined}
end,
- %% flush again so we eagerly remove any segments that have become
- %% empty due to either ContainsCheckFun returning false in the
- %% non-clean recovery case or PubCount==AckCount in the clean
- %% recovery case. Since the latter doesn't go through the journal
- %% logic we we artificially set the dirty_count non zero.
- State3 = flush_journal(State2 #qistate { segments = Segments1,
+ %% Flush so we eagerly remove any segments that have become empty
+ %% due to
+ %% a) processing the journal,
+ %% b) ContainsCheckFun returning false in the non-clean
+ %% recovery case, or
+ %% c) recovering a segment with PubCount==AckCount in the clean
+ %% recovery case
+ %% Since the latter doesn't go through the journal logic we we
+ %% artificially set the dirty_count non zero.
+ State2 = flush_journal(State1 #qistate { segments = Segments1,
dirty_count = 1 }),
- {Count, Terms, State3}.
+ {Count, Terms, State2}.
terminate(Terms, State) ->
terminate(true, Terms, State).
@@ -352,6 +349,8 @@ read(Start, End, State = #qistate { segments = Segments,
Segment = segment_find_or_new(StartSeg, Dir, Segments),
{SegEntries, _PubCount, _AckCount, Segment1} = load_segment(false, Segment),
#segment { journal_entries = JEntries } = Segment1,
+ {SegEntries1, _PubCountDelta, _AckCountDelta} =
+ journal_plus_segment(JEntries, SegEntries),
{array:sparse_foldr(
fun (RelSeq, {{Guid, IsPersistent}, IsDelivered, no_ack}, Acc)
when StartRelSeq =< RelSeq andalso RelSeq < MaxRelSeq ->
@@ -359,7 +358,7 @@ read(Start, End, State = #qistate { segments = Segments,
IsPersistent, IsDelivered == del} | Acc ];
(_RelSeq, _Value, Acc) ->
Acc
- end, [], journal_plus_segment(JEntries, SegEntries)),
+ end, [], SegEntries1),
Again,
State #qistate { segments = segment_store(Segment1, Segments) }}.
@@ -474,13 +473,17 @@ terminate(StoreShutdown, Terms, State =
recover_segment(ContainsCheckFun, CleanShutdown, Segment) ->
{SegEntries, PubCount, AckCount, Segment1} = load_segment(false, Segment),
+ #segment { journal_entries = JEntries } = Segment1,
+ {SegEntries1, PubCountDelta, AckCountDelta} =
+ journal_plus_segment(JEntries, SegEntries),
array:sparse_foldl(
- fun (RelSeq, {{Guid, _IsPersistent}, Del, no_ack}, Segment3) ->
+ fun (RelSeq, {{Guid, _IsPersistent}, Del, no_ack}, Segment2) ->
recover_message(ContainsCheckFun(Guid), CleanShutdown,
- Del, RelSeq, Segment3)
+ Del, RelSeq, Segment2)
end,
- Segment1 #segment { pubs = PubCount, acks = AckCount },
- SegEntries).
+ Segment1 #segment { pubs = PubCount + PubCountDelta,
+ acks = AckCount + AckCountDelta},
+ SegEntries1).
recover_message( true, true, _Del, _RelSeq, Segment) ->
Segment;
@@ -884,31 +887,35 @@ bool_to_int(false) -> 0.
%% queue.
journal_plus_segment(JEntries, SegEntries) ->
array:sparse_foldl(
- fun (RelSeq, JObj, SegEntriesOut) ->
+ fun (RelSeq, JObj, {SegEntriesOut, PubsAdded, AcksAdded}) ->
SegEntry = array:get(RelSeq, SegEntriesOut),
- case journal_plus_segment1(JObj, SegEntry) of
- undefined -> array:reset(RelSeq, SegEntriesOut);
- Obj -> array:set(RelSeq, Obj, SegEntriesOut)
- end
- end, SegEntries, JEntries).
+ {Obj, PubsAddedDelta, AcksAddedDelta} =
+ journal_plus_segment1(JObj, SegEntry),
+ {case Obj of
+ undefined -> array:reset(RelSeq, SegEntriesOut);
+ Obj -> array:set(RelSeq, Obj, SegEntriesOut)
+ end,
+ PubsAdded + PubsAddedDelta,
+ AcksAdded + AcksAddedDelta}
+ end, {SegEntries, 0, 0}, JEntries).
%% Here, the result is the item which we may be adding to (for items
%% only in the journal), modifying in (bits in both), or, when
%% returning 'undefined', erasing from (ack in journal, not segment)
%% the segment array.
journal_plus_segment1({?PUB, no_del, no_ack} = Obj, undefined) ->
- Obj;
+ {Obj, 1, 0};
journal_plus_segment1({?PUB, del, no_ack} = Obj, undefined) ->
- Obj;
+ {Obj, 1, 0};
journal_plus_segment1({?PUB, del, ack}, undefined) ->
- undefined;
+ {undefined, 1, 1};
journal_plus_segment1({no_pub, del, no_ack}, {?PUB = Pub, no_del, no_ack}) ->
- {Pub, del, no_ack};
+ {{Pub, del, no_ack}, 0, 0};
journal_plus_segment1({no_pub, del, ack}, {?PUB, no_del, no_ack}) ->
- undefined;
+ {undefined, 0, 1};
journal_plus_segment1({no_pub, no_del, ack}, {?PUB, del, no_ack}) ->
- undefined.
+ {undefined, 0, 1}.
%% Remove from the journal entries for a segment, items that are
%% duplicates of entries found in the segment itself. Used on start up