diff options
| author | Matthew Sackman <matthew@rabbitmq.com> | 2010-08-15 11:03:22 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@rabbitmq.com> | 2010-08-15 11:03:22 +0100 |
| commit | 948ecda509ebbc70c135a33833189534b862526b (patch) | |
| tree | 3eb0a00cca1a1e6ef5775bfb3d553a2c8f17bde1 /src | |
| parent | 7b05055eb9ae4d2b7a0a291d03bdc04ad286e556 (diff) | |
| download | rabbitmq-server-git-948ecda509ebbc70c135a33833189534b862526b.tar.gz | |
Correct and simplify the recovery from crashed compactions
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_msg_store.erl | 123 |
1 files changed, 22 insertions, 101 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index 46b4582d23..7fa0979c13 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -550,13 +550,15 @@ init([Server, BaseDir, ClientRefs, {MsgRefDeltaGen, MsgRefDeltaGenInit}]) -> sort_file_names(filelib:wildcard("*" ++ ?FILE_EXTENSION, Dir)), TmpFileNames = sort_file_names(filelib:wildcard("*" ++ ?FILE_EXTENSION_TMP, Dir)), - ok = recover_crashed_compactions(Dir, FileNames, TmpFileNames), + FoundCrashedCompactions = + recover_crashed_compactions(Dir, FileNames, TmpFileNames), %% There should be no more tmp files now, so go ahead and load the %% whole lot Files = [filename_to_num(FileName) || FileName <- FileNames], + NeedsIndexBuild = FoundCrashedCompactions orelse not FileSummaryRecovered, {Offset, State1 = #msstate { current_file = CurFile }} = - build_index(FileSummaryRecovered, Files, State), + build_index(NeedsIndexBuild, Files, State), %% read is only needed so that we can seek {ok, CurHdl} = open_file(Dir, filenum_to_name(CurFile), @@ -1223,103 +1225,26 @@ recover_crashed_compactions(Dir, FileNames, TmpFileNames) -> ok = recover_crashed_compaction( Dir, TmpFileName, NonTmpRelatedFileName) end, TmpFileNames), - ok. + [] =/= TmpFileNames. recover_crashed_compaction(Dir, TmpFileName, NonTmpRelatedFileName) -> - {ok, UncorruptedMessagesTmp, GuidsTmp} = - scan_file_for_valid_messages_and_guids(Dir, TmpFileName), - {ok, UncorruptedMessages, Guids} = - scan_file_for_valid_messages_and_guids(Dir, NonTmpRelatedFileName), - %% 1) It's possible that everything in the tmp file is also in the - %% main file such that the main file is (prefix ++ - %% tmpfile). This means that compaction failed immediately - %% prior to the final step of deleting the tmp file. Plan: just - %% delete the tmp file - %% 2) It's possible that everything in the tmp file is also in the - %% main file but with holes throughout (or just somthing like - %% main = (prefix ++ hole ++ tmpfile)). This means that - %% compaction wrote out the tmp file successfully and then - %% failed. Plan: just delete the tmp file and allow the - %% compaction to eventually be triggered later - %% 3) It's possible that everything in the tmp file is also in the - %% main file but such that the main file does not end with tmp - %% file (and there are valid messages in the suffix; main = - %% (prefix ++ tmpfile[with extra holes?] ++ suffix)). This - %% means that compaction failed as we were writing out the tmp - %% file. Plan: just delete the tmp file and allow the - %% compaction to eventually be triggered later - %% 4) It's possible that there are messages in the tmp file which - %% are not in the main file. This means that writing out the - %% tmp file succeeded, but then we failed as we were copying - %% them back over to the main file, after truncating the main - %% file. As the main file has already been truncated, it should - %% consist only of valid messages. Plan: Truncate the main file - %% back to before any of the files in the tmp file and copy - %% them over again - TmpPath = form_filename(Dir, TmpFileName), - case is_sublist(GuidsTmp, Guids) of - true -> %% we're in case 1, 2 or 3 above. Just delete the tmp file - %% note this also catches the case when the tmp file - %% is empty - ok = file:delete(TmpPath); - false -> - %% We're in case 4 above. We only care about the inital - %% msgs in main file that are not in the tmp file. If - %% there are no msgs in the tmp file then we would be in - %% the 'true' branch of this case, so we know the - %% lists:last call is safe. - EldestTmpGuid = lists:last(GuidsTmp), - {Guids1, UncorruptedMessages1} - = case lists:splitwith( - fun (Guid) -> Guid =/= EldestTmpGuid end, Guids) of - {_Guids, []} -> %% no msgs from tmp in main - {Guids, UncorruptedMessages}; - {Dropped, [EldestTmpGuid | Rest]} -> - %% Msgs in Dropped are in tmp, so forget them. - %% *cry*. Lists indexed from 1. - {Rest, lists:sublist(UncorruptedMessages, - 2 + length(Dropped), - length(Rest))} - end, - %% The main file prefix should be contiguous - {Top, Guids1} = find_contiguous_block_prefix( - lists:reverse(UncorruptedMessages1)), - %% we should have that none of the messages in the prefix - %% are in the tmp file - true = is_disjoint(Guids1, GuidsTmp), - %% must open with read flag, otherwise will stomp over contents - {ok, MainHdl} = open_file(Dir, NonTmpRelatedFileName, - [read | ?WRITE_MODE]), - %% Wipe out any rubbish at the end of the file. Remember - %% the head of the list will be the highest entry in the - %% file. - [{_, TmpTopTotalSize, TmpTopOffset}|_] = UncorruptedMessagesTmp, - TmpSize = TmpTopOffset + TmpTopTotalSize, - %% Extend the main file as big as necessary in a single - %% move. If we run out of disk space, this truncate could - %% fail, but we still aren't risking losing data - ok = truncate_and_extend_file(MainHdl, Top, Top + TmpSize), - {ok, TmpHdl} = open_file(Dir, TmpFileName, ?READ_AHEAD_MODE), - {ok, TmpSize} = file_handle_cache:copy(TmpHdl, MainHdl, TmpSize), - ok = file_handle_cache:close(MainHdl), - ok = file_handle_cache:delete(TmpHdl), - - {ok, _MainMessages, GuidsMain} = - scan_file_for_valid_messages_and_guids( - Dir, NonTmpRelatedFileName), - %% check that everything in Guids1 is in GuidsMain - true = is_sublist(Guids1, GuidsMain), - %% check that everything in GuidsTmp is in GuidsMain - true = is_sublist(GuidsTmp, GuidsMain) - end, + %% Because a msg can legitimately appear multiple times in the + %% same file, identifying the contents of the tmp file and where + %% they came from is non-trivial. If we are recovering a crashed + %% compaction then we will be rebuilding the index, which can cope + %% with duplicates appearing. Thus the simplest and safest thing + %% to do is to append the contents of the tmp file to its main + %% file. + {ok, TmpHdl} = open_file(Dir, TmpFileName, ?READ_AHEAD_MODE), + {ok, MainHdl} = open_file(Dir, NonTmpRelatedFileName, + ?READ_AHEAD_MODE ++ ?WRITE_MODE), + {ok, _End} = file_handle_cache:position(MainHdl, eof), + Size = filelib:file_size(form_filename(Dir, TmpFileName)), + {ok, Size} = file_handle_cache:copy(TmpHdl, MainHdl, Size), + ok = file_handle_cache:close(MainHdl), + ok = file_handle_cache:delete(TmpHdl), ok. -is_sublist(SmallerL, BiggerL) -> - lists:all(fun (Item) -> lists:member(Item, BiggerL) end, SmallerL). - -is_disjoint(SmallerL, BiggerL) -> - lists:all(fun (Item) -> not lists:member(Item, BiggerL) end, SmallerL). - scan_file_for_valid_messages(Dir, FileName) -> case open_file(Dir, FileName, ?READ_MODE) of {ok, Hdl} -> Valid = rabbit_msg_file:scan( @@ -1333,10 +1258,6 @@ scan_file_for_valid_messages(Dir, FileName) -> {error, Reason} -> {error, {unable_to_scan_file, FileName, Reason}} end. -scan_file_for_valid_messages_and_guids(Dir, FileName) -> - {ok, Messages, _FileSize} = scan_file_for_valid_messages(Dir, FileName), - {ok, Messages, [Guid || {Guid, _TotalSize, _FileOffset} <- Messages]}. - %% Takes the list in *ascending* order (i.e. eldest message %% first). This is the opposite of what scan_file_for_valid_messages %% produces. The list of msgs that is produced is youngest first. @@ -1351,7 +1272,7 @@ find_contiguous_block_prefix([{Guid, TotalSize, ExpectedOffset} | Tail], find_contiguous_block_prefix([_MsgAfterGap | _Tail], ExpectedOffset, Guids) -> {ExpectedOffset, Guids}. -build_index(true, _Files, State = #msstate { +build_index(false, _Files, State = #msstate { file_summary_ets = FileSummaryEts }) -> ets:foldl( fun (#file_summary { valid_total_size = ValidTotalSize, @@ -1364,7 +1285,7 @@ build_index(true, _Files, State = #msstate { sum_file_size = SumFileSize + FileSize, current_file = File }} end, {0, State}, FileSummaryEts); -build_index(false, Files, State) -> +build_index(true, Files, State) -> {ok, Pid} = gatherer:start_link(), case Files of [] -> build_index(Pid, undefined, [State #msstate.current_file], State); |
