summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichael Klishin <michael@novemberain.com>2017-02-20 20:38:33 +0300
committerGitHub <noreply@github.com>2017-02-20 20:38:33 +0300
commit702110c25fe8d0375bcf8f8ec5cbe2cb7023022f (patch)
tree3218f534f194e51696bf9d29322379a8c3afaea9
parentabfca783762741ddc6a88610118fd2fef15b49f0 (diff)
parent23f80ac04884ea08d2f8c1b567abd5689c8553e5 (diff)
downloadrabbitmq-server-git-702110c25fe8d0375bcf8f8ec5cbe2cb7023022f.tar.gz
Merge pull request #1115 from rabbitmq/rabbitmq-server-msg-store-recovery-optimize
Optimise message store recovery.
-rw-r--r--src/rabbit_msg_store.erl36
-rw-r--r--src/rabbit_msg_store_vhost_sup.erl66
-rw-r--r--src/rabbit_variable_queue.erl26
3 files changed, 71 insertions, 57 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index 6c9eb92cff..d5c0031944 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -472,14 +472,14 @@
%% public API
%%----------------------------------------------------------------------------
-start_link(Name, Dir, ClientRefs, StartupFunState) when is_atom(Name) ->
+start_link(Type, Dir, ClientRefs, StartupFunState) when is_atom(Type) ->
gen_server2:start_link(?MODULE,
- [Name, Dir, ClientRefs, StartupFunState],
+ [Type, Dir, ClientRefs, StartupFunState],
[{timeout, infinity}]).
-start_global_store_link(Name, Dir, ClientRefs, StartupFunState) when is_atom(Name) ->
- gen_server2:start_link({local, Name}, ?MODULE,
- [Name, Dir, ClientRefs, StartupFunState],
+start_global_store_link(Type, Dir, ClientRefs, StartupFunState) when is_atom(Type) ->
+ gen_server2:start_link({local, Type}, ?MODULE,
+ [Type, Dir, ClientRefs, StartupFunState],
[{timeout, infinity}]).
successfully_recovered_state(Server) ->
@@ -711,16 +711,18 @@ clear_client(CRef, State = #msstate { cref_to_msg_ids = CTM,
%% gen_server callbacks
%%----------------------------------------------------------------------------
-init([Name, BaseDir, ClientRefs, StartupFunState]) ->
+
+init([Type, BaseDir, ClientRefs, StartupFunState]) ->
process_flag(trap_exit, true),
ok = file_handle_cache:register_callback(?MODULE, set_maximum_since_use,
[self()]),
- Dir = filename:join(BaseDir, atom_to_list(Name)),
+ Dir = filename:join(BaseDir, atom_to_list(Type)),
+ Name = filename:join(filename:basename(BaseDir), atom_to_list(Type)),
{ok, IndexModule} = application:get_env(rabbit, msg_store_index_module),
- rabbit_log:info("~tp: using ~p to provide index~n", [Dir, IndexModule]),
+ rabbit_log:info("Message store ~tp: using ~p to provide index~n", [Name, IndexModule]),
AttemptFileSummaryRecovery =
case ClientRefs of
@@ -730,16 +732,14 @@ init([Name, BaseDir, ClientRefs, StartupFunState]) ->
_ -> ok = filelib:ensure_dir(filename:join(Dir, "nothing")),
recover_crashed_compactions(Dir)
end,
-
%% if we found crashed compactions we trust neither the
%% file_summary nor the location index. Note the file_summary is
%% left empty here if it can't be recovered.
{FileSummaryRecovered, FileSummaryEts} =
recover_file_summary(AttemptFileSummaryRecovery, Dir),
-
{CleanShutdown, IndexState, ClientRefs1} =
recover_index_and_client_refs(IndexModule, FileSummaryRecovered,
- ClientRefs, Dir),
+ ClientRefs, Dir, Name),
Clients = dict:from_list(
[{CRef, {undefined, undefined, undefined}} ||
CRef <- ClientRefs1]),
@@ -793,12 +793,10 @@ init([Name, BaseDir, ClientRefs, StartupFunState]) ->
cref_to_msg_ids = dict:new(),
credit_disc_bound = CreditDiscBound
},
-
%% If we didn't recover the msg location index then we need to
%% rebuild it now.
{Offset, State1 = #msstate { current_file = CurFile }} =
build_index(CleanShutdown, StartupFunState, State),
-
%% read is only needed so that we can seek
{ok, CurHdl} = open_file(Dir, filenum_to_name(CurFile),
[read | ?WRITE_MODE]),
@@ -1547,16 +1545,16 @@ index_delete_by_file(File, #msstate { index_module = Index,
%% shutdown and recovery
%%----------------------------------------------------------------------------
-recover_index_and_client_refs(IndexModule, _Recover, undefined, Dir) ->
+recover_index_and_client_refs(IndexModule, _Recover, undefined, Dir, _Name) ->
{false, IndexModule:new(Dir), []};
-recover_index_and_client_refs(IndexModule, false, _ClientRefs, Dir) ->
- rabbit_log:warning("~tp : rebuilding indices from scratch~n", [Dir]),
+recover_index_and_client_refs(IndexModule, false, _ClientRefs, Dir, Name) ->
+ rabbit_log:warning("Message store ~tp: rebuilding indices from scratch~n", [Name]),
{false, IndexModule:new(Dir), []};
-recover_index_and_client_refs(IndexModule, true, ClientRefs, Dir) ->
+recover_index_and_client_refs(IndexModule, true, ClientRefs, Dir, Name) ->
Fresh = fun (ErrorMsg, ErrorArgs) ->
- rabbit_log:warning("~tp : " ++ ErrorMsg ++ "~n"
+ rabbit_log:warning("Message store ~tp : " ++ ErrorMsg ++ "~n"
"rebuilding indices from scratch~n",
- [Dir | ErrorArgs]),
+ [Name | ErrorArgs]),
{false, IndexModule:new(Dir), []}
end,
case read_recovery_terms(Dir) of
diff --git a/src/rabbit_msg_store_vhost_sup.erl b/src/rabbit_msg_store_vhost_sup.erl
index 0209e88cf7..5031c5f043 100644
--- a/src/rabbit_msg_store_vhost_sup.erl
+++ b/src/rabbit_msg_store_vhost_sup.erl
@@ -1,5 +1,7 @@
-module(rabbit_msg_store_vhost_sup).
+-include("rabbit.hrl").
+
-behaviour(supervisor2).
-export([start_link/3, init/1, add_vhost/2, delete_vhost/2,
@@ -8,32 +10,34 @@
%% Internal
-export([start_store_for_vhost/4]).
-start_link(Name, VhostsClientRefs, StartupFunState) when is_map(VhostsClientRefs);
+start_link(Type, VhostsClientRefs, StartupFunState) when is_map(VhostsClientRefs);
VhostsClientRefs == undefined ->
- supervisor2:start_link({local, Name}, ?MODULE,
- [Name, VhostsClientRefs, StartupFunState]).
+ supervisor2:start_link({local, Type}, ?MODULE,
+ [Type, VhostsClientRefs, StartupFunState]).
-init([Name, VhostsClientRefs, StartupFunState]) ->
- ets:new(Name, [named_table, public]),
+init([Type, VhostsClientRefs, StartupFunState]) ->
+ ets:new(Type, [named_table, public]),
{ok, {{simple_one_for_one, 1, 1},
[{rabbit_msg_store_vhost, {rabbit_msg_store_vhost_sup, start_store_for_vhost,
- [Name, VhostsClientRefs, StartupFunState]},
+ [Type, VhostsClientRefs, StartupFunState]},
transient, infinity, supervisor, [rabbit_msg_store]}]}}.
-add_vhost(Name, VHost) ->
- supervisor2:start_child(Name, [VHost]).
+add_vhost(Type, VHost) ->
+ VHostPid = maybe_start_store_for_vhost(Type, VHost),
+ {ok, VHostPid}.
-start_store_for_vhost(Name, VhostsClientRefs, StartupFunState, VHost) ->
- case vhost_store_pid(Name, VHost) of
+start_store_for_vhost(Type, VhostsClientRefs, StartupFunState, VHost) ->
+ case vhost_store_pid(Type, VHost) of
no_pid ->
VHostDir = rabbit_vhost:msg_store_dir_path(VHost),
ok = rabbit_file:ensure_dir(VHostDir),
rabbit_log:info("Making sure message store directory '~s' for vhost '~s' exists~n", [VHostDir, VHost]),
VhostRefs = refs_for_vhost(VHost, VhostsClientRefs),
- case rabbit_msg_store:start_link(Name, VHostDir, VhostRefs, StartupFunState) of
+ VhostStartupFunState = startup_fun_state_for_vhost(StartupFunState, VHost),
+ case rabbit_msg_store:start_link(Type, VHostDir, VhostRefs, VhostStartupFunState) of
{ok, Pid} ->
- ets:insert(Name, {VHost, Pid}),
+ ets:insert(Type, {VHost, Pid}),
{ok, Pid};
Other -> Other
end;
@@ -41,6 +45,12 @@ start_store_for_vhost(Name, VhostsClientRefs, StartupFunState, VHost) ->
{error, {already_started, Pid}}
end.
+startup_fun_state_for_vhost({Fun, {start, [#resource{}|_] = QNames}}, VHost) ->
+ QNamesForVhost = [QName || QName = #resource{virtual_host = VH} <- QNames,
+ VH == VHost ],
+ {Fun, {start, QNamesForVhost}};
+startup_fun_state_for_vhost(State, _VHost) -> State.
+
refs_for_vhost(_, undefined) -> undefined;
refs_for_vhost(VHost, Refs) ->
case maps:find(VHost, Refs) of
@@ -49,45 +59,45 @@ refs_for_vhost(VHost, Refs) ->
end.
-delete_vhost(Name, VHost) ->
- case vhost_store_pid(Name, VHost) of
+delete_vhost(Type, VHost) ->
+ case vhost_store_pid(Type, VHost) of
no_pid -> ok;
Pid when is_pid(Pid) ->
- supervisor2:terminate_child(Name, Pid),
- cleanup_vhost_store(Name, VHost, Pid)
+ supervisor2:terminate_child(Type, Pid),
+ cleanup_vhost_store(Type, VHost, Pid)
end,
ok.
-client_init(Name, Ref, MsgOnDiskFun, CloseFDsFun, VHost) ->
- VHostPid = maybe_start_store_for_vhost(Name, VHost),
+client_init(Type, Ref, MsgOnDiskFun, CloseFDsFun, VHost) ->
+ VHostPid = maybe_start_store_for_vhost(Type, VHost),
rabbit_msg_store:client_init(VHostPid, Ref, MsgOnDiskFun, CloseFDsFun).
-maybe_start_store_for_vhost(Name, VHost) ->
- case add_vhost(Name, VHost) of
+maybe_start_store_for_vhost(Type, VHost) ->
+ case supervisor2:start_child(Type, [VHost]) of
{ok, Pid} -> Pid;
{error, {already_started, Pid}} -> Pid;
Error -> throw(Error)
end.
-vhost_store_pid(Name, VHost) ->
- case ets:lookup(Name, VHost) of
+vhost_store_pid(Type, VHost) ->
+ case ets:lookup(Type, VHost) of
[] -> no_pid;
[{VHost, Pid}] ->
case erlang:is_process_alive(Pid) of
true -> Pid;
false ->
- cleanup_vhost_store(Name, VHost, Pid),
+ cleanup_vhost_store(Type, VHost, Pid),
no_pid
end
end.
-cleanup_vhost_store(Name, VHost, Pid) ->
- ets:delete_object(Name, {VHost, Pid}).
+cleanup_vhost_store(Type, VHost, Pid) ->
+ ets:delete_object(Type, {VHost, Pid}).
-successfully_recovered_state(Name, VHost) ->
- case vhost_store_pid(Name, VHost) of
+successfully_recovered_state(Type, VHost) ->
+ case vhost_store_pid(Type, VHost) of
no_pid ->
- throw({message_store_not_started, Name, VHost});
+ throw({message_store_not_started, Type, VHost});
Pid when is_pid(Pid) ->
rabbit_msg_store:successfully_recovered_state(Pid)
end.
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index c42b4856f2..c0495bbb4b 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -514,17 +514,21 @@ start_msg_store(Refs, StartFunState) when is_map(Refs); Refs == undefined ->
[?PERSISTENT_MSG_STORE_SUP, Refs, StartFunState]),
%% Start message store for all known vhosts
VHosts = rabbit_vhost:list(),
- lists:foreach(fun(VHost) ->
- add_vhost_msg_store(VHost)
+ %% TODO: recovery is limited by queue index recovery
+ %% pool size. There is no point in parallelizing vhost
+ %% recovery until there will be a queue index
+ %% recovery pool per vhost
+ lists:foreach(fun(Vhost) ->
+ add_vhost_msg_store(Vhost)
end,
- VHosts),
+ lists:sort(VHosts)),
ok.
add_vhost_msg_store(VHost) ->
- rabbit_log:info("Starting message store vor vhost ~p~n", [VHost]),
+ rabbit_log:info("Starting message stores for vhost '~s'~n", [VHost]),
rabbit_msg_store_vhost_sup:add_vhost(?TRANSIENT_MSG_STORE_SUP, VHost),
rabbit_msg_store_vhost_sup:add_vhost(?PERSISTENT_MSG_STORE_SUP, VHost),
- rabbit_log:info("Message store is started vor vhost ~p~n", [VHost]).
+ rabbit_log:info("Message stores for vhost '~s' are started~n", [VHost]).
stop_msg_store() ->
ok = rabbit_sup:stop_child(?PERSISTENT_MSG_STORE_SUP),
@@ -2802,8 +2806,8 @@ move_messages_to_vhost_store() ->
in_batches(MigrationBatchSize,
{rabbit_variable_queue, migrate_queue, [OldStore, NewStoreSup]},
QueuesWithTerms,
- "Migrating batch ~p of ~p queues ~n",
- "Batch ~p of ~p queues migrated ~n"),
+ "message_store upgrades: Migrating batch ~p of ~p queues. Out of total ~p ~n",
+ "message_store upgrades: Batch ~p of ~p queues migrated ~n. ~p total left"),
log_upgrade("Message store migration finished"),
delete_old_store(OldStore),
@@ -2817,11 +2821,13 @@ in_batches(Size, MFA, List, MessageStart, MessageEnd) ->
in_batches(_, _, _, [], _, _) -> ok;
in_batches(Size, BatchNum, MFA, List, MessageStart, MessageEnd) ->
- {Batch, Tail} = case Size > length(List) of
+ Length = length(List),
+ {Batch, Tail} = case Size > Length of
true -> {List, []};
false -> lists:split(Size, List)
end,
- log_upgrade(MessageStart, [BatchNum, Size]),
+ ProcessedLength = (BatchNum - 1) * Size,
+ rabbit_log:info(MessageStart, [BatchNum, Size, ProcessedLength + Length]),
{M, F, A} = MFA,
Keys = [ rpc:async_call(node(), M, F, [El | A]) || El <- Batch ],
lists:foreach(fun(Key) ->
@@ -2831,7 +2837,7 @@ in_batches(Size, BatchNum, MFA, List, MessageStart, MessageEnd) ->
end
end,
Keys),
- log_upgrade(MessageEnd, [BatchNum, Size]),
+ rabbit_log:info(MessageEnd, [BatchNum, Size, length(Tail)]),
in_batches(Size, BatchNum + 1, MFA, Tail, MessageStart, MessageEnd).
migrate_queue({QueueName = #resource{virtual_host = VHost, name = Name}, RecoveryTerm}, OldStore, NewStoreSup) ->