diff options
| author | Matthias Radestock <matthias@lshift.net> | 2010-05-19 15:33:27 +0100 |
|---|---|---|
| committer | Matthias Radestock <matthias@lshift.net> | 2010-05-19 15:33:27 +0100 |
| commit | d7b87754b8b7ec01e8d4586f754bb6a5636b9a50 (patch) | |
| tree | 57c779b58a40b0d978c151d320a0fc038887680f | |
| parent | cf0ceee78f5efbb059dbf4b3e5714ec58d621693 (diff) | |
| download | rabbitmq-server-git-d7b87754b8b7ec01e8d4586f754bb6a5636b9a50.tar.gz | |
do the minimum amount of work necessary on clean queue recovery
Only read from the journal. No segment reading. No writing.
| -rw-r--r-- | src/rabbit_queue_index.erl | 127 |
1 files changed, 60 insertions, 67 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index acd13a0640..e02483ef93 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -210,73 +210,19 @@ %%---------------------------------------------------------------------------- init(Name, MsgStoreRecovered, ContainsCheckFun) -> - State = blank_state(Name), - Terms = case read_shutdown_terms(State #qistate.dir) of + State = #qistate { dir = Dir } = blank_state(Name), + Terms = case read_shutdown_terms(Dir) of {error, _} -> []; {ok, Terms1} -> Terms1 end, - %% Load the journal completely. This will also load segments which - %% have entries in the journal and remove duplicates. The counts - %% will correctly reflect the combination of the segment and the - %% journal. - State1 = #qistate { dir = Dir, segments = Segments } = load_journal(State), CleanShutdown = detect_clean_shutdown(Dir), - {Segments1, Count} = + {Count, State1} = case CleanShutdown andalso MsgStoreRecovered of - false -> - %% Load each segment in turn and filter out messages - %% that are not in the msg_store, by adding acks to - %% the journal. These acks only go to the RAM journal - %% as it doesn't matter if we lose them. Also mark - %% delivered if not clean shutdown. Also find the - %% number of unacked messages. - lists:foldl( - fun (Seg, {Segments2, CountAcc}) -> - Segment = #segment { unacked = UnackedCount } = - recover_segment( - ContainsCheckFun, CleanShutdown, - segment_find_or_new(Seg, Dir, Segments2)), - {segment_store(Segment, Segments2), - CountAcc + UnackedCount} - end, {Segments, 0}, all_segment_nums(State1)); - true -> - %% At this stage, we will only know about files that - %% were loaded during journal loading, They *will* - %% have correct unacked counts, but for all remaining - %% segments, if they're not in the Segments store then - %% we need to add them and populate with saved data. - SegmentDictTerms = - dict:from_list(proplists:get_value(segments, Terms, [])), - {lists:foldl( - fun (Seg, SegmentsN) -> - case {segment_find(Seg, SegmentsN), - dict:find(Seg, SegmentDictTerms)} of - {error, {ok, UnackedCount}} -> - Segment = segment_new(Seg, Dir), - segment_store( - Segment #segment { - unacked = UnackedCount }, - SegmentsN); - _ -> - SegmentsN - end - end, Segments, all_segment_nums(State1)), - %% the counts above include transient messages, which - %% would be the wrong thing to return - undefined} + true -> RecoveredCounts = proplists:get_value(segments, Terms, []), + init_clean(RecoveredCounts, State); + false -> init_dirty(CleanShutdown, ContainsCheckFun, State) end, - %% Flush so we eagerly remove any segments that have become empty - %% due to - %% a) processing the journal, - %% b) ContainsCheckFun returning false in the non-clean - %% recovery case, or - %% c) recovering a segment with PubCount==AckCount in the clean - %% recovery case - %% Since the latter doesn't go through the journal logic we we - %% artificially set the dirty_count non zero. - State2 = flush_journal(State1 #qistate { segments = Segments1, - dirty_count = 1 }), - {Count, Terms, State2}. + {Count, Terms, State1}. terminate(Terms, State) -> terminate(true, Terms, State). @@ -329,7 +275,8 @@ sync(_SeqIds, State = #qistate { journal_handle = JournalHdl }) -> ok = file_handle_cache:sync(JournalHdl), State. -flush(State) -> flush_journal(State). +flush(State = #qistate { dirty_count = 0 }) -> State; +flush(State) -> flush_journal(State). read(StartEnd, StartEnd, State) -> {[], undefined, State}; @@ -445,6 +392,50 @@ read_shutdown_terms(Dir) -> store_clean_shutdown(Terms, Dir) -> rabbit_misc:write_term_file(filename:join(Dir, ?CLEAN_FILENAME), Terms). +init_clean(RecoveredCounts, State) -> + %% Load the journal. Since this is a clean recovery this (almost) + %% gets us back to where we were on shutdown. + State1 = #qistate { dir = Dir, segments = Segments } = load_journal(State), + %% The journal loading only creates records for segments touched + %% by the journal, and the counts are based on the journal entries + %% only. We need *complete* counts for *all* segments. By an + %% amazing coincidence we stored that information on shutdown. + Segments1 = + lists:foldl( + fun ({Seg, UnackedCount}, SegmentsN) -> + Segment = segment_find_or_new(Seg, Dir, SegmentsN), + segment_store(Segment #segment {unacked = UnackedCount }, + SegmentsN) + end, Segments, RecoveredCounts), + %% the counts above include transient messages, which would be the + %% wrong thing to return + {undefined, State1 # qistate { segments = Segments1 }}. + +init_dirty(CleanShutdown, ContainsCheckFun, State) -> + %% Recover the journal completely. This will also load segments + %% which have entries in the journal and remove duplicates. The + %% counts will correctly reflect the combination of the segment + %% and the journal. + State1 = #qistate { dir = Dir, segments = Segments } = + recover_journal(State), + {Segments1, Count} = + %% Load each segment in turn and filter out messages that are + %% not in the msg_store, by adding acks to the journal. These + %% acks only go to the RAM journal as it doesn't matter if we + %% lose them. Also mark delivered if not clean shutdown. Also + %% find the number of unacked messages. + lists:foldl( + fun (Seg, {Segments2, CountAcc}) -> + Segment = #segment { unacked = UnackedCount } = + recover_segment(ContainsCheckFun, CleanShutdown, + segment_find_or_new(Seg, Dir, Segments2)), + {segment_store(Segment, Segments2), CountAcc + UnackedCount} + end, {Segments, 0}, all_segment_nums(State1)), + %% Unconditionally flush since the dirty_count doesn't get updated + %% by the above foldl. + State2 = flush_journal(State1 #qistate { segments = Segments1 }), + {Count, State2}. + terminate(_StoreShutdown, _Terms, State = #qistate { segments = undefined }) -> State; terminate(StoreShutdown, Terms, State = @@ -527,7 +518,7 @@ queue_index_walker({next, Gatherer}) when is_pid(Gatherer) -> end. queue_index_walker_reader(QueueName, Gatherer) -> - State = load_journal(blank_state(QueueName)), + State = recover_journal(blank_state(QueueName)), State1 = lists:foldl( fun (Seg, State2) -> SeqId = reconstruct_seq_id(Seg, 0), @@ -585,8 +576,6 @@ maybe_flush_journal(State = #qistate { dirty_count = DCount }) maybe_flush_journal(State) -> State. -flush_journal(State = #qistate { dirty_count = 0 }) -> - State; flush_journal(State = #qistate { segments = Segments }) -> Segments1 = segment_fold( @@ -627,7 +616,11 @@ get_journal_handle(State = #qistate { journal_handle = Hdl }) -> load_journal(State) -> {JournalHdl, State1} = get_journal_handle(State), {ok, 0} = file_handle_cache:position(JournalHdl, 0), - State2 = #qistate { segments = Segments } = load_journal_entries(State1), + load_journal_entries(State1). + +%% ditto +recover_journal(State) -> + State1 = #qistate { segments = Segments } = load_journal(State), Segments1 = segment_map( fun (_Seg, Segment = #segment { journal_entries = JEntries, @@ -644,7 +637,7 @@ load_journal(State) -> UnackedCountInSeg - UnackedCountDuplicates) } end, Segments), - State2 #qistate { segments = Segments1 }. + State1 #qistate { segments = Segments1 }. load_journal_entries(State = #qistate { journal_handle = Hdl }) -> case file_handle_cache:read(Hdl, ?SEQ_BYTES) of |
