summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichael Klishin <mklishin@pivotal.io>2020-03-25 00:55:50 +0300
committerGitHub <noreply@github.com>2020-03-25 00:55:50 +0300
commit1ec1ce9d82836885972c0f952b870bc3f02f765c (patch)
tree516db3e54d53fb63a97665c71ff928a3ebacfbe8
parent88c7bbcc47f182faeb1b407d23792eb32ee7cab0 (diff)
parent260c0a0b6a61d8506b712c8b7580715e5bd99c63 (diff)
downloadrabbitmq-server-git-1ec1ce9d82836885972c0f952b870bc3f02f765c.tar.gz
Merge pull request #2279 from rabbitmq/startup_memory_fix
Reduce memory usage during startup
-rw-r--r--src/rabbit_msg_store.erl114
-rw-r--r--test/worker_pool_SUITE.erl188
2 files changed, 70 insertions, 232 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index 19a437484c..e04ba8f139 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -797,8 +797,15 @@ init([Type, BaseDir, ClientRefs, StartupFunState]) ->
},
%% If we didn't recover the msg location index then we need to
%% rebuild it now.
+ Cleanliness = case CleanShutdown of
+ true -> "clean";
+ false -> "unclean"
+ end,
+ rabbit_log:debug("Rebuilding message location index after ~s shutdown...~n",
+ [Cleanliness]),
{Offset, State1 = #msstate { current_file = CurFile }} =
build_index(CleanShutdown, StartupFunState, State),
+ rabbit_log:debug("Finished rebuilding index~n", []),
%% read is only needed so that we can seek
{ok, CurHdl} = open_file(Dir, filenum_to_name(CurFile),
[read | ?WRITE_MODE]),
@@ -1734,54 +1741,26 @@ build_index(true, _StartupFunState,
end, {0, State}, FileSummaryEts);
build_index(false, {MsgRefDeltaGen, MsgRefDeltaGenInit},
State = #msstate { dir = Dir }) ->
+ rabbit_log:debug("Rebuilding message refcount...~n", []),
ok = count_msg_refs(MsgRefDeltaGen, MsgRefDeltaGenInit, State),
+ rabbit_log:debug("Done rebuilding message refcount~n", []),
{ok, Pid} = gatherer:start_link(),
case [filename_to_num(FileName) ||
FileName <- list_sorted_filenames(Dir, ?FILE_EXTENSION)] of
- [] -> build_index(Pid, undefined, [State #msstate.current_file],
- State);
- Files -> {Offset, State1} = build_index(Pid, undefined, Files, State),
+ [] -> rebuild_index(Pid, [State #msstate.current_file],
+ State);
+ Files -> {Offset, State1} = rebuild_index(Pid, Files, State),
{Offset, lists:foldl(fun delete_file_if_empty/2,
State1, Files)}
end.
-build_index(Gatherer, Left, [],
- State = #msstate { file_summary_ets = FileSummaryEts,
- sum_valid_data = SumValid,
- sum_file_size = SumFileSize }) ->
- case gatherer:out(Gatherer) of
- empty ->
- ok = gatherer:stop(Gatherer),
- ok = index_clean_up_temporary_reference_count_entries(State),
- Offset = case ets:lookup(FileSummaryEts, Left) of
- [] -> 0;
- [#file_summary { file_size = FileSize }] -> FileSize
- end,
- {Offset, State #msstate { current_file = Left }};
- {value, #file_summary { valid_total_size = ValidTotalSize,
- file_size = FileSize } = FileSummary} ->
- true = ets:insert_new(FileSummaryEts, FileSummary),
- build_index(Gatherer, Left, [],
- State #msstate {
- sum_valid_data = SumValid + ValidTotalSize,
- sum_file_size = SumFileSize + FileSize })
- end;
-build_index(Gatherer, Left, [File|Files], State) ->
- ok = gatherer:fork(Gatherer),
- ok = worker_pool:submit_async(
- fun () ->
- link(Gatherer),
- ok = build_index_worker(Gatherer, State,
- Left, File, Files),
- unlink(Gatherer),
- ok
- end),
- build_index(Gatherer, File, Files, State).
-
build_index_worker(Gatherer, State = #msstate { dir = Dir },
Left, File, Files) ->
+ FileName = filenum_to_name(File),
+ rabbit_log:debug("Rebuilding message location index from ~p (~B file(s) remaining)~n",
+ [form_filename(Dir, FileName), length(Files)]),
{ok, Messages, FileSize} =
- scan_file_for_valid_messages(Dir, filenum_to_name(File)),
+ scan_file_for_valid_messages(Dir, FileName),
{ValidMessages, ValidTotalSize} =
lists:foldl(
fun (Obj = {MsgId, TotalSize, Offset}, {VMAcc, VTSAcc}) ->
@@ -1810,15 +1789,62 @@ build_index_worker(Gatherer, State = #msstate { dir = Dir },
[F|_] -> {F, FileSize}
end,
ok = gatherer:in(Gatherer, #file_summary {
- file = File,
- valid_total_size = ValidTotalSize,
- left = Left,
- right = Right,
- file_size = FileSize1,
- locked = false,
- readers = 0 }),
+ file = File,
+ valid_total_size = ValidTotalSize,
+ left = Left,
+ right = Right,
+ file_size = FileSize1,
+ locked = false,
+ readers = 0 }),
ok = gatherer:finish(Gatherer).
+enqueue_build_index_workers(_Gatherer, _Left, [], _State) ->
+ exit(normal);
+enqueue_build_index_workers(Gatherer, Left, [File|Files], State) ->
+ ok = worker_pool:dispatch_sync(
+ fun () ->
+ link(Gatherer),
+ ok = build_index_worker(Gatherer, State,
+ Left, File, Files),
+ unlink(Gatherer),
+ ok
+ end),
+ enqueue_build_index_workers(Gatherer, File, Files, State).
+
+reduce_index(Gatherer, LastFile,
+ State = #msstate { file_summary_ets = FileSummaryEts,
+ sum_valid_data = SumValid,
+ sum_file_size = SumFileSize }) ->
+ case gatherer:out(Gatherer) of
+ empty ->
+ ok = gatherer:stop(Gatherer),
+ ok = index_clean_up_temporary_reference_count_entries(State),
+ Offset = case ets:lookup(FileSummaryEts, LastFile) of
+ [] -> 0;
+ [#file_summary { file_size = FileSize }] -> FileSize
+ end,
+ {Offset, State #msstate { current_file = LastFile }};
+ {value, #file_summary { valid_total_size = ValidTotalSize,
+ file_size = FileSize } = FileSummary} ->
+ true = ets:insert_new(FileSummaryEts, FileSummary),
+ reduce_index(Gatherer, LastFile,
+ State #msstate {
+ sum_valid_data = SumValid + ValidTotalSize,
+ sum_file_size = SumFileSize + FileSize })
+ end.
+
+rebuild_index(Gatherer, Files, State) ->
+ lists:foreach(fun (_File) ->
+ ok = gatherer:fork(Gatherer)
+ end, Files),
+ Pid = spawn(
+ fun () ->
+ enqueue_build_index_workers(Gatherer, undefined,
+ Files, State)
+ end),
+ erlang:monitor(process, Pid),
+ reduce_index(Gatherer, lists:last(Files), State).
+
%%----------------------------------------------------------------------------
%% garbage collection / compaction / aggregation -- internal
%%----------------------------------------------------------------------------
diff --git a/test/worker_pool_SUITE.erl b/test/worker_pool_SUITE.erl
deleted file mode 100644
index 9b438381cc..0000000000
--- a/test/worker_pool_SUITE.erl
+++ /dev/null
@@ -1,188 +0,0 @@
-%% The contents of this file are subject to the Mozilla Public License
-%% Version 1.1 (the "License"); you may not use this file except in
-%% compliance with the License. You may obtain a copy of the License
-%% at https://www.mozilla.org/MPL/
-%%
-%% Software distributed under the License is distributed on an "AS IS"
-%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
-%% the License for the specific language governing rights and
-%% limitations under the License.
-%%
-%% The Original Code is RabbitMQ.
-%%
-%% The Initial Developer of the Original Code is GoPivotal, Inc.
-%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
-%%
-
--module(worker_pool_SUITE).
-
--compile(export_all).
--include_lib("common_test/include/ct.hrl").
-
-
--define(POOL_SIZE, 1).
--define(POOL_NAME, test_pool).
-
-all() ->
- [
- run_code_synchronously,
- run_code_asynchronously,
- set_timeout,
- cancel_timeout,
- cancel_timeout_by_setting
- ].
-
-init_per_testcase(_, Config) ->
- {ok, Pool} = worker_pool_sup:start_link(?POOL_SIZE, ?POOL_NAME),
- rabbit_ct_helpers:set_config(Config, [{pool_sup, Pool}]).
-
-end_per_testcase(_, Config) ->
- Pool = ?config(pool_sup, Config),
- unlink(Pool),
- exit(Pool, kill).
-
-run_code_synchronously(_) ->
- Self = self(),
- Test = make_ref(),
- Sleep = 200,
- {Time, Result} = timer:tc(fun() ->
- worker_pool:submit(?POOL_NAME,
- fun() ->
- timer:sleep(Sleep),
- Self ! {hi, Test},
- self()
- end,
- reuse)
- end),
- % Worker run synchronously
- true = Time > Sleep,
- % Worker have sent message
- receive {hi, Test} -> ok
- after 0 -> error(no_message_from_worker)
- end,
- % Worker is a separate process
- true = (Self /= Result).
-
-run_code_asynchronously(_) ->
- Self = self(),
- Test = make_ref(),
- Sleep = 200,
- {Time, Result} = timer:tc(fun() ->
- worker_pool:submit_async(?POOL_NAME,
- fun() ->
- timer:sleep(Sleep),
- Self ! {hi, Test},
- self()
- end)
- end),
- % Worker run synchronously
- true = Time < Sleep,
- % Worker have sent message
- receive {hi, Test} -> ok
- after Sleep + 100 -> error(no_message_from_worker)
- end,
- % Worker is a separate process
- true = (Self /= Result).
-
-set_timeout(_) ->
- Self = self(),
- Test = make_ref(),
- Worker = worker_pool:submit(?POOL_NAME,
- fun() ->
- Worker = self(),
- timer:sleep(100),
- worker_pool_worker:set_timeout(
- my_timeout, 1000,
- fun() ->
- Self ! {hello, self(), Test}
- end),
- Worker
- end,
- reuse),
-
- % Timeout will occur after 1000 ms only
- receive {hello, Worker, Test} -> exit(timeout_should_wait)
- after 0 -> ok
- end,
-
- timer:sleep(1000),
-
- receive {hello, Worker, Test} -> ok
- after 1000 -> exit(timeout_is_late)
- end.
-
-
-cancel_timeout(_) ->
- Self = self(),
- Test = make_ref(),
- Worker = worker_pool:submit(?POOL_NAME,
- fun() ->
- Worker = self(),
- timer:sleep(100),
- worker_pool_worker:set_timeout(
- my_timeout, 1000,
- fun() ->
- Self ! {hello, self(), Test}
- end),
- Worker
- end,
- reuse),
-
- % Timeout will occur after 1000 ms only
- receive {hello, Worker, Test} -> exit(timeout_should_wait)
- after 0 -> ok
- end,
-
- worker_pool_worker:next_job_from(Worker, Self),
- Worker = worker_pool_worker:submit(Worker,
- fun() ->
- worker_pool_worker:clear_timeout(my_timeout),
- Worker
- end,
- reuse),
-
- timer:sleep(1000),
- receive {hello, Worker, Test} -> exit(timeout_is_not_cancelled)
- after 0 -> ok
- end.
-
-cancel_timeout_by_setting(_) ->
- Self = self(),
- Test = make_ref(),
- Worker = worker_pool:submit(?POOL_NAME,
- fun() ->
- Worker = self(),
- timer:sleep(100),
- worker_pool_worker:set_timeout(
- my_timeout, 1000,
- fun() ->
- Self ! {hello, self(), Test}
- end),
- Worker
- end,
- reuse),
-
- % Timeout will occur after 1000 ms only
- receive {hello, Worker, Test} -> exit(timeout_should_wait)
- after 0 -> ok
- end,
-
- worker_pool_worker:next_job_from(Worker, Self),
- Worker = worker_pool_worker:submit(Worker,
- fun() ->
- worker_pool_worker:set_timeout(my_timeout, 1000,
- fun() ->
- Self ! {hello_reset, self(), Test}
- end),
- Worker
- end,
- reuse),
-
- timer:sleep(1000),
- receive {hello, Worker, Test} -> exit(timeout_is_not_cancelled)
- after 0 -> ok
- end,
-
- receive {hello_reset, Worker, Test} -> ok
- after 1000 -> exit(timeout_is_late)
- end.