diff options
| author | Michael Klishin <michael@novemberain.com> | 2017-02-20 20:38:33 +0300 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2017-02-20 20:38:33 +0300 |
| commit | 702110c25fe8d0375bcf8f8ec5cbe2cb7023022f (patch) | |
| tree | 3218f534f194e51696bf9d29322379a8c3afaea9 | |
| parent | abfca783762741ddc6a88610118fd2fef15b49f0 (diff) | |
| parent | 23f80ac04884ea08d2f8c1b567abd5689c8553e5 (diff) | |
| download | rabbitmq-server-git-702110c25fe8d0375bcf8f8ec5cbe2cb7023022f.tar.gz | |
Merge pull request #1115 from rabbitmq/rabbitmq-server-msg-store-recovery-optimize
Optimise message store recovery.
| -rw-r--r-- | src/rabbit_msg_store.erl | 36 | ||||
| -rw-r--r-- | src/rabbit_msg_store_vhost_sup.erl | 66 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 26 |
3 files changed, 71 insertions, 57 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index 6c9eb92cff..d5c0031944 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -472,14 +472,14 @@ %% public API %%---------------------------------------------------------------------------- -start_link(Name, Dir, ClientRefs, StartupFunState) when is_atom(Name) -> +start_link(Type, Dir, ClientRefs, StartupFunState) when is_atom(Type) -> gen_server2:start_link(?MODULE, - [Name, Dir, ClientRefs, StartupFunState], + [Type, Dir, ClientRefs, StartupFunState], [{timeout, infinity}]). -start_global_store_link(Name, Dir, ClientRefs, StartupFunState) when is_atom(Name) -> - gen_server2:start_link({local, Name}, ?MODULE, - [Name, Dir, ClientRefs, StartupFunState], +start_global_store_link(Type, Dir, ClientRefs, StartupFunState) when is_atom(Type) -> + gen_server2:start_link({local, Type}, ?MODULE, + [Type, Dir, ClientRefs, StartupFunState], [{timeout, infinity}]). successfully_recovered_state(Server) -> @@ -711,16 +711,18 @@ clear_client(CRef, State = #msstate { cref_to_msg_ids = CTM, %% gen_server callbacks %%---------------------------------------------------------------------------- -init([Name, BaseDir, ClientRefs, StartupFunState]) -> + +init([Type, BaseDir, ClientRefs, StartupFunState]) -> process_flag(trap_exit, true), ok = file_handle_cache:register_callback(?MODULE, set_maximum_since_use, [self()]), - Dir = filename:join(BaseDir, atom_to_list(Name)), + Dir = filename:join(BaseDir, atom_to_list(Type)), + Name = filename:join(filename:basename(BaseDir), atom_to_list(Type)), {ok, IndexModule} = application:get_env(rabbit, msg_store_index_module), - rabbit_log:info("~tp: using ~p to provide index~n", [Dir, IndexModule]), + rabbit_log:info("Message store ~tp: using ~p to provide index~n", [Name, IndexModule]), AttemptFileSummaryRecovery = case ClientRefs of @@ -730,16 +732,14 @@ init([Name, BaseDir, ClientRefs, StartupFunState]) -> _ -> ok = filelib:ensure_dir(filename:join(Dir, "nothing")), recover_crashed_compactions(Dir) end, - %% if we found crashed compactions we trust neither the %% file_summary nor the location index. Note the file_summary is %% left empty here if it can't be recovered. {FileSummaryRecovered, FileSummaryEts} = recover_file_summary(AttemptFileSummaryRecovery, Dir), - {CleanShutdown, IndexState, ClientRefs1} = recover_index_and_client_refs(IndexModule, FileSummaryRecovered, - ClientRefs, Dir), + ClientRefs, Dir, Name), Clients = dict:from_list( [{CRef, {undefined, undefined, undefined}} || CRef <- ClientRefs1]), @@ -793,12 +793,10 @@ init([Name, BaseDir, ClientRefs, StartupFunState]) -> cref_to_msg_ids = dict:new(), credit_disc_bound = CreditDiscBound }, - %% If we didn't recover the msg location index then we need to %% rebuild it now. {Offset, State1 = #msstate { current_file = CurFile }} = build_index(CleanShutdown, StartupFunState, State), - %% read is only needed so that we can seek {ok, CurHdl} = open_file(Dir, filenum_to_name(CurFile), [read | ?WRITE_MODE]), @@ -1547,16 +1545,16 @@ index_delete_by_file(File, #msstate { index_module = Index, %% shutdown and recovery %%---------------------------------------------------------------------------- -recover_index_and_client_refs(IndexModule, _Recover, undefined, Dir) -> +recover_index_and_client_refs(IndexModule, _Recover, undefined, Dir, _Name) -> {false, IndexModule:new(Dir), []}; -recover_index_and_client_refs(IndexModule, false, _ClientRefs, Dir) -> - rabbit_log:warning("~tp : rebuilding indices from scratch~n", [Dir]), +recover_index_and_client_refs(IndexModule, false, _ClientRefs, Dir, Name) -> + rabbit_log:warning("Message store ~tp: rebuilding indices from scratch~n", [Name]), {false, IndexModule:new(Dir), []}; -recover_index_and_client_refs(IndexModule, true, ClientRefs, Dir) -> +recover_index_and_client_refs(IndexModule, true, ClientRefs, Dir, Name) -> Fresh = fun (ErrorMsg, ErrorArgs) -> - rabbit_log:warning("~tp : " ++ ErrorMsg ++ "~n" + rabbit_log:warning("Message store ~tp : " ++ ErrorMsg ++ "~n" "rebuilding indices from scratch~n", - [Dir | ErrorArgs]), + [Name | ErrorArgs]), {false, IndexModule:new(Dir), []} end, case read_recovery_terms(Dir) of diff --git a/src/rabbit_msg_store_vhost_sup.erl b/src/rabbit_msg_store_vhost_sup.erl index 0209e88cf7..5031c5f043 100644 --- a/src/rabbit_msg_store_vhost_sup.erl +++ b/src/rabbit_msg_store_vhost_sup.erl @@ -1,5 +1,7 @@ -module(rabbit_msg_store_vhost_sup). +-include("rabbit.hrl"). + -behaviour(supervisor2). -export([start_link/3, init/1, add_vhost/2, delete_vhost/2, @@ -8,32 +10,34 @@ %% Internal -export([start_store_for_vhost/4]). -start_link(Name, VhostsClientRefs, StartupFunState) when is_map(VhostsClientRefs); +start_link(Type, VhostsClientRefs, StartupFunState) when is_map(VhostsClientRefs); VhostsClientRefs == undefined -> - supervisor2:start_link({local, Name}, ?MODULE, - [Name, VhostsClientRefs, StartupFunState]). + supervisor2:start_link({local, Type}, ?MODULE, + [Type, VhostsClientRefs, StartupFunState]). -init([Name, VhostsClientRefs, StartupFunState]) -> - ets:new(Name, [named_table, public]), +init([Type, VhostsClientRefs, StartupFunState]) -> + ets:new(Type, [named_table, public]), {ok, {{simple_one_for_one, 1, 1}, [{rabbit_msg_store_vhost, {rabbit_msg_store_vhost_sup, start_store_for_vhost, - [Name, VhostsClientRefs, StartupFunState]}, + [Type, VhostsClientRefs, StartupFunState]}, transient, infinity, supervisor, [rabbit_msg_store]}]}}. -add_vhost(Name, VHost) -> - supervisor2:start_child(Name, [VHost]). +add_vhost(Type, VHost) -> + VHostPid = maybe_start_store_for_vhost(Type, VHost), + {ok, VHostPid}. -start_store_for_vhost(Name, VhostsClientRefs, StartupFunState, VHost) -> - case vhost_store_pid(Name, VHost) of +start_store_for_vhost(Type, VhostsClientRefs, StartupFunState, VHost) -> + case vhost_store_pid(Type, VHost) of no_pid -> VHostDir = rabbit_vhost:msg_store_dir_path(VHost), ok = rabbit_file:ensure_dir(VHostDir), rabbit_log:info("Making sure message store directory '~s' for vhost '~s' exists~n", [VHostDir, VHost]), VhostRefs = refs_for_vhost(VHost, VhostsClientRefs), - case rabbit_msg_store:start_link(Name, VHostDir, VhostRefs, StartupFunState) of + VhostStartupFunState = startup_fun_state_for_vhost(StartupFunState, VHost), + case rabbit_msg_store:start_link(Type, VHostDir, VhostRefs, VhostStartupFunState) of {ok, Pid} -> - ets:insert(Name, {VHost, Pid}), + ets:insert(Type, {VHost, Pid}), {ok, Pid}; Other -> Other end; @@ -41,6 +45,12 @@ start_store_for_vhost(Name, VhostsClientRefs, StartupFunState, VHost) -> {error, {already_started, Pid}} end. +startup_fun_state_for_vhost({Fun, {start, [#resource{}|_] = QNames}}, VHost) -> + QNamesForVhost = [QName || QName = #resource{virtual_host = VH} <- QNames, + VH == VHost ], + {Fun, {start, QNamesForVhost}}; +startup_fun_state_for_vhost(State, _VHost) -> State. + refs_for_vhost(_, undefined) -> undefined; refs_for_vhost(VHost, Refs) -> case maps:find(VHost, Refs) of @@ -49,45 +59,45 @@ refs_for_vhost(VHost, Refs) -> end. -delete_vhost(Name, VHost) -> - case vhost_store_pid(Name, VHost) of +delete_vhost(Type, VHost) -> + case vhost_store_pid(Type, VHost) of no_pid -> ok; Pid when is_pid(Pid) -> - supervisor2:terminate_child(Name, Pid), - cleanup_vhost_store(Name, VHost, Pid) + supervisor2:terminate_child(Type, Pid), + cleanup_vhost_store(Type, VHost, Pid) end, ok. -client_init(Name, Ref, MsgOnDiskFun, CloseFDsFun, VHost) -> - VHostPid = maybe_start_store_for_vhost(Name, VHost), +client_init(Type, Ref, MsgOnDiskFun, CloseFDsFun, VHost) -> + VHostPid = maybe_start_store_for_vhost(Type, VHost), rabbit_msg_store:client_init(VHostPid, Ref, MsgOnDiskFun, CloseFDsFun). -maybe_start_store_for_vhost(Name, VHost) -> - case add_vhost(Name, VHost) of +maybe_start_store_for_vhost(Type, VHost) -> + case supervisor2:start_child(Type, [VHost]) of {ok, Pid} -> Pid; {error, {already_started, Pid}} -> Pid; Error -> throw(Error) end. -vhost_store_pid(Name, VHost) -> - case ets:lookup(Name, VHost) of +vhost_store_pid(Type, VHost) -> + case ets:lookup(Type, VHost) of [] -> no_pid; [{VHost, Pid}] -> case erlang:is_process_alive(Pid) of true -> Pid; false -> - cleanup_vhost_store(Name, VHost, Pid), + cleanup_vhost_store(Type, VHost, Pid), no_pid end end. -cleanup_vhost_store(Name, VHost, Pid) -> - ets:delete_object(Name, {VHost, Pid}). +cleanup_vhost_store(Type, VHost, Pid) -> + ets:delete_object(Type, {VHost, Pid}). -successfully_recovered_state(Name, VHost) -> - case vhost_store_pid(Name, VHost) of +successfully_recovered_state(Type, VHost) -> + case vhost_store_pid(Type, VHost) of no_pid -> - throw({message_store_not_started, Name, VHost}); + throw({message_store_not_started, Type, VHost}); Pid when is_pid(Pid) -> rabbit_msg_store:successfully_recovered_state(Pid) end. diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index c42b4856f2..c0495bbb4b 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -514,17 +514,21 @@ start_msg_store(Refs, StartFunState) when is_map(Refs); Refs == undefined -> [?PERSISTENT_MSG_STORE_SUP, Refs, StartFunState]), %% Start message store for all known vhosts VHosts = rabbit_vhost:list(), - lists:foreach(fun(VHost) -> - add_vhost_msg_store(VHost) + %% TODO: recovery is limited by queue index recovery + %% pool size. There is no point in parallelizing vhost + %% recovery until there will be a queue index + %% recovery pool per vhost + lists:foreach(fun(Vhost) -> + add_vhost_msg_store(Vhost) end, - VHosts), + lists:sort(VHosts)), ok. add_vhost_msg_store(VHost) -> - rabbit_log:info("Starting message store vor vhost ~p~n", [VHost]), + rabbit_log:info("Starting message stores for vhost '~s'~n", [VHost]), rabbit_msg_store_vhost_sup:add_vhost(?TRANSIENT_MSG_STORE_SUP, VHost), rabbit_msg_store_vhost_sup:add_vhost(?PERSISTENT_MSG_STORE_SUP, VHost), - rabbit_log:info("Message store is started vor vhost ~p~n", [VHost]). + rabbit_log:info("Message stores for vhost '~s' are started~n", [VHost]). stop_msg_store() -> ok = rabbit_sup:stop_child(?PERSISTENT_MSG_STORE_SUP), @@ -2802,8 +2806,8 @@ move_messages_to_vhost_store() -> in_batches(MigrationBatchSize, {rabbit_variable_queue, migrate_queue, [OldStore, NewStoreSup]}, QueuesWithTerms, - "Migrating batch ~p of ~p queues ~n", - "Batch ~p of ~p queues migrated ~n"), + "message_store upgrades: Migrating batch ~p of ~p queues. Out of total ~p ~n", + "message_store upgrades: Batch ~p of ~p queues migrated ~n. ~p total left"), log_upgrade("Message store migration finished"), delete_old_store(OldStore), @@ -2817,11 +2821,13 @@ in_batches(Size, MFA, List, MessageStart, MessageEnd) -> in_batches(_, _, _, [], _, _) -> ok; in_batches(Size, BatchNum, MFA, List, MessageStart, MessageEnd) -> - {Batch, Tail} = case Size > length(List) of + Length = length(List), + {Batch, Tail} = case Size > Length of true -> {List, []}; false -> lists:split(Size, List) end, - log_upgrade(MessageStart, [BatchNum, Size]), + ProcessedLength = (BatchNum - 1) * Size, + rabbit_log:info(MessageStart, [BatchNum, Size, ProcessedLength + Length]), {M, F, A} = MFA, Keys = [ rpc:async_call(node(), M, F, [El | A]) || El <- Batch ], lists:foreach(fun(Key) -> @@ -2831,7 +2837,7 @@ in_batches(Size, BatchNum, MFA, List, MessageStart, MessageEnd) -> end end, Keys), - log_upgrade(MessageEnd, [BatchNum, Size]), + rabbit_log:info(MessageEnd, [BatchNum, Size, length(Tail)]), in_batches(Size, BatchNum + 1, MFA, Tail, MessageStart, MessageEnd). migrate_queue({QueueName = #resource{virtual_host = VHost, name = Name}, RecoveryTerm}, OldStore, NewStoreSup) -> |
