diff options
| -rw-r--r-- | src/rabbit_msg_store.erl | 113 |
1 files changed, 69 insertions, 44 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index 19a437484c..7faa724536 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -797,8 +797,15 @@ init([Type, BaseDir, ClientRefs, StartupFunState]) -> }, %% If we didn't recover the msg location index then we need to %% rebuild it now. + Cleanliness = case CleanShutdown of + true -> "clean"; + false -> "unclean" + end, + rabbit_log:debug("Rebuilding message location index after ~s shutdown...~n", + [Cleanliness]), {Offset, State1 = #msstate { current_file = CurFile }} = build_index(CleanShutdown, StartupFunState, State), + rabbit_log:debug("Finished rebuilding index~n", []), %% read is only needed so that we can seek {ok, CurHdl} = open_file(Dir, filenum_to_name(CurFile), [read | ?WRITE_MODE]), @@ -1734,54 +1741,26 @@ build_index(true, _StartupFunState, end, {0, State}, FileSummaryEts); build_index(false, {MsgRefDeltaGen, MsgRefDeltaGenInit}, State = #msstate { dir = Dir }) -> + rabbit_log:debug("Rebuilding message refcount...~n", []), ok = count_msg_refs(MsgRefDeltaGen, MsgRefDeltaGenInit, State), + rabbit_log:debug("Done rebuilding message refcount~n", []), {ok, Pid} = gatherer:start_link(), case [filename_to_num(FileName) || FileName <- list_sorted_filenames(Dir, ?FILE_EXTENSION)] of - [] -> build_index(Pid, undefined, [State #msstate.current_file], - State); - Files -> {Offset, State1} = build_index(Pid, undefined, Files, State), + [] -> rebuild_index(Pid, [State #msstate.current_file], + State); + Files -> {Offset, State1} = rebuild_index(Pid, Files, State), {Offset, lists:foldl(fun delete_file_if_empty/2, State1, Files)} end. -build_index(Gatherer, Left, [], - State = #msstate { file_summary_ets = FileSummaryEts, - sum_valid_data = SumValid, - sum_file_size = SumFileSize }) -> - case gatherer:out(Gatherer) of - empty -> - ok = gatherer:stop(Gatherer), - ok = index_clean_up_temporary_reference_count_entries(State), - Offset = case ets:lookup(FileSummaryEts, Left) of - [] -> 0; - [#file_summary { file_size = FileSize }] -> FileSize - end, - {Offset, State #msstate { current_file = Left }}; - {value, #file_summary { valid_total_size = ValidTotalSize, - file_size = FileSize } = FileSummary} -> - true = ets:insert_new(FileSummaryEts, FileSummary), - build_index(Gatherer, Left, [], - State #msstate { - sum_valid_data = SumValid + ValidTotalSize, - sum_file_size = SumFileSize + FileSize }) - end; -build_index(Gatherer, Left, [File|Files], State) -> - ok = gatherer:fork(Gatherer), - ok = worker_pool:submit_async( - fun () -> - link(Gatherer), - ok = build_index_worker(Gatherer, State, - Left, File, Files), - unlink(Gatherer), - ok - end), - build_index(Gatherer, File, Files, State). - build_index_worker(Gatherer, State = #msstate { dir = Dir }, Left, File, Files) -> + FileName = filenum_to_name(File), + rabbit_log:debug("Rebuilding message location index from ~p (~B file(s) remaining)~n", + [form_filename(Dir, FileName), length(Files)]), {ok, Messages, FileSize} = - scan_file_for_valid_messages(Dir, filenum_to_name(File)), + scan_file_for_valid_messages(Dir, FileName), {ValidMessages, ValidTotalSize} = lists:foldl( fun (Obj = {MsgId, TotalSize, Offset}, {VMAcc, VTSAcc}) -> @@ -1810,15 +1789,61 @@ build_index_worker(Gatherer, State = #msstate { dir = Dir }, [F|_] -> {F, FileSize} end, ok = gatherer:in(Gatherer, #file_summary { - file = File, - valid_total_size = ValidTotalSize, - left = Left, - right = Right, - file_size = FileSize1, - locked = false, - readers = 0 }), + file = File, + valid_total_size = ValidTotalSize, + left = Left, + right = Right, + file_size = FileSize1, + locked = false, + readers = 0 }), ok = gatherer:finish(Gatherer). +enqueue_build_index_workers(Gatherer, Left, [], State) -> + exit(normal); +enqueue_build_index_workers(Gatherer, Left, [File|Files], State) -> + ok = worker_pool:submit( + fun () -> + link(Gatherer), + ok = build_index_worker(Gatherer, State, + Left, File, Files), + unlink(Gatherer), + ok + end), + enqueue_build_index_workers(Gatherer, File, Files, State). + +reduce_index(Gatherer, LastFile, + State = #msstate { file_summary_ets = FileSummaryEts, + sum_valid_data = SumValid, + sum_file_size = SumFileSize }) -> + case gatherer:out(Gatherer) of + empty -> + ok = gatherer:stop(Gatherer), + ok = index_clean_up_temporary_reference_count_entries(State), + Offset = case ets:lookup(FileSummaryEts, LastFile) of + [] -> 0; + [#file_summary { file_size = FileSize }] -> FileSize + end, + {Offset, State #msstate { current_file = LastFile }}; + {value, #file_summary { valid_total_size = ValidTotalSize, + file_size = FileSize } = FileSummary} -> + true = ets:insert_new(FileSummaryEts, FileSummary), + reduce_index(Gatherer, LastFile, + State #msstate { + sum_valid_data = SumValid + ValidTotalSize, + sum_file_size = SumFileSize + FileSize }) + end. + +rebuild_index(Gatherer, Files, State) -> + lists:foreach(fun (File) -> + ok = gatherer:fork(Gatherer) + end, Files), + spawn( + fun () -> + enqueue_build_index_workers(Gatherer, undefined, Files, + State) + end), + reduce_index(Gatherer, lists:last(Files), State). + %%---------------------------------------------------------------------------- %% garbage collection / compaction / aggregation -- internal %%---------------------------------------------------------------------------- |
