summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@lshift.net>2010-05-19 15:33:27 +0100
committerMatthias Radestock <matthias@lshift.net>2010-05-19 15:33:27 +0100
commitd7b87754b8b7ec01e8d4586f754bb6a5636b9a50 (patch)
tree57c779b58a40b0d978c151d320a0fc038887680f /src
parentcf0ceee78f5efbb059dbf4b3e5714ec58d621693 (diff)
downloadrabbitmq-server-git-d7b87754b8b7ec01e8d4586f754bb6a5636b9a50.tar.gz
do the minimum amount of work necessary on clean queue recovery
Only read from the journal. No segment reading. No writing.
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_queue_index.erl127
1 files changed, 60 insertions, 67 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index acd13a0640..e02483ef93 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -210,73 +210,19 @@
%%----------------------------------------------------------------------------
init(Name, MsgStoreRecovered, ContainsCheckFun) ->
- State = blank_state(Name),
- Terms = case read_shutdown_terms(State #qistate.dir) of
+ State = #qistate { dir = Dir } = blank_state(Name),
+ Terms = case read_shutdown_terms(Dir) of
{error, _} -> [];
{ok, Terms1} -> Terms1
end,
- %% Load the journal completely. This will also load segments which
- %% have entries in the journal and remove duplicates. The counts
- %% will correctly reflect the combination of the segment and the
- %% journal.
- State1 = #qistate { dir = Dir, segments = Segments } = load_journal(State),
CleanShutdown = detect_clean_shutdown(Dir),
- {Segments1, Count} =
+ {Count, State1} =
case CleanShutdown andalso MsgStoreRecovered of
- false ->
- %% Load each segment in turn and filter out messages
- %% that are not in the msg_store, by adding acks to
- %% the journal. These acks only go to the RAM journal
- %% as it doesn't matter if we lose them. Also mark
- %% delivered if not clean shutdown. Also find the
- %% number of unacked messages.
- lists:foldl(
- fun (Seg, {Segments2, CountAcc}) ->
- Segment = #segment { unacked = UnackedCount } =
- recover_segment(
- ContainsCheckFun, CleanShutdown,
- segment_find_or_new(Seg, Dir, Segments2)),
- {segment_store(Segment, Segments2),
- CountAcc + UnackedCount}
- end, {Segments, 0}, all_segment_nums(State1));
- true ->
- %% At this stage, we will only know about files that
- %% were loaded during journal loading, They *will*
- %% have correct unacked counts, but for all remaining
- %% segments, if they're not in the Segments store then
- %% we need to add them and populate with saved data.
- SegmentDictTerms =
- dict:from_list(proplists:get_value(segments, Terms, [])),
- {lists:foldl(
- fun (Seg, SegmentsN) ->
- case {segment_find(Seg, SegmentsN),
- dict:find(Seg, SegmentDictTerms)} of
- {error, {ok, UnackedCount}} ->
- Segment = segment_new(Seg, Dir),
- segment_store(
- Segment #segment {
- unacked = UnackedCount },
- SegmentsN);
- _ ->
- SegmentsN
- end
- end, Segments, all_segment_nums(State1)),
- %% the counts above include transient messages, which
- %% would be the wrong thing to return
- undefined}
+ true -> RecoveredCounts = proplists:get_value(segments, Terms, []),
+ init_clean(RecoveredCounts, State);
+ false -> init_dirty(CleanShutdown, ContainsCheckFun, State)
end,
- %% Flush so we eagerly remove any segments that have become empty
- %% due to
- %% a) processing the journal,
- %% b) ContainsCheckFun returning false in the non-clean
- %% recovery case, or
- %% c) recovering a segment with PubCount==AckCount in the clean
- %% recovery case
- %% Since the latter doesn't go through the journal logic we we
- %% artificially set the dirty_count non zero.
- State2 = flush_journal(State1 #qistate { segments = Segments1,
- dirty_count = 1 }),
- {Count, Terms, State2}.
+ {Count, Terms, State1}.
terminate(Terms, State) ->
terminate(true, Terms, State).
@@ -329,7 +275,8 @@ sync(_SeqIds, State = #qistate { journal_handle = JournalHdl }) ->
ok = file_handle_cache:sync(JournalHdl),
State.
-flush(State) -> flush_journal(State).
+flush(State = #qistate { dirty_count = 0 }) -> State;
+flush(State) -> flush_journal(State).
read(StartEnd, StartEnd, State) ->
{[], undefined, State};
@@ -445,6 +392,50 @@ read_shutdown_terms(Dir) ->
store_clean_shutdown(Terms, Dir) ->
rabbit_misc:write_term_file(filename:join(Dir, ?CLEAN_FILENAME), Terms).
+init_clean(RecoveredCounts, State) ->
+ %% Load the journal. Since this is a clean recovery this (almost)
+ %% gets us back to where we were on shutdown.
+ State1 = #qistate { dir = Dir, segments = Segments } = load_journal(State),
+ %% The journal loading only creates records for segments touched
+ %% by the journal, and the counts are based on the journal entries
+ %% only. We need *complete* counts for *all* segments. By an
+ %% amazing coincidence we stored that information on shutdown.
+ Segments1 =
+ lists:foldl(
+ fun ({Seg, UnackedCount}, SegmentsN) ->
+ Segment = segment_find_or_new(Seg, Dir, SegmentsN),
+ segment_store(Segment #segment {unacked = UnackedCount },
+ SegmentsN)
+ end, Segments, RecoveredCounts),
+ %% the counts above include transient messages, which would be the
+ %% wrong thing to return
+ {undefined, State1 # qistate { segments = Segments1 }}.
+
+init_dirty(CleanShutdown, ContainsCheckFun, State) ->
+ %% Recover the journal completely. This will also load segments
+ %% which have entries in the journal and remove duplicates. The
+ %% counts will correctly reflect the combination of the segment
+ %% and the journal.
+ State1 = #qistate { dir = Dir, segments = Segments } =
+ recover_journal(State),
+ {Segments1, Count} =
+ %% Load each segment in turn and filter out messages that are
+ %% not in the msg_store, by adding acks to the journal. These
+ %% acks only go to the RAM journal as it doesn't matter if we
+ %% lose them. Also mark delivered if not clean shutdown. Also
+ %% find the number of unacked messages.
+ lists:foldl(
+ fun (Seg, {Segments2, CountAcc}) ->
+ Segment = #segment { unacked = UnackedCount } =
+ recover_segment(ContainsCheckFun, CleanShutdown,
+ segment_find_or_new(Seg, Dir, Segments2)),
+ {segment_store(Segment, Segments2), CountAcc + UnackedCount}
+ end, {Segments, 0}, all_segment_nums(State1)),
+ %% Unconditionally flush since the dirty_count doesn't get updated
+ %% by the above foldl.
+ State2 = flush_journal(State1 #qistate { segments = Segments1 }),
+ {Count, State2}.
+
terminate(_StoreShutdown, _Terms, State = #qistate { segments = undefined }) ->
State;
terminate(StoreShutdown, Terms, State =
@@ -527,7 +518,7 @@ queue_index_walker({next, Gatherer}) when is_pid(Gatherer) ->
end.
queue_index_walker_reader(QueueName, Gatherer) ->
- State = load_journal(blank_state(QueueName)),
+ State = recover_journal(blank_state(QueueName)),
State1 = lists:foldl(
fun (Seg, State2) ->
SeqId = reconstruct_seq_id(Seg, 0),
@@ -585,8 +576,6 @@ maybe_flush_journal(State = #qistate { dirty_count = DCount })
maybe_flush_journal(State) ->
State.
-flush_journal(State = #qistate { dirty_count = 0 }) ->
- State;
flush_journal(State = #qistate { segments = Segments }) ->
Segments1 =
segment_fold(
@@ -627,7 +616,11 @@ get_journal_handle(State = #qistate { journal_handle = Hdl }) ->
load_journal(State) ->
{JournalHdl, State1} = get_journal_handle(State),
{ok, 0} = file_handle_cache:position(JournalHdl, 0),
- State2 = #qistate { segments = Segments } = load_journal_entries(State1),
+ load_journal_entries(State1).
+
+%% ditto
+recover_journal(State) ->
+ State1 = #qistate { segments = Segments } = load_journal(State),
Segments1 =
segment_map(
fun (_Seg, Segment = #segment { journal_entries = JEntries,
@@ -644,7 +637,7 @@ load_journal(State) ->
UnackedCountInSeg -
UnackedCountDuplicates) }
end, Segments),
- State2 #qistate { segments = Segments1 }.
+ State1 #qistate { segments = Segments1 }.
load_journal_entries(State = #qistate { journal_handle = Hdl }) ->
case file_handle_cache:read(Hdl, ?SEQ_BYTES) of