diff options
| -rw-r--r-- | Makefile | 10 | ||||
| -rw-r--r-- | priv/schema/rabbit.schema | 6 | ||||
| -rw-r--r-- | src/rabbit_amqqueue.erl | 9 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_sup_sup.erl | 33 | ||||
| -rw-r--r-- | src/rabbit_recovery_terms.erl | 78 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 29 | ||||
| -rw-r--r-- | src/rabbit_vhost.erl | 27 | ||||
| -rw-r--r-- | src/rabbit_vhost_msg_store.erl | 36 | ||||
| -rw-r--r-- | src/rabbit_vhost_sup.erl | 3 | ||||
| -rw-r--r-- | src/rabbit_vhost_sup_sup.erl | 37 | ||||
| -rw-r--r-- | src/rabbit_vhost_sup_watcher.erl | 17 | ||||
| -rw-r--r-- | src/rabbit_vhost_sup_wrapper.erl | 13 |
12 files changed, 210 insertions, 88 deletions
@@ -112,17 +112,19 @@ define PROJECT_ENV {passphrase, undefined} ]}, - %% rabbitmq-server-973 + %% rabbitmq-server#973 {queue_explicit_gc_run_operation_threshold, 1000}, {lazy_queue_explicit_gc_run_operation_threshold, 1000}, {background_gc_enabled, false}, {background_gc_target_interval, 60000}, - %% rabbitmq-server-589 + %% rabbitmq-server#589 {proxy_protocol, false}, {disk_monitor_failure_retries, 10}, {disk_monitor_failure_retry_interval, 120000}, - %% either "stop_node" or "ignore" - {vhost_restart_strategy, stop_node} + %% either "stop_node" or "continue". + %% by default we choose to not terminate the entire node if one + %% vhost had to shut down, see server#1158 and server#1280 + {vhost_restart_strategy, continue} ] endef diff --git a/priv/schema/rabbit.schema b/priv/schema/rabbit.schema index c503548187..70351f116c 100644 --- a/priv/schema/rabbit.schema +++ b/priv/schema/rabbit.schema @@ -976,11 +976,11 @@ end}. {mapping, "proxy_protocol", "rabbit.proxy_protocol", [{datatype, {enum, [true, false]}}]}. -%% Whether to stop the rabbit application if VHost data -%% cannot be recovered. +%% Whether to stop the rabbit application if a vhost has +%% to terminate for any reason. {mapping, "vhost_restart_strategy", "rabbit.vhost_restart_strategy", - [{datatype, {enum, [stop_node, ignore]}}]}. + [{datatype, {enum, [stop_node, continue, transient, persistent]}}]}. % ========================== % Lager section diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 4eead35c1d..81eb5edf6e 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -234,8 +234,13 @@ recover(VHost) -> %% for further processing in recover_durable_queues. {ok, OrderedRecoveryTerms} = BQ:start(VHost, [QName || #amqqueue{name = QName} <- Queues]), - {ok, _} = rabbit_amqqueue_sup_sup:start_for_vhost(VHost), - recover_durable_queues(lists:zip(Queues, OrderedRecoveryTerms)). + case rabbit_amqqueue_sup_sup:start_for_vhost(VHost) of + {ok, _} -> + recover_durable_queues(lists:zip(Queues, OrderedRecoveryTerms)); + {error, Reason} -> + rabbit_log:error("Failed to start queue supervisor for vhost '~s': ~s", [VHost, Reason]), + throw({error, Reason}) + end. stop(VHost) -> ok = rabbit_amqqueue_sup_sup:stop_for_vhost(VHost), diff --git a/src/rabbit_amqqueue_sup_sup.erl b/src/rabbit_amqqueue_sup_sup.erl index 347dbbb48a..b5ef86255d 100644 --- a/src/rabbit_amqqueue_sup_sup.erl +++ b/src/rabbit_amqqueue_sup_sup.erl @@ -65,15 +65,30 @@ find_for_vhost(VHost, Node) -> -spec start_for_vhost(rabbit_types:vhost()) -> {ok, pid()} | {error, term()}. start_for_vhost(VHost) -> - {ok, VHostSup} = rabbit_vhost_sup_sup:vhost_sup(VHost), - supervisor2:start_child( - VHostSup, - {rabbit_amqqueue_sup_sup, - {rabbit_amqqueue_sup_sup, start_link, []}, - transient, infinity, supervisor, [rabbit_amqqueue_sup_sup]}). + case rabbit_vhost_sup_sup:vhost_sup(VHost) of + {ok, VHostSup} -> + supervisor2:start_child( + VHostSup, + {rabbit_amqqueue_sup_sup, + {rabbit_amqqueue_sup_sup, start_link, []}, + transient, infinity, supervisor, [rabbit_amqqueue_sup_sup]}); + %% we can get here if a vhost is added and removed concurrently + %% e.g. some integration tests do it + {error, {no_such_vhost, VHost}} -> + rabbit_log:error("Failed to start a queue process supervisor for vhost ~s: vhost no longer exists!", + [VHost]), + {error, {no_such_vhost, VHost}} + end. -spec stop_for_vhost(rabbit_types:vhost()) -> ok. stop_for_vhost(VHost) -> - {ok, VHostSup} = rabbit_vhost_sup_sup:vhost_sup(VHost), - ok = supervisor2:terminate_child(VHostSup, rabbit_amqqueue_sup_sup), - ok = supervisor2:delete_child(VHostSup, rabbit_amqqueue_sup_sup).
\ No newline at end of file + case rabbit_vhost_sup_sup:vhost_sup(VHost) of + {ok, VHostSup} -> + ok = supervisor2:terminate_child(VHostSup, rabbit_amqqueue_sup_sup), + ok = supervisor2:delete_child(VHostSup, rabbit_amqqueue_sup_sup); + %% see start/1 + {error, {no_such_vhost, VHost}} -> + rabbit_log:error("Failed to stop a queue process supervisor for vhost ~s: vhost no longer exists!", + [VHost]), + ok + end. diff --git a/src/rabbit_recovery_terms.erl b/src/rabbit_recovery_terms.erl index be9b1b6227..73fc9c7449 100644 --- a/src/rabbit_recovery_terms.erl +++ b/src/rabbit_recovery_terms.erl @@ -48,20 +48,35 @@ %%---------------------------------------------------------------------------- start(VHost) -> - {ok, VHostSup} = rabbit_vhost_sup_sup:vhost_sup(VHost), - {ok, _} = supervisor2:start_child( - VHostSup, - {?MODULE, - {?MODULE, start_link, [VHost]}, - transient, ?WORKER_WAIT, worker, - [?MODULE]}), + case rabbit_vhost_sup_sup:vhost_sup(VHost) of + {ok, VHostSup} -> + {ok, _} = supervisor2:start_child( + VHostSup, + {?MODULE, + {?MODULE, start_link, [VHost]}, + transient, ?WORKER_WAIT, worker, + [?MODULE]}); + %% we can get here if a vhost is added and removed concurrently + %% e.g. some integration tests do it + {error, {no_such_vhost, VHost}} -> + rabbit_log:error("Failed to start a recovery terms manager for vhost ~s: vhost no longer exists!", + [VHost]) + end, ok. stop(VHost) -> - {ok, VHostSup} = rabbit_vhost_sup_sup:vhost_sup(VHost), - case supervisor:terminate_child(VHostSup, ?MODULE) of - ok -> supervisor:delete_child(VHostSup, ?MODULE); - E -> E + case rabbit_vhost_sup_sup:vhost_sup(VHost) of + {ok, VHostSup} -> + case supervisor:terminate_child(VHostSup, ?MODULE) of + ok -> supervisor:delete_child(VHostSup, ?MODULE); + E -> E + end; + %% see start/1 + {error, {no_such_vhost, VHost}} -> + rabbit_log:error("Failed to stop a recovery terms manager for vhost ~s: vhost no longer exists!", + [VHost]), + + ok end. store(VHost, DirBaseName, Terms) -> @@ -74,7 +89,14 @@ read(VHost, DirBaseName) -> end. clear(VHost) -> - ok = dets:delete_all_objects(VHost), + try + dets:delete_all_objects(VHost) + %% see start/1 + catch _:badarg -> + rabbit_log:error("Failed to clear recovery terms for vhost ~s: table no longer exists!", + [VHost]), + ok + end, flush(VHost). start_link(VHost) -> @@ -126,8 +148,15 @@ open_global_table() -> ok. close_global_table() -> - ok = dets:sync(?MODULE), - ok = dets:close(?MODULE). + try + dets:sync(?MODULE), + dets:close(?MODULE) + %% see clear/1 + catch _:badarg -> + rabbit_log:error("Failed to clear global recovery terms: table no longer exists!", + []), + ok + end. read_global(DirBaseName) -> read(?MODULE, DirBaseName). @@ -163,8 +192,23 @@ open_table(VHost) -> {ram_file, true}, {auto_save, infinity}]). -flush(VHost) -> ok = dets:sync(VHost). +flush(VHost) -> + try + dets:sync(VHost) + %% see clear/1 + catch _:badarg -> + rabbit_log:error("Failed to sync recovery terms table for vhost ~s: the table no longer exists!", + [VHost]), + ok + end. close_table(VHost) -> - ok = flush(VHost), - ok = dets:close(VHost). + try + ok = flush(VHost), + ok = dets:close(VHost) + %% see clear/1 + catch _:badarg -> + rabbit_log:error("Failed to close recovery terms table for vhost ~s: the table no longer exists!", + [VHost]), + ok + end. diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 40967e316e..dd08916e35 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -496,15 +496,26 @@ stop(VHost) -> start_msg_store(VHost, Refs, StartFunState) when is_list(Refs); Refs == undefined -> rabbit_log:info("Starting message stores for vhost '~s'~n", [VHost]), - {ok, _} = rabbit_vhost_msg_store:start(VHost, - ?TRANSIENT_MSG_STORE, - undefined, - ?EMPTY_START_FUN_STATE), - {ok, _} = rabbit_vhost_msg_store:start(VHost, - ?PERSISTENT_MSG_STORE, - Refs, - StartFunState), - rabbit_log:info("Message stores for vhost '~s' are started~n", [VHost]). + do_start_msg_store(VHost, ?TRANSIENT_MSG_STORE, undefined, ?EMPTY_START_FUN_STATE), + do_start_msg_store(VHost, ?PERSISTENT_MSG_STORE, Refs, StartFunState), + ok. + +do_start_msg_store(VHost, Type, Refs, StartFunState) -> + case rabbit_vhost_msg_store:start(VHost, Type, Refs, StartFunState) of + {ok, _} -> + rabbit_log:info("Started message store of type ~s for vhost '~s'~n", [abbreviated_type(Type), VHost]); + {error, {no_such_vhost, VHost}} = Err -> + rabbit_log:error("Failed to start message store of type ~s for vhost '~s': the vhost no longer exists!~n", + [Type, VHost]), + exit(Err); + {error, Error} -> + rabbit_log:error("Failed to start message store of type ~s for vhost '~s': ~p~n", + [Type, VHost, Error]), + exit({error, Error}) + end. + +abbreviated_type(?TRANSIENT_MSG_STORE) -> transient; +abbreviated_type(?PERSISTENT_MSG_STORE) -> persistent. stop_msg_store(VHost) -> rabbit_vhost_msg_store:stop(VHost, ?TRANSIENT_MSG_STORE), diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl index 4dc2ec86d0..7513c23925 100644 --- a/src/rabbit_vhost.erl +++ b/src/rabbit_vhost.erl @@ -27,8 +27,8 @@ -export([dir/1, msg_store_dir_path/1, msg_store_dir_wildcard/0]). -export([delete_storage/1]). --spec add(rabbit_types:vhost(), rabbit_types:username()) -> 'ok'. --spec delete(rabbit_types:vhost(), rabbit_types:username()) -> 'ok'. +-spec add(rabbit_types:vhost(), rabbit_types:username()) -> rabbit_types:ok_or_error(any()). +-spec delete(rabbit_types:vhost(), rabbit_types:username()) -> rabbit_types:ok_or_error(any()). -spec update(rabbit_types:vhost(), rabbit_misc:thunk(A)) -> A. -spec exists(rabbit_types:vhost()) -> boolean(). -spec list() -> [rabbit_types:vhost()]. @@ -104,10 +104,20 @@ add(VHostPath, ActingUser) -> {<<"amq.rabbitmq.trace">>, topic, true}]], ok end), - ok = rabbit_vhost_sup_sup:start_on_all_nodes(VHostPath), - rabbit_event:notify(vhost_created, info(VHostPath) - ++ [{user_who_performed_action, ActingUser}]), - R. + case rabbit_vhost_sup_sup:start_on_all_nodes(VHostPath) of + ok -> + rabbit_event:notify(vhost_created, info(VHostPath) + ++ [{user_who_performed_action, ActingUser}]), + R; + {error, {no_such_vhost, VHostPath}} -> + Msg = rabbit_misc:format("failed to set up vhost '~s': it was concurrently deleted!", + [VHostPath]), + {error, Msg}; + {error, Reason} -> + Msg = rabbit_misc:format("failed to set up vhost '~s': ~p", + [VHostPath, Reason]), + {error, Msg} + end. delete(VHostPath, ActingUser) -> %% FIXME: We are forced to delete the queues and exchanges outside @@ -125,7 +135,10 @@ delete(VHostPath, ActingUser) -> with(VHostPath, fun () -> internal_delete(VHostPath, ActingUser) end)), ok = rabbit_event:notify(vhost_deleted, [{name, VHostPath}, {user_who_performed_action, ActingUser}]), - [ok = Fun() || Fun <- Funs], + [case Fun() of + ok -> ok; + {error, {no_such_vhost, VHostPath}} -> ok + end || Fun <- Funs], %% After vhost was deleted from mnesia DB, we try to stop vhost supervisors %% on all the nodes. rabbit_vhost_sup_sup:delete_on_all_nodes(VHostPath), diff --git a/src/rabbit_vhost_msg_store.erl b/src/rabbit_vhost_msg_store.erl index 482ad082b8..3c633875bc 100644 --- a/src/rabbit_vhost_msg_store.erl +++ b/src/rabbit_vhost_msg_store.erl @@ -23,17 +23,33 @@ start(VHost, Type, ClientRefs, StartupFunState) when is_list(ClientRefs); ClientRefs == undefined -> - {ok, VHostSup} = rabbit_vhost_sup_sup:vhost_sup(VHost), - VHostDir = rabbit_vhost:msg_store_dir_path(VHost), - supervisor2:start_child(VHostSup, - {Type, {rabbit_msg_store, start_link, - [Type, VHostDir, ClientRefs, StartupFunState]}, - transient, ?WORKER_WAIT, worker, [rabbit_msg_store]}). + case rabbit_vhost_sup_sup:vhost_sup(VHost) of + {ok, VHostSup} -> + VHostDir = rabbit_vhost:msg_store_dir_path(VHost), + supervisor2:start_child(VHostSup, + {Type, {rabbit_msg_store, start_link, + [Type, VHostDir, ClientRefs, StartupFunState]}, + transient, ?WORKER_WAIT, worker, [rabbit_msg_store]}); + %% we can get here if a vhost is added and removed concurrently + %% e.g. some integration tests do it + {error, {no_such_vhost, VHost}} = E -> + rabbit_log:error("Failed to start a message store for vhost ~s: vhost no longer exists!", + [VHost]), + E + end. stop(VHost, Type) -> - {ok, VHostSup} = rabbit_vhost_sup_sup:vhost_sup(VHost), - ok = supervisor2:terminate_child(VHostSup, Type), - ok = supervisor2:delete_child(VHostSup, Type). + case rabbit_vhost_sup_sup:vhost_sup(VHost) of + {ok, VHostSup} -> + ok = supervisor2:terminate_child(VHostSup, Type), + ok = supervisor2:delete_child(VHostSup, Type); + %% see start/4 + {error, {no_such_vhost, VHost}} -> + rabbit_log:error("Failed to stop a message store for vhost ~s: vhost no longer exists!", + [VHost]), + + ok + end. client_init(VHost, Type, Ref, MsgOnDiskFun, CloseFDsFun) -> with_vhost_store(VHost, Type, fun(StorePid) -> @@ -58,4 +74,4 @@ vhost_store_pid(VHost, Type) -> successfully_recovered_state(VHost, Type) -> with_vhost_store(VHost, Type, fun(StorePid) -> rabbit_msg_store:successfully_recovered_state(StorePid) - end).
\ No newline at end of file + end). diff --git a/src/rabbit_vhost_sup.erl b/src/rabbit_vhost_sup.erl index b8c7a649e5..bbf006fbd3 100644 --- a/src/rabbit_vhost_sup.erl +++ b/src/rabbit_vhost_sup.erl @@ -18,7 +18,8 @@ -include("rabbit.hrl"). -%% Supervisor is a per-vhost supervisor to contain queues and message stores +%% Each vhost gets an instance of this supervisor that supervises +%% message stores and queues (via rabbit_amqqueue_sup_sup). -behaviour(supervisor2). -export([init/1]). -export([start_link/1]). diff --git a/src/rabbit_vhost_sup_sup.erl b/src/rabbit_vhost_sup_sup.erl index e528d64e0e..7ecac7a5d4 100644 --- a/src/rabbit_vhost_sup_sup.erl +++ b/src/rabbit_vhost_sup_sup.erl @@ -25,7 +25,7 @@ -export([start_link/0, start/0]). -export([vhost_sup/1, vhost_sup/2, save_vhost_sup/3]). -export([delete_on_all_nodes/1]). --export([start_vhost/1, start_vhost/2, start_on_all_nodes/1]). +-export([start_vhost/1, start_vhost/2, start_on_all_nodes/1, vhost_restart_strategy/0]). %% Internal -export([stop_and_delete_vhost/1]). @@ -33,8 +33,10 @@ -record(vhost_sup, {vhost, vhost_sup_pid, wrapper_pid}). start() -> - case supervisor:start_child(rabbit_sup, {?MODULE, {?MODULE, start_link, []}, - permanent, infinity, supervisor, [?MODULE]}) of + case supervisor:start_child(rabbit_sup, {?MODULE, + {?MODULE, start_link, []}, + permanent, infinity, supervisor, + [?MODULE]}) of {ok, _} -> ok; {error, Err} -> {error, Err} end. @@ -43,17 +45,14 @@ start_link() -> supervisor2:start_link({local, ?MODULE}, ?MODULE, []). init([]) -> - VhostRestart = case application:get_env(rabbit, vhost_restart_strategy, stop_node) of - ignore -> transient; - stop_node -> permanent; - transient -> transient; - permanent -> permanent - end, - + %% This assumes that a single vhost termination should not shut down nodes + %% unless the operator opts in. + RestartStrategy = vhost_restart_strategy(), ets:new(?MODULE, [named_table, public, {keypos, #vhost_sup.vhost}]), + {ok, {{simple_one_for_one, 0, 5}, [{rabbit_vhost, {rabbit_vhost_sup_wrapper, start_link, []}, - VhostRestart, infinity, supervisor, + RestartStrategy, ?SUPERVISOR_WAIT, supervisor, [rabbit_vhost_sup_wrapper, rabbit_vhost_sup]}]}}. start_on_all_nodes(VHost) -> @@ -83,7 +82,10 @@ start_vhost(VHost) -> case supervisor2:start_child(?MODULE, [VHost]) of {ok, _} -> ok; {error, {already_started, _}} -> ok; - Error -> throw(Error) + Error -> + rabbit_log:error("Could not start process tree " + "for vhost '~s': ~p", [VHost, Error]), + throw(Error) end, {ok, _} = vhost_sup_pid(VHost); {ok, Pid} when is_pid(Pid) -> @@ -100,7 +102,7 @@ stop_and_delete_vhost(VHost) -> false -> ok; true -> rabbit_log:info("Stopping vhost supervisor ~p" - " for vhost ~p~n", + " for vhost '~s'~n", [VHostSupPid, VHost]), case supervisor2:terminate_child(?MODULE, WrapperPid) of ok -> @@ -169,3 +171,12 @@ vhost_sup_pid(VHost) -> end end. +vhost_restart_strategy() -> + %% This assumes that a single vhost termination should not shut down nodes + %% unless the operator opts in. + case application:get_env(rabbit, vhost_restart_strategy, continue) of + continue -> transient; + stop_node -> permanent; + transient -> transient; + permanent -> permanent + end. diff --git a/src/rabbit_vhost_sup_watcher.erl b/src/rabbit_vhost_sup_watcher.erl index 3ce726621f..be2c5f20bb 100644 --- a/src/rabbit_vhost_sup_watcher.erl +++ b/src/rabbit_vhost_sup_watcher.erl @@ -49,10 +49,17 @@ handle_info(check_vhost, VHost) -> case rabbit_vhost:exists(VHost) of true -> {noreply, VHost}; false -> - rabbit_log:error(" Vhost \"~p\" is gone." - " Stopping message store supervisor.", - [VHost]), - {stop, normal, VHost} + rabbit_log:warning("Virtual host '~s' is gone. " + "Stopping its top level supervisor.", + [VHost]), + %% Stop vhost's top supervisor in a one-off process to avoid a deadlock: + %% us (a child process) waiting for supervisor shutdown and our supervisor(s) + %% waiting for us to shutdown. + spawn( + fun() -> + rabbit_vhost_sup_sup:stop_and_delete_vhost(VHost) + end), + {noreply, VHost} end; handle_info(_, VHost) -> {noreply, VHost}. @@ -63,4 +70,4 @@ code_change(_OldVsn, VHost, _Extra) -> {ok, VHost}. interval() -> - application:get_env(kernel, net_ticktime, 60000) * ?TICKTIME_RATIO.
\ No newline at end of file + application:get_env(kernel, net_ticktime, 60000) * ?TICKTIME_RATIO. diff --git a/src/rabbit_vhost_sup_wrapper.erl b/src/rabbit_vhost_sup_wrapper.erl index 3396f71fa9..8dbec30bff 100644 --- a/src/rabbit_vhost_sup_wrapper.erl +++ b/src/rabbit_vhost_sup_wrapper.erl @@ -14,6 +14,9 @@ %% Copyright (c) 2017 Pivotal Software, Inc. All rights reserved. %% +%% This module is a wrapper around vhost supervisor to +%% provide exactly once restart semantics. + -module(rabbit_vhost_sup_wrapper). -include("rabbit.hrl"). @@ -26,15 +29,9 @@ start_link(VHost) -> supervisor2:start_link(?MODULE, [VHost]). -%% This module is a wrapper around vhost supervisor to -%% provide exactly once restart. - -%% rabbit_vhost_sup supervisor children are added dynamically, -%% so one_for_all strategy cannot be used. - init([VHost]) -> - %% Two restarts in 1 hour. One per message store. - {ok, {{one_for_all, 2, 3600000}, + %% 2 restarts in 5 minutes. One per message store. + {ok, {{one_for_all, 2, 300}, [{rabbit_vhost_sup, {rabbit_vhost_sup_wrapper, start_vhost_sup, [VHost]}, permanent, infinity, supervisor, |
