summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_msg_store.erl123
1 files changed, 22 insertions, 101 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index 46b4582d23..7fa0979c13 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -550,13 +550,15 @@ init([Server, BaseDir, ClientRefs, {MsgRefDeltaGen, MsgRefDeltaGenInit}]) ->
sort_file_names(filelib:wildcard("*" ++ ?FILE_EXTENSION, Dir)),
TmpFileNames =
sort_file_names(filelib:wildcard("*" ++ ?FILE_EXTENSION_TMP, Dir)),
- ok = recover_crashed_compactions(Dir, FileNames, TmpFileNames),
+ FoundCrashedCompactions =
+ recover_crashed_compactions(Dir, FileNames, TmpFileNames),
%% There should be no more tmp files now, so go ahead and load the
%% whole lot
Files = [filename_to_num(FileName) || FileName <- FileNames],
+ NeedsIndexBuild = FoundCrashedCompactions orelse not FileSummaryRecovered,
{Offset, State1 = #msstate { current_file = CurFile }} =
- build_index(FileSummaryRecovered, Files, State),
+ build_index(NeedsIndexBuild, Files, State),
%% read is only needed so that we can seek
{ok, CurHdl} = open_file(Dir, filenum_to_name(CurFile),
@@ -1223,103 +1225,26 @@ recover_crashed_compactions(Dir, FileNames, TmpFileNames) ->
ok = recover_crashed_compaction(
Dir, TmpFileName, NonTmpRelatedFileName)
end, TmpFileNames),
- ok.
+ [] =/= TmpFileNames.
recover_crashed_compaction(Dir, TmpFileName, NonTmpRelatedFileName) ->
- {ok, UncorruptedMessagesTmp, GuidsTmp} =
- scan_file_for_valid_messages_and_guids(Dir, TmpFileName),
- {ok, UncorruptedMessages, Guids} =
- scan_file_for_valid_messages_and_guids(Dir, NonTmpRelatedFileName),
- %% 1) It's possible that everything in the tmp file is also in the
- %% main file such that the main file is (prefix ++
- %% tmpfile). This means that compaction failed immediately
- %% prior to the final step of deleting the tmp file. Plan: just
- %% delete the tmp file
- %% 2) It's possible that everything in the tmp file is also in the
- %% main file but with holes throughout (or just somthing like
- %% main = (prefix ++ hole ++ tmpfile)). This means that
- %% compaction wrote out the tmp file successfully and then
- %% failed. Plan: just delete the tmp file and allow the
- %% compaction to eventually be triggered later
- %% 3) It's possible that everything in the tmp file is also in the
- %% main file but such that the main file does not end with tmp
- %% file (and there are valid messages in the suffix; main =
- %% (prefix ++ tmpfile[with extra holes?] ++ suffix)). This
- %% means that compaction failed as we were writing out the tmp
- %% file. Plan: just delete the tmp file and allow the
- %% compaction to eventually be triggered later
- %% 4) It's possible that there are messages in the tmp file which
- %% are not in the main file. This means that writing out the
- %% tmp file succeeded, but then we failed as we were copying
- %% them back over to the main file, after truncating the main
- %% file. As the main file has already been truncated, it should
- %% consist only of valid messages. Plan: Truncate the main file
- %% back to before any of the files in the tmp file and copy
- %% them over again
- TmpPath = form_filename(Dir, TmpFileName),
- case is_sublist(GuidsTmp, Guids) of
- true -> %% we're in case 1, 2 or 3 above. Just delete the tmp file
- %% note this also catches the case when the tmp file
- %% is empty
- ok = file:delete(TmpPath);
- false ->
- %% We're in case 4 above. We only care about the inital
- %% msgs in main file that are not in the tmp file. If
- %% there are no msgs in the tmp file then we would be in
- %% the 'true' branch of this case, so we know the
- %% lists:last call is safe.
- EldestTmpGuid = lists:last(GuidsTmp),
- {Guids1, UncorruptedMessages1}
- = case lists:splitwith(
- fun (Guid) -> Guid =/= EldestTmpGuid end, Guids) of
- {_Guids, []} -> %% no msgs from tmp in main
- {Guids, UncorruptedMessages};
- {Dropped, [EldestTmpGuid | Rest]} ->
- %% Msgs in Dropped are in tmp, so forget them.
- %% *cry*. Lists indexed from 1.
- {Rest, lists:sublist(UncorruptedMessages,
- 2 + length(Dropped),
- length(Rest))}
- end,
- %% The main file prefix should be contiguous
- {Top, Guids1} = find_contiguous_block_prefix(
- lists:reverse(UncorruptedMessages1)),
- %% we should have that none of the messages in the prefix
- %% are in the tmp file
- true = is_disjoint(Guids1, GuidsTmp),
- %% must open with read flag, otherwise will stomp over contents
- {ok, MainHdl} = open_file(Dir, NonTmpRelatedFileName,
- [read | ?WRITE_MODE]),
- %% Wipe out any rubbish at the end of the file. Remember
- %% the head of the list will be the highest entry in the
- %% file.
- [{_, TmpTopTotalSize, TmpTopOffset}|_] = UncorruptedMessagesTmp,
- TmpSize = TmpTopOffset + TmpTopTotalSize,
- %% Extend the main file as big as necessary in a single
- %% move. If we run out of disk space, this truncate could
- %% fail, but we still aren't risking losing data
- ok = truncate_and_extend_file(MainHdl, Top, Top + TmpSize),
- {ok, TmpHdl} = open_file(Dir, TmpFileName, ?READ_AHEAD_MODE),
- {ok, TmpSize} = file_handle_cache:copy(TmpHdl, MainHdl, TmpSize),
- ok = file_handle_cache:close(MainHdl),
- ok = file_handle_cache:delete(TmpHdl),
-
- {ok, _MainMessages, GuidsMain} =
- scan_file_for_valid_messages_and_guids(
- Dir, NonTmpRelatedFileName),
- %% check that everything in Guids1 is in GuidsMain
- true = is_sublist(Guids1, GuidsMain),
- %% check that everything in GuidsTmp is in GuidsMain
- true = is_sublist(GuidsTmp, GuidsMain)
- end,
+ %% Because a msg can legitimately appear multiple times in the
+ %% same file, identifying the contents of the tmp file and where
+ %% they came from is non-trivial. If we are recovering a crashed
+ %% compaction then we will be rebuilding the index, which can cope
+ %% with duplicates appearing. Thus the simplest and safest thing
+ %% to do is to append the contents of the tmp file to its main
+ %% file.
+ {ok, TmpHdl} = open_file(Dir, TmpFileName, ?READ_AHEAD_MODE),
+ {ok, MainHdl} = open_file(Dir, NonTmpRelatedFileName,
+ ?READ_AHEAD_MODE ++ ?WRITE_MODE),
+ {ok, _End} = file_handle_cache:position(MainHdl, eof),
+ Size = filelib:file_size(form_filename(Dir, TmpFileName)),
+ {ok, Size} = file_handle_cache:copy(TmpHdl, MainHdl, Size),
+ ok = file_handle_cache:close(MainHdl),
+ ok = file_handle_cache:delete(TmpHdl),
ok.
-is_sublist(SmallerL, BiggerL) ->
- lists:all(fun (Item) -> lists:member(Item, BiggerL) end, SmallerL).
-
-is_disjoint(SmallerL, BiggerL) ->
- lists:all(fun (Item) -> not lists:member(Item, BiggerL) end, SmallerL).
-
scan_file_for_valid_messages(Dir, FileName) ->
case open_file(Dir, FileName, ?READ_MODE) of
{ok, Hdl} -> Valid = rabbit_msg_file:scan(
@@ -1333,10 +1258,6 @@ scan_file_for_valid_messages(Dir, FileName) ->
{error, Reason} -> {error, {unable_to_scan_file, FileName, Reason}}
end.
-scan_file_for_valid_messages_and_guids(Dir, FileName) ->
- {ok, Messages, _FileSize} = scan_file_for_valid_messages(Dir, FileName),
- {ok, Messages, [Guid || {Guid, _TotalSize, _FileOffset} <- Messages]}.
-
%% Takes the list in *ascending* order (i.e. eldest message
%% first). This is the opposite of what scan_file_for_valid_messages
%% produces. The list of msgs that is produced is youngest first.
@@ -1351,7 +1272,7 @@ find_contiguous_block_prefix([{Guid, TotalSize, ExpectedOffset} | Tail],
find_contiguous_block_prefix([_MsgAfterGap | _Tail], ExpectedOffset, Guids) ->
{ExpectedOffset, Guids}.
-build_index(true, _Files, State = #msstate {
+build_index(false, _Files, State = #msstate {
file_summary_ets = FileSummaryEts }) ->
ets:foldl(
fun (#file_summary { valid_total_size = ValidTotalSize,
@@ -1364,7 +1285,7 @@ build_index(true, _Files, State = #msstate {
sum_file_size = SumFileSize + FileSize,
current_file = File }}
end, {0, State}, FileSummaryEts);
-build_index(false, Files, State) ->
+build_index(true, Files, State) ->
{ok, Pid} = gatherer:start_link(),
case Files of
[] -> build_index(Pid, undefined, [State #msstate.current_file], State);