summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_msg_store.erl87
1 files changed, 43 insertions, 44 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index 832f403904..563c57d238 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -527,8 +527,11 @@ init([Server, BaseDir, ClientRefs, {MsgRefDeltaGen, MsgRefDeltaGenInit}]) ->
successfully_recovered = AllCleanShutdown
},
- ok = count_msg_refs(AllCleanShutdown, MsgRefDeltaGen, MsgRefDeltaGenInit,
- State),
+ ok = case AllCleanShutdown of
+ true -> ok;
+ false -> count_msg_refs(MsgRefDeltaGen, MsgRefDeltaGenInit, State)
+ end,
+
FileNames =
sort_file_names(filelib:wildcard("*" ++ ?FILE_EXTENSION, Dir)),
TmpFileNames =
@@ -1130,11 +1133,12 @@ store_file_summary(Tid, Dir) ->
recover_file_summary(false, _Dir, _Server) ->
%% TODO: the only reason for this to be an *ordered*_set is so
- %% that maybe_compact can start a traversal from the eldest
- %% file. It's awkward to have both that odering and the left/right
- %% pointers in the entries - replacing the former with some
- %% additional bit of state would be easy, but ditching the latter
- %% would be neater.
+ %% that a) maybe_compact can start a traversal from the eldest
+ %% file, and b) build_index in fast recovery mode can easily
+ %% identify the current file. It's awkward to have both that
+ %% odering and the left/right pointers in the entries - replacing
+ %% the former with some additional bit of state would be easy, but
+ %% ditching the latter would be neater.
{false, ets:new(rabbit_msg_store_file_summary,
[ordered_set, public, {keypos, #file_summary.file}])};
recover_file_summary(true, Dir, Server) ->
@@ -1148,15 +1152,12 @@ recover_file_summary(true, Dir, Server) ->
recover_file_summary(false, Dir, Server)
end.
-count_msg_refs(false, Gen, Seed, State) ->
- count_msg_refs(Gen, Seed, State);
-count_msg_refs(true, _Gen, _Seed, _State) ->
- ok.
-
count_msg_refs(Gen, Seed, State) ->
case Gen(Seed) of
- finished -> ok;
- {_Guid, 0, Next} -> count_msg_refs(Gen, Next, State);
+ finished ->
+ ok;
+ {_Guid, 0, Next} ->
+ count_msg_refs(Gen, Next, State);
{Guid, Delta, Next} ->
ok = case index_lookup(Guid, State) of
not_found ->
@@ -1176,19 +1177,21 @@ count_msg_refs(Gen, Seed, State) ->
end.
recover_crashed_compactions(Dir, FileNames, TmpFileNames) ->
- lists:foreach(fun (TmpFileName) ->
- ok = recover_crashed_compactions1(
- Dir, FileNames, TmpFileName)
- end, TmpFileNames),
+ lists:foreach(
+ fun (TmpFileName) ->
+ NonTmpRelatedFileName =
+ filename:rootname(TmpFileName) ++ ?FILE_EXTENSION,
+ true = lists:member(NonTmpRelatedFileName, FileNames),
+ ok = recover_crashed_compaction(
+ Dir, TmpFileName, NonTmpRelatedFileName)
+ end, TmpFileNames),
ok.
-recover_crashed_compactions1(Dir, FileNames, TmpFileName) ->
- NonTmpRelatedFileName = filename:rootname(TmpFileName) ++ ?FILE_EXTENSION,
- true = lists:member(NonTmpRelatedFileName, FileNames),
+recover_crashed_compaction(Dir, TmpFileName, NonTmpRelatedFileName) ->
{ok, UncorruptedMessagesTmp, GuidsTmp} =
- scan_file_for_valid_messages_guids(Dir, TmpFileName),
+ scan_file_for_valid_messages_and_guids(Dir, TmpFileName),
{ok, UncorruptedMessages, Guids} =
- scan_file_for_valid_messages_guids(Dir, NonTmpRelatedFileName),
+ scan_file_for_valid_messages_and_guids(Dir, NonTmpRelatedFileName),
%% 1) It's possible that everything in the tmp file is also in the
%% main file such that the main file is (prefix ++
%% tmpfile). This means that compaction failed immediately
@@ -1247,8 +1250,8 @@ recover_crashed_compactions1(Dir, FileNames, TmpFileName) ->
%% are in the tmp file
true = is_disjoint(Guids1, GuidsTmp),
%% must open with read flag, otherwise will stomp over contents
- {ok, MainHdl} = open_file(
- Dir, NonTmpRelatedFileName, [read | ?WRITE_MODE]),
+ {ok, MainHdl} = open_file(Dir, NonTmpRelatedFileName,
+ [read | ?WRITE_MODE]),
%% Wipe out any rubbish at the end of the file. Remember
%% the head of the list will be the highest entry in the
%% file.
@@ -1257,16 +1260,14 @@ recover_crashed_compactions1(Dir, FileNames, TmpFileName) ->
%% Extend the main file as big as necessary in a single
%% move. If we run out of disk space, this truncate could
%% fail, but we still aren't risking losing data
- ok = truncate_and_extend_file(
- MainHdl, Top, Top + TmpSize),
- {ok, TmpHdl} = open_file(
- Dir, TmpFileName, ?READ_AHEAD_MODE),
+ ok = truncate_and_extend_file(MainHdl, Top, Top + TmpSize),
+ {ok, TmpHdl} = open_file(Dir, TmpFileName, ?READ_AHEAD_MODE),
{ok, TmpSize} = file_handle_cache:copy(TmpHdl, MainHdl, TmpSize),
ok = file_handle_cache:close(MainHdl),
ok = file_handle_cache:delete(TmpHdl),
{ok, _MainMessages, GuidsMain} =
- scan_file_for_valid_messages_guids(
+ scan_file_for_valid_messages_and_guids(
Dir, NonTmpRelatedFileName),
%% check that everything in Guids1 is in GuidsMain
true = is_sublist(Guids1, GuidsMain),
@@ -1283,28 +1284,25 @@ is_disjoint(SmallerL, BiggerL) ->
scan_file_for_valid_messages(Dir, FileName) ->
case open_file(Dir, FileName, ?READ_MODE) of
- {ok, Hdl} ->
- Size = filelib:file_size(form_filename(Dir, FileName)),
- Valid = rabbit_msg_file:scan(Hdl, Size),
- %% if something really bad's happened, the close could fail,
- %% but ignore
- file_handle_cache:close(Hdl),
- Valid;
+ {ok, Hdl} -> Valid = rabbit_msg_file:scan(
+ Hdl, filelib:file_size(
+ form_filename(Dir, FileName))),
+ %% if something really bad has happened,
+ %% the close could fail, but ignore
+ file_handle_cache:close(Hdl),
+ Valid;
{error, enoent} -> {ok, [], 0};
{error, Reason} -> {error, {unable_to_scan_file, FileName, Reason}}
end.
-scan_file_for_valid_messages_guids(Dir, FileName) ->
- {ok, Messages, _FileSize} =
- scan_file_for_valid_messages(Dir, FileName),
+scan_file_for_valid_messages_and_guids(Dir, FileName) ->
+ {ok, Messages, _FileSize} = scan_file_for_valid_messages(Dir, FileName),
{ok, Messages, [Guid || {Guid, _TotalSize, _FileOffset} <- Messages]}.
%% Takes the list in *ascending* order (i.e. eldest message
%% first). This is the opposite of what scan_file_for_valid_messages
%% produces. The list of msgs that is produced is youngest first.
-find_contiguous_block_prefix([]) -> {0, []};
-find_contiguous_block_prefix(List) ->
- find_contiguous_block_prefix(List, 0, []).
+find_contiguous_block_prefix(L) -> find_contiguous_block_prefix(L, 0, []).
find_contiguous_block_prefix([], ExpectedOffset, Guids) ->
{ExpectedOffset, Guids};
@@ -1374,7 +1372,8 @@ build_index_worker(Gatherer, State = #msstate { dir = Dir },
lists:foldl(
fun (Obj = {Guid, TotalSize, Offset}, {VMAcc, VTSAcc}) ->
case index_lookup(Guid, State) of
- not_found -> {VMAcc, VTSAcc};
+ not_found ->
+ {VMAcc, VTSAcc};
StoreEntry ->
ok = index_update(StoreEntry #msg_location {
file = File, offset = Offset,