diff options
| author | Matthias Radestock <matthias@lshift.net> | 2010-05-18 15:40:24 +0100 |
|---|---|---|
| committer | Matthias Radestock <matthias@lshift.net> | 2010-05-18 15:40:24 +0100 |
| commit | fd10acefa0e9ac776bdd13b2da191bb16864dc3a (patch) | |
| tree | 6fad9cddc7f5dbbe3ceef6f64939d258ecd4558a | |
| parent | d95c60baffae89d962f33fa0353e48261e6a229d (diff) | |
| download | rabbitmq-server-git-fd10acefa0e9ac776bdd13b2da191bb16864dc3a.tar.gz | |
take the journal into consideration in recover_segment
which removes the need for an initial flush
| -rw-r--r-- | src/rabbit_queue_index.erl | 95 |
1 files changed, 51 insertions, 44 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 1f3ce0a805..93fd3759cc 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -213,28 +213,21 @@ init(Name, MsgStoreRecovered, ContainsCheckFun) -> {error, _} -> []; {ok, Terms1} -> Terms1 end, - %% 1. Load the journal completely. This will also load segments - %% which have entries in the journal and remove duplicates. - %% The counts will correctly reflect the combination of the - %% segment and the journal. - State1 = load_journal(State), - %% 2. Flush the journal. This makes life easier for everyone, as - %% it means there won't be any publishes in the journal - %% alone. The dirty recovery code below relies on this. - State2 = #qistate { dir = Dir, segments = Segments } = - flush_journal(State1), - %% 3. Load each segment in turn and filter out messages that are - %% not in the msg_store, by adding acks to the journal. These - %% acks only go to the RAM journal as it doesn't matter if we - %% lose them. Also mark delivered if not clean shutdown. Also - %% find the number of unacked messages. + %% Load the journal completely. This will also load segments which + %% have entries in the journal and remove duplicates. The counts + %% will correctly reflect the combination of the segment and the + %% journal. + State1 = #qistate { dir = Dir, segments = Segments } = load_journal(State), CleanShutdown = detect_clean_shutdown(Dir), - %% We know the journal is empty here, so we don't need to combine - %% with the journal, and we don't need to worry about messages - %% that have been acked. {Segments1, Count} = case CleanShutdown andalso MsgStoreRecovered of false -> + %% Load each segment in turn and filter out messages + %% that are not in the msg_store, by adding acks to + %% the journal. These acks only go to the RAM journal + %% as it doesn't matter if we lose them. Also mark + %% delivered if not clean shutdown. Also find the + %% number of unacked messages. lists:foldl( fun (Seg, {Segments2, CountAcc}) -> Segment = #segment { pubs = PubCount, @@ -244,7 +237,7 @@ init(Name, MsgStoreRecovered, ContainsCheckFun) -> segment_find_or_new(Seg, Dir, Segments2)), {segment_store(Segment, Segments2), CountAcc + PubCount - AckCount} - end, {Segments, 0}, all_segment_nums(State2)); + end, {Segments, 0}, all_segment_nums(State1)); true -> %% At this stage, we will only know about files that %% were loaded during journal loading, They *will* have @@ -266,19 +259,23 @@ init(Name, MsgStoreRecovered, ContainsCheckFun) -> _ -> SegmentsN end - end, Segments, all_segment_nums(State2)), + end, Segments, all_segment_nums(State1)), %% the counts above include transient messages, which %% would be the wrong thing to return undefined} end, - %% flush again so we eagerly remove any segments that have become - %% empty due to either ContainsCheckFun returning false in the - %% non-clean recovery case or PubCount==AckCount in the clean - %% recovery case. Since the latter doesn't go through the journal - %% logic we we artificially set the dirty_count non zero. - State3 = flush_journal(State2 #qistate { segments = Segments1, + %% Flush so we eagerly remove any segments that have become empty + %% due to + %% a) processing the journal, + %% b) ContainsCheckFun returning false in the non-clean + %% recovery case, or + %% c) recovering a segment with PubCount==AckCount in the clean + %% recovery case + %% Since the latter doesn't go through the journal logic we we + %% artificially set the dirty_count non zero. + State2 = flush_journal(State1 #qistate { segments = Segments1, dirty_count = 1 }), - {Count, Terms, State3}. + {Count, Terms, State2}. terminate(Terms, State) -> terminate(true, Terms, State). @@ -352,6 +349,8 @@ read(Start, End, State = #qistate { segments = Segments, Segment = segment_find_or_new(StartSeg, Dir, Segments), {SegEntries, _PubCount, _AckCount, Segment1} = load_segment(false, Segment), #segment { journal_entries = JEntries } = Segment1, + {SegEntries1, _PubCountDelta, _AckCountDelta} = + journal_plus_segment(JEntries, SegEntries), {array:sparse_foldr( fun (RelSeq, {{Guid, IsPersistent}, IsDelivered, no_ack}, Acc) when StartRelSeq =< RelSeq andalso RelSeq < MaxRelSeq -> @@ -359,7 +358,7 @@ read(Start, End, State = #qistate { segments = Segments, IsPersistent, IsDelivered == del} | Acc ]; (_RelSeq, _Value, Acc) -> Acc - end, [], journal_plus_segment(JEntries, SegEntries)), + end, [], SegEntries1), Again, State #qistate { segments = segment_store(Segment1, Segments) }}. @@ -474,13 +473,17 @@ terminate(StoreShutdown, Terms, State = recover_segment(ContainsCheckFun, CleanShutdown, Segment) -> {SegEntries, PubCount, AckCount, Segment1} = load_segment(false, Segment), + #segment { journal_entries = JEntries } = Segment1, + {SegEntries1, PubCountDelta, AckCountDelta} = + journal_plus_segment(JEntries, SegEntries), array:sparse_foldl( - fun (RelSeq, {{Guid, _IsPersistent}, Del, no_ack}, Segment3) -> + fun (RelSeq, {{Guid, _IsPersistent}, Del, no_ack}, Segment2) -> recover_message(ContainsCheckFun(Guid), CleanShutdown, - Del, RelSeq, Segment3) + Del, RelSeq, Segment2) end, - Segment1 #segment { pubs = PubCount, acks = AckCount }, - SegEntries). + Segment1 #segment { pubs = PubCount + PubCountDelta, + acks = AckCount + AckCountDelta}, + SegEntries1). recover_message( true, true, _Del, _RelSeq, Segment) -> Segment; @@ -884,31 +887,35 @@ bool_to_int(false) -> 0. %% queue. journal_plus_segment(JEntries, SegEntries) -> array:sparse_foldl( - fun (RelSeq, JObj, SegEntriesOut) -> + fun (RelSeq, JObj, {SegEntriesOut, PubsAdded, AcksAdded}) -> SegEntry = array:get(RelSeq, SegEntriesOut), - case journal_plus_segment1(JObj, SegEntry) of - undefined -> array:reset(RelSeq, SegEntriesOut); - Obj -> array:set(RelSeq, Obj, SegEntriesOut) - end - end, SegEntries, JEntries). + {Obj, PubsAddedDelta, AcksAddedDelta} = + journal_plus_segment1(JObj, SegEntry), + {case Obj of + undefined -> array:reset(RelSeq, SegEntriesOut); + Obj -> array:set(RelSeq, Obj, SegEntriesOut) + end, + PubsAdded + PubsAddedDelta, + AcksAdded + AcksAddedDelta} + end, {SegEntries, 0, 0}, JEntries). %% Here, the result is the item which we may be adding to (for items %% only in the journal), modifying in (bits in both), or, when %% returning 'undefined', erasing from (ack in journal, not segment) %% the segment array. journal_plus_segment1({?PUB, no_del, no_ack} = Obj, undefined) -> - Obj; + {Obj, 1, 0}; journal_plus_segment1({?PUB, del, no_ack} = Obj, undefined) -> - Obj; + {Obj, 1, 0}; journal_plus_segment1({?PUB, del, ack}, undefined) -> - undefined; + {undefined, 1, 1}; journal_plus_segment1({no_pub, del, no_ack}, {?PUB = Pub, no_del, no_ack}) -> - {Pub, del, no_ack}; + {{Pub, del, no_ack}, 0, 0}; journal_plus_segment1({no_pub, del, ack}, {?PUB, no_del, no_ack}) -> - undefined; + {undefined, 0, 1}; journal_plus_segment1({no_pub, no_del, ack}, {?PUB, del, no_ack}) -> - undefined. + {undefined, 0, 1}. %% Remove from the journal entries for a segment, items that are %% duplicates of entries found in the segment itself. Used on start up |
