diff options
| -rw-r--r-- | src/rabbit_msg_store.erl | 30 |
1 files changed, 13 insertions, 17 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index 521fffcf6a..f78f2bcfe0 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -495,7 +495,7 @@ client_read3(Server, #msg_location { guid = Guid, file = File }, Defer, %% gen_server callbacks %%---------------------------------------------------------------------------- -init([Server, BaseDir, ClientRefs, {MsgRefDeltaGen, MsgRefDeltaGenInit}]) -> +init([Server, BaseDir, ClientRefs, StartupFunState]) -> process_flag(trap_exit, true), ok = file_handle_cache:register_callback(?MODULE, set_maximum_since_use, @@ -522,16 +522,16 @@ init([Server, BaseDir, ClientRefs, {MsgRefDeltaGen, MsgRefDeltaGenInit}]) -> recover_file_summary(not FoundCrashedCompactions, Dir, Server), %% FileSummaryRecovered => not FoundCrashedCompactions - {AllCleanShutdown, IndexState, ClientRefs1} = + {CleanShutdown, IndexState, ClientRefs1} = recover_index_and_client_refs(IndexModule, FileSummaryRecovered, ClientRefs, Dir, Server), - %% AllCleanShutdown => msg location index and file_summary both + %% CleanShutdown => msg location index and file_summary both %% recovered correctly. - true = case {FileSummaryRecovered, AllCleanShutdown} of + true = case {FileSummaryRecovered, CleanShutdown} of {true, false} -> ets:delete_all_objects(FileSummaryEts); _ -> true end, - %% AllCleanShutdown <=> msg location index and file_summary both + %% CleanShutdown <=> msg location index and file_summary both %% recovered correctly. DedupCacheEts = ets:new(rabbit_msg_store_dedup_cache, [set, public]), @@ -559,21 +559,14 @@ init([Server, BaseDir, ClientRefs, {MsgRefDeltaGen, MsgRefDeltaGenInit}]) -> dedup_cache_ets = DedupCacheEts, cur_file_cache_ets = CurFileCacheEts, client_refs = ClientRefs1, - successfully_recovered = AllCleanShutdown, + successfully_recovered = CleanShutdown, file_size_limit = FileSizeLimit }, %% If we didn't recover the msg location index then we need to - %% rebuild it now. Does not touch file_summary (will stay empty). 2-stage - %% process. This part just seeds the index with msg guids and - %% ref_counts that the queues report to us. - ok = case AllCleanShutdown of - true -> ok; - false -> count_msg_refs(MsgRefDeltaGen, MsgRefDeltaGenInit, State) - end, - + %% rebuild it now. {Offset, State1 = #msstate { current_file = CurFile }} = - build_index(AllCleanShutdown, State), + build_index(CleanShutdown, StartupFunState, State), %% read is only needed so that we can seek {ok, CurHdl} = open_file(Dir, filenum_to_name(CurFile), @@ -1293,7 +1286,8 @@ find_contiguous_block_prefix([{Guid, TotalSize, ExpectedOffset} | Tail], find_contiguous_block_prefix([_MsgAfterGap | _Tail], ExpectedOffset, Guids) -> {ExpectedOffset, Guids}. -build_index(true, State = #msstate { file_summary_ets = FileSummaryEts }) -> +build_index(true, _StartupFunState, + State = #msstate { file_summary_ets = FileSummaryEts }) -> ets:foldl( fun (#file_summary { valid_total_size = ValidTotalSize, file_size = FileSize, @@ -1305,7 +1299,9 @@ build_index(true, State = #msstate { file_summary_ets = FileSummaryEts }) -> sum_file_size = SumFileSize + FileSize, current_file = File }} end, {0, State}, FileSummaryEts); -build_index(false, State = #msstate { dir = Dir }) -> +build_index(false, {MsgRefDeltaGen, MsgRefDeltaGenInit}, + State = #msstate { dir = Dir }) -> + ok = count_msg_refs(MsgRefDeltaGen, MsgRefDeltaGenInit, State), {ok, Pid} = gatherer:start_link(), case [filename_to_num(FileName) || FileName <- list_sorted_file_names(Dir, ?FILE_EXTENSION)] of |
