summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_msg_store.erl113
1 files changed, 69 insertions, 44 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index 19a437484c..7faa724536 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -797,8 +797,15 @@ init([Type, BaseDir, ClientRefs, StartupFunState]) ->
},
%% If we didn't recover the msg location index then we need to
%% rebuild it now.
+ Cleanliness = case CleanShutdown of
+ true -> "clean";
+ false -> "unclean"
+ end,
+ rabbit_log:debug("Rebuilding message location index after ~s shutdown...~n",
+ [Cleanliness]),
{Offset, State1 = #msstate { current_file = CurFile }} =
build_index(CleanShutdown, StartupFunState, State),
+ rabbit_log:debug("Finished rebuilding index~n", []),
%% read is only needed so that we can seek
{ok, CurHdl} = open_file(Dir, filenum_to_name(CurFile),
[read | ?WRITE_MODE]),
@@ -1734,54 +1741,26 @@ build_index(true, _StartupFunState,
end, {0, State}, FileSummaryEts);
build_index(false, {MsgRefDeltaGen, MsgRefDeltaGenInit},
State = #msstate { dir = Dir }) ->
+ rabbit_log:debug("Rebuilding message refcount...~n", []),
ok = count_msg_refs(MsgRefDeltaGen, MsgRefDeltaGenInit, State),
+ rabbit_log:debug("Done rebuilding message refcount~n", []),
{ok, Pid} = gatherer:start_link(),
case [filename_to_num(FileName) ||
FileName <- list_sorted_filenames(Dir, ?FILE_EXTENSION)] of
- [] -> build_index(Pid, undefined, [State #msstate.current_file],
- State);
- Files -> {Offset, State1} = build_index(Pid, undefined, Files, State),
+ [] -> rebuild_index(Pid, [State #msstate.current_file],
+ State);
+ Files -> {Offset, State1} = rebuild_index(Pid, Files, State),
{Offset, lists:foldl(fun delete_file_if_empty/2,
State1, Files)}
end.
-build_index(Gatherer, Left, [],
- State = #msstate { file_summary_ets = FileSummaryEts,
- sum_valid_data = SumValid,
- sum_file_size = SumFileSize }) ->
- case gatherer:out(Gatherer) of
- empty ->
- ok = gatherer:stop(Gatherer),
- ok = index_clean_up_temporary_reference_count_entries(State),
- Offset = case ets:lookup(FileSummaryEts, Left) of
- [] -> 0;
- [#file_summary { file_size = FileSize }] -> FileSize
- end,
- {Offset, State #msstate { current_file = Left }};
- {value, #file_summary { valid_total_size = ValidTotalSize,
- file_size = FileSize } = FileSummary} ->
- true = ets:insert_new(FileSummaryEts, FileSummary),
- build_index(Gatherer, Left, [],
- State #msstate {
- sum_valid_data = SumValid + ValidTotalSize,
- sum_file_size = SumFileSize + FileSize })
- end;
-build_index(Gatherer, Left, [File|Files], State) ->
- ok = gatherer:fork(Gatherer),
- ok = worker_pool:submit_async(
- fun () ->
- link(Gatherer),
- ok = build_index_worker(Gatherer, State,
- Left, File, Files),
- unlink(Gatherer),
- ok
- end),
- build_index(Gatherer, File, Files, State).
-
build_index_worker(Gatherer, State = #msstate { dir = Dir },
Left, File, Files) ->
+ FileName = filenum_to_name(File),
+ rabbit_log:debug("Rebuilding message location index from ~p (~B file(s) remaining)~n",
+ [form_filename(Dir, FileName), length(Files)]),
{ok, Messages, FileSize} =
- scan_file_for_valid_messages(Dir, filenum_to_name(File)),
+ scan_file_for_valid_messages(Dir, FileName),
{ValidMessages, ValidTotalSize} =
lists:foldl(
fun (Obj = {MsgId, TotalSize, Offset}, {VMAcc, VTSAcc}) ->
@@ -1810,15 +1789,61 @@ build_index_worker(Gatherer, State = #msstate { dir = Dir },
[F|_] -> {F, FileSize}
end,
ok = gatherer:in(Gatherer, #file_summary {
- file = File,
- valid_total_size = ValidTotalSize,
- left = Left,
- right = Right,
- file_size = FileSize1,
- locked = false,
- readers = 0 }),
+ file = File,
+ valid_total_size = ValidTotalSize,
+ left = Left,
+ right = Right,
+ file_size = FileSize1,
+ locked = false,
+ readers = 0 }),
ok = gatherer:finish(Gatherer).
+enqueue_build_index_workers(Gatherer, Left, [], State) ->
+ exit(normal);
+enqueue_build_index_workers(Gatherer, Left, [File|Files], State) ->
+ ok = worker_pool:submit(
+ fun () ->
+ link(Gatherer),
+ ok = build_index_worker(Gatherer, State,
+ Left, File, Files),
+ unlink(Gatherer),
+ ok
+ end),
+ enqueue_build_index_workers(Gatherer, File, Files, State).
+
+reduce_index(Gatherer, LastFile,
+ State = #msstate { file_summary_ets = FileSummaryEts,
+ sum_valid_data = SumValid,
+ sum_file_size = SumFileSize }) ->
+ case gatherer:out(Gatherer) of
+ empty ->
+ ok = gatherer:stop(Gatherer),
+ ok = index_clean_up_temporary_reference_count_entries(State),
+ Offset = case ets:lookup(FileSummaryEts, LastFile) of
+ [] -> 0;
+ [#file_summary { file_size = FileSize }] -> FileSize
+ end,
+ {Offset, State #msstate { current_file = LastFile }};
+ {value, #file_summary { valid_total_size = ValidTotalSize,
+ file_size = FileSize } = FileSummary} ->
+ true = ets:insert_new(FileSummaryEts, FileSummary),
+ reduce_index(Gatherer, LastFile,
+ State #msstate {
+ sum_valid_data = SumValid + ValidTotalSize,
+ sum_file_size = SumFileSize + FileSize })
+ end.
+
+rebuild_index(Gatherer, Files, State) ->
+ lists:foreach(fun (File) ->
+ ok = gatherer:fork(Gatherer)
+ end, Files),
+ spawn(
+ fun () ->
+ enqueue_build_index_workers(Gatherer, undefined, Files,
+ State)
+ end),
+ reduce_index(Gatherer, lists:last(Files), State).
+
%%----------------------------------------------------------------------------
%% garbage collection / compaction / aggregation -- internal
%%----------------------------------------------------------------------------