diff options
| author | Michael Klishin <mklishin@pivotal.io> | 2020-03-25 00:55:50 +0300 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2020-03-25 00:55:50 +0300 |
| commit | 1ec1ce9d82836885972c0f952b870bc3f02f765c (patch) | |
| tree | 516db3e54d53fb63a97665c71ff928a3ebacfbe8 | |
| parent | 88c7bbcc47f182faeb1b407d23792eb32ee7cab0 (diff) | |
| parent | 260c0a0b6a61d8506b712c8b7580715e5bd99c63 (diff) | |
| download | rabbitmq-server-git-1ec1ce9d82836885972c0f952b870bc3f02f765c.tar.gz | |
Merge pull request #2279 from rabbitmq/startup_memory_fix
Reduce memory usage during startup
| -rw-r--r-- | src/rabbit_msg_store.erl | 114 |
| -rw-r--r-- | test/worker_pool_SUITE.erl | 188 |
2 files changed, 70 insertions, 232 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index 19a437484c..e04ba8f139 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -797,8 +797,15 @@ init([Type, BaseDir, ClientRefs, StartupFunState]) -> }, %% If we didn't recover the msg location index then we need to %% rebuild it now. + Cleanliness = case CleanShutdown of + true -> "clean"; + false -> "unclean" + end, + rabbit_log:debug("Rebuilding message location index after ~s shutdown...~n", + [Cleanliness]), {Offset, State1 = #msstate { current_file = CurFile }} = build_index(CleanShutdown, StartupFunState, State), + rabbit_log:debug("Finished rebuilding index~n", []), %% read is only needed so that we can seek {ok, CurHdl} = open_file(Dir, filenum_to_name(CurFile), [read | ?WRITE_MODE]), @@ -1734,54 +1741,26 @@ build_index(true, _StartupFunState, end, {0, State}, FileSummaryEts); build_index(false, {MsgRefDeltaGen, MsgRefDeltaGenInit}, State = #msstate { dir = Dir }) -> + rabbit_log:debug("Rebuilding message refcount...~n", []), ok = count_msg_refs(MsgRefDeltaGen, MsgRefDeltaGenInit, State), + rabbit_log:debug("Done rebuilding message refcount~n", []), {ok, Pid} = gatherer:start_link(), case [filename_to_num(FileName) || FileName <- list_sorted_filenames(Dir, ?FILE_EXTENSION)] of - [] -> build_index(Pid, undefined, [State #msstate.current_file], - State); - Files -> {Offset, State1} = build_index(Pid, undefined, Files, State), + [] -> rebuild_index(Pid, [State #msstate.current_file], + State); + Files -> {Offset, State1} = rebuild_index(Pid, Files, State), {Offset, lists:foldl(fun delete_file_if_empty/2, State1, Files)} end. 
-build_index(Gatherer, Left, [], - State = #msstate { file_summary_ets = FileSummaryEts, - sum_valid_data = SumValid, - sum_file_size = SumFileSize }) -> - case gatherer:out(Gatherer) of - empty -> - ok = gatherer:stop(Gatherer), - ok = index_clean_up_temporary_reference_count_entries(State), - Offset = case ets:lookup(FileSummaryEts, Left) of - [] -> 0; - [#file_summary { file_size = FileSize }] -> FileSize - end, - {Offset, State #msstate { current_file = Left }}; - {value, #file_summary { valid_total_size = ValidTotalSize, - file_size = FileSize } = FileSummary} -> - true = ets:insert_new(FileSummaryEts, FileSummary), - build_index(Gatherer, Left, [], - State #msstate { - sum_valid_data = SumValid + ValidTotalSize, - sum_file_size = SumFileSize + FileSize }) - end; -build_index(Gatherer, Left, [File|Files], State) -> - ok = gatherer:fork(Gatherer), - ok = worker_pool:submit_async( - fun () -> - link(Gatherer), - ok = build_index_worker(Gatherer, State, - Left, File, Files), - unlink(Gatherer), - ok - end), - build_index(Gatherer, File, Files, State). 
- build_index_worker(Gatherer, State = #msstate { dir = Dir }, Left, File, Files) -> + FileName = filenum_to_name(File), + rabbit_log:debug("Rebuilding message location index from ~p (~B file(s) remaining)~n", + [form_filename(Dir, FileName), length(Files)]), {ok, Messages, FileSize} = - scan_file_for_valid_messages(Dir, filenum_to_name(File)), + scan_file_for_valid_messages(Dir, FileName), {ValidMessages, ValidTotalSize} = lists:foldl( fun (Obj = {MsgId, TotalSize, Offset}, {VMAcc, VTSAcc}) -> @@ -1810,15 +1789,62 @@ build_index_worker(Gatherer, State = #msstate { dir = Dir }, [F|_] -> {F, FileSize} end, ok = gatherer:in(Gatherer, #file_summary { - file = File, - valid_total_size = ValidTotalSize, - left = Left, - right = Right, - file_size = FileSize1, - locked = false, - readers = 0 }), + file = File, + valid_total_size = ValidTotalSize, + left = Left, + right = Right, + file_size = FileSize1, + locked = false, + readers = 0 }), ok = gatherer:finish(Gatherer). +enqueue_build_index_workers(_Gatherer, _Left, [], _State) -> + exit(normal); +enqueue_build_index_workers(Gatherer, Left, [File|Files], State) -> + ok = worker_pool:dispatch_sync( + fun () -> + link(Gatherer), + ok = build_index_worker(Gatherer, State, + Left, File, Files), + unlink(Gatherer), + ok + end), + enqueue_build_index_workers(Gatherer, File, Files, State). 
+ +reduce_index(Gatherer, LastFile, + State = #msstate { file_summary_ets = FileSummaryEts, + sum_valid_data = SumValid, + sum_file_size = SumFileSize }) -> + case gatherer:out(Gatherer) of + empty -> + ok = gatherer:stop(Gatherer), + ok = index_clean_up_temporary_reference_count_entries(State), + Offset = case ets:lookup(FileSummaryEts, LastFile) of + [] -> 0; + [#file_summary { file_size = FileSize }] -> FileSize + end, + {Offset, State #msstate { current_file = LastFile }}; + {value, #file_summary { valid_total_size = ValidTotalSize, + file_size = FileSize } = FileSummary} -> + true = ets:insert_new(FileSummaryEts, FileSummary), + reduce_index(Gatherer, LastFile, + State #msstate { + sum_valid_data = SumValid + ValidTotalSize, + sum_file_size = SumFileSize + FileSize }) + end. + +rebuild_index(Gatherer, Files, State) -> + lists:foreach(fun (_File) -> + ok = gatherer:fork(Gatherer) + end, Files), + Pid = spawn( + fun () -> + enqueue_build_index_workers(Gatherer, undefined, + Files, State) + end), + erlang:monitor(process, Pid), + reduce_index(Gatherer, lists:last(Files), State). + %%---------------------------------------------------------------------------- %% garbage collection / compaction / aggregation -- internal %%---------------------------------------------------------------------------- diff --git a/test/worker_pool_SUITE.erl b/test/worker_pool_SUITE.erl deleted file mode 100644 index 9b438381cc..0000000000 --- a/test/worker_pool_SUITE.erl +++ /dev/null @@ -1,188 +0,0 @@ -%% The contents of this file are subject to the Mozilla Public License -%% Version 1.1 (the "License"); you may not use this file except in -%% compliance with the License. You may obtain a copy of the License -%% at https://www.mozilla.org/MPL/ -%% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See -%% the License for the specific language governing rights and -%% limitations under the License. 
-%% -%% The Original Code is RabbitMQ. -%% -%% The Initial Developer of the Original Code is GoPivotal, Inc. -%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved. -%% - --module(worker_pool_SUITE). - --compile(export_all). --include_lib("common_test/include/ct.hrl"). - - --define(POOL_SIZE, 1). --define(POOL_NAME, test_pool). - -all() -> - [ - run_code_synchronously, - run_code_asynchronously, - set_timeout, - cancel_timeout, - cancel_timeout_by_setting - ]. - -init_per_testcase(_, Config) -> - {ok, Pool} = worker_pool_sup:start_link(?POOL_SIZE, ?POOL_NAME), - rabbit_ct_helpers:set_config(Config, [{pool_sup, Pool}]). - -end_per_testcase(_, Config) -> - Pool = ?config(pool_sup, Config), - unlink(Pool), - exit(Pool, kill). - -run_code_synchronously(_) -> - Self = self(), - Test = make_ref(), - Sleep = 200, - {Time, Result} = timer:tc(fun() -> - worker_pool:submit(?POOL_NAME, - fun() -> - timer:sleep(Sleep), - Self ! {hi, Test}, - self() - end, - reuse) - end), - % Worker run synchronously - true = Time > Sleep, - % Worker have sent message - receive {hi, Test} -> ok - after 0 -> error(no_message_from_worker) - end, - % Worker is a separate process - true = (Self /= Result). - -run_code_asynchronously(_) -> - Self = self(), - Test = make_ref(), - Sleep = 200, - {Time, Result} = timer:tc(fun() -> - worker_pool:submit_async(?POOL_NAME, - fun() -> - timer:sleep(Sleep), - Self ! {hi, Test}, - self() - end) - end), - % Worker run synchronously - true = Time < Sleep, - % Worker have sent message - receive {hi, Test} -> ok - after Sleep + 100 -> error(no_message_from_worker) - end, - % Worker is a separate process - true = (Self /= Result). - -set_timeout(_) -> - Self = self(), - Test = make_ref(), - Worker = worker_pool:submit(?POOL_NAME, - fun() -> - Worker = self(), - timer:sleep(100), - worker_pool_worker:set_timeout( - my_timeout, 1000, - fun() -> - Self ! 
{hello, self(), Test} - end), - Worker - end, - reuse), - - % Timeout will occur after 1000 ms only - receive {hello, Worker, Test} -> exit(timeout_should_wait) - after 0 -> ok - end, - - timer:sleep(1000), - - receive {hello, Worker, Test} -> ok - after 1000 -> exit(timeout_is_late) - end. - - -cancel_timeout(_) -> - Self = self(), - Test = make_ref(), - Worker = worker_pool:submit(?POOL_NAME, - fun() -> - Worker = self(), - timer:sleep(100), - worker_pool_worker:set_timeout( - my_timeout, 1000, - fun() -> - Self ! {hello, self(), Test} - end), - Worker - end, - reuse), - - % Timeout will occur after 1000 ms only - receive {hello, Worker, Test} -> exit(timeout_should_wait) - after 0 -> ok - end, - - worker_pool_worker:next_job_from(Worker, Self), - Worker = worker_pool_worker:submit(Worker, - fun() -> - worker_pool_worker:clear_timeout(my_timeout), - Worker - end, - reuse), - - timer:sleep(1000), - receive {hello, Worker, Test} -> exit(timeout_is_not_cancelled) - after 0 -> ok - end. - -cancel_timeout_by_setting(_) -> - Self = self(), - Test = make_ref(), - Worker = worker_pool:submit(?POOL_NAME, - fun() -> - Worker = self(), - timer:sleep(100), - worker_pool_worker:set_timeout( - my_timeout, 1000, - fun() -> - Self ! {hello, self(), Test} - end), - Worker - end, - reuse), - - % Timeout will occur after 1000 ms only - receive {hello, Worker, Test} -> exit(timeout_should_wait) - after 0 -> ok - end, - - worker_pool_worker:next_job_from(Worker, Self), - Worker = worker_pool_worker:submit(Worker, - fun() -> - worker_pool_worker:set_timeout(my_timeout, 1000, - fun() -> - Self ! {hello_reset, self(), Test} - end), - Worker - end, - reuse), - - timer:sleep(1000), - receive {hello, Worker, Test} -> exit(timeout_is_not_cancelled) - after 0 -> ok - end, - - receive {hello_reset, Worker, Test} -> ok - after 1000 -> exit(timeout_is_late) - end. |
