diff options
| -rw-r--r-- | Makefile | 11 | ||||
| -rw-r--r-- | priv/schema/rabbit.schema | 6 | ||||
| -rw-r--r-- | src/rabbit_amqqueue.erl | 10 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_sup_sup.erl | 35 | ||||
| -rw-r--r-- | src/rabbit_recovery_terms.erl | 78 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 27 | ||||
| -rw-r--r-- | src/rabbit_vhost_msg_store.erl | 36 | ||||
| -rw-r--r-- | src/rabbit_vhost_sup.erl | 3 | ||||
| -rw-r--r-- | src/rabbit_vhost_sup_sup.erl | 34 | ||||
| -rw-r--r-- | src/rabbit_vhost_sup_watcher.erl | 6 | ||||
| -rw-r--r-- | src/rabbit_vhost_sup_wrapper.erl | 16 |
11 files changed, 183 insertions, 79 deletions
@@ -112,17 +112,20 @@ define PROJECT_ENV {passphrase, undefined} ]}, - %% rabbitmq-server-973 + %% rabbitmq-server#973 {queue_explicit_gc_run_operation_threshold, 1000}, {lazy_queue_explicit_gc_run_operation_threshold, 1000}, {background_gc_enabled, false}, {background_gc_target_interval, 60000}, - %% rabbitmq-server-589 + %% rabbitmq-server#589 {proxy_protocol, false}, {disk_monitor_failure_retries, 10}, {disk_monitor_failure_retry_interval, 120000}, - %% either "stop_node" or "ignore" - {vhost_restart_strategy, stop_node} + %% either "stop_node" or "continue". + %% by default we choose to not restart the entire node if one + %% vhost had to shut down e.g. because of a message store exception + %% see server#1158 and server#1280 + {vhost_restart_strategy, continue} ] endef diff --git a/priv/schema/rabbit.schema b/priv/schema/rabbit.schema index c503548187..c0067e008c 100644 --- a/priv/schema/rabbit.schema +++ b/priv/schema/rabbit.schema @@ -976,11 +976,11 @@ end}. {mapping, "proxy_protocol", "rabbit.proxy_protocol", [{datatype, {enum, [true, false]}}]}. -%% Whether to stop the rabbit application if VHost data -%% cannot be recovered. +%% Whether to stop the rabbit application if a vhost message +%% store or recovery fails (or another unrecoverable issue is detected). {mapping, "vhost_restart_strategy", "rabbit.vhost_restart_strategy", - [{datatype, {enum, [stop_node, ignore]}}]}. + [{datatype, {enum, [stop_node, continue, transient, persistent]}}]}. % ========================== % Lager section diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 3fb76be5e8..03dac03b7e 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -234,8 +234,14 @@ recover(VHost) -> %% for further processing in recover_durable_queues. {ok, OrderedRecoveryTerms} = BQ:start(VHost, [QName || #amqqueue{name = QName} <- Queues]), - {ok, _} = rabbit_amqqueue_sup_sup:start_for_vhost(VHost), - recover_durable_queues(lists:zip(Queues, OrderedRecoveryTerms)). + case rabbit_amqqueue_sup_sup:start_for_vhost(VHost) of + {ok, _} -> + recover_durable_queues(lists:zip(Queues, OrderedRecoveryTerms)); + %% a failure + _ -> + rabbit_log:error("Failed to start queue supervisors for vhost '~s', skipping queue recovery", [VHost]), + [] + end. stop(VHost) -> ok = rabbit_amqqueue_sup_sup:stop_for_vhost(VHost), diff --git a/src/rabbit_amqqueue_sup_sup.erl b/src/rabbit_amqqueue_sup_sup.erl index 347dbbb48a..01cad21604 100644 --- a/src/rabbit_amqqueue_sup_sup.erl +++ b/src/rabbit_amqqueue_sup_sup.erl @@ -63,17 +63,32 @@ find_for_vhost(VHost, Node) -> Result -> {error, {queue_supervisor_not_found, Result}} end. --spec start_for_vhost(rabbit_types:vhost()) -> {ok, pid()} | {error, term()}. +-spec start_for_vhost(rabbit_types:vhost()) -> {ok, pid()} | ok | {error, term()}. start_for_vhost(VHost) -> - {ok, VHostSup} = rabbit_vhost_sup_sup:vhost_sup(VHost), - supervisor2:start_child( - VHostSup, - {rabbit_amqqueue_sup_sup, - {rabbit_amqqueue_sup_sup, start_link, []}, - transient, infinity, supervisor, [rabbit_amqqueue_sup_sup]}). + case rabbit_vhost_sup_sup:vhost_sup(VHost) of + {ok, VHostSup} -> + supervisor2:start_child( + VHostSup, + {rabbit_amqqueue_sup_sup, + {rabbit_amqqueue_sup_sup, start_link, []}, + transient, infinity, supervisor, [rabbit_amqqueue_sup_sup]}); + %% we can get here if a vhost is created and immediately + %% deleted, e.g. some integration tests do it + {error, {no_such_vhost, VHost}} -> + rabbit_log:error("Failed to start a queue process supervisor for vhost ~s: vhost no longer exists!", + [VHost]), + ok + end. -spec stop_for_vhost(rabbit_types:vhost()) -> ok. stop_for_vhost(VHost) -> - {ok, VHostSup} = rabbit_vhost_sup_sup:vhost_sup(VHost), - ok = supervisor2:terminate_child(VHostSup, rabbit_amqqueue_sup_sup), - ok = supervisor2:delete_child(VHostSup, rabbit_amqqueue_sup_sup).
\ No newline at end of file + case rabbit_vhost_sup_sup:vhost_sup(VHost) of + {ok, VHostSup} -> + ok = supervisor2:terminate_child(VHostSup, rabbit_amqqueue_sup_sup), + ok = supervisor2:delete_child(VHostSup, rabbit_amqqueue_sup_sup); + %% see start/1 + {error, {no_such_vhost, VHost}} -> + rabbit_log:error("Failed to stop a queue process supervisor for vhost ~s: vhost no longer exists!", + [VHost]), + ok + end. diff --git a/src/rabbit_recovery_terms.erl b/src/rabbit_recovery_terms.erl index be9b1b6227..9b195c2cc2 100644 --- a/src/rabbit_recovery_terms.erl +++ b/src/rabbit_recovery_terms.erl @@ -48,20 +48,35 @@ %%---------------------------------------------------------------------------- start(VHost) -> - {ok, VHostSup} = rabbit_vhost_sup_sup:vhost_sup(VHost), - {ok, _} = supervisor2:start_child( - VHostSup, - {?MODULE, - {?MODULE, start_link, [VHost]}, - transient, ?WORKER_WAIT, worker, - [?MODULE]}), + case rabbit_vhost_sup_sup:vhost_sup(VHost) of + {ok, VHostSup} -> + {ok, _} = supervisor2:start_child( + VHostSup, + {?MODULE, + {?MODULE, start_link, [VHost]}, + transient, ?WORKER_WAIT, worker, + [?MODULE]}); + %% we can get here if a vhost is created and immediately + %% deleted, e.g. some integration tests do it + {error, {no_such_vhost, VHost}} -> + rabbit_log:error("Failed to start a recovery terms manager for vhost ~s: vhost no longer exists!", + [VHost]) + end, ok. stop(VHost) -> - {ok, VHostSup} = rabbit_vhost_sup_sup:vhost_sup(VHost), - case supervisor:terminate_child(VHostSup, ?MODULE) of - ok -> supervisor:delete_child(VHostSup, ?MODULE); - E -> E + case rabbit_vhost_sup_sup:vhost_sup(VHost) of + {ok, VHostSup} -> + case supervisor:terminate_child(VHostSup, ?MODULE) of + ok -> supervisor:delete_child(VHostSup, ?MODULE); + E -> E + end; + %% see start/1 + {error, {no_such_vhost, VHost}} -> + rabbit_log:error("Failed to stop a recovery terms manager for vhost ~s: vhost no longer exists!", + [VHost]), + + ok end. store(VHost, DirBaseName, Terms) -> @@ -74,7 +89,14 @@ read(VHost, DirBaseName) -> end. clear(VHost) -> - ok = dets:delete_all_objects(VHost), + try + dets:delete_all_objects(VHost) + %% see start/1 + catch _:badarg -> + rabbit_log:error("Failed to clear recovery terms for vhost ~s: table no longer exists!", + [VHost]), + ok + end, flush(VHost). start_link(VHost) -> @@ -126,8 +148,15 @@ open_global_table() -> ok. close_global_table() -> - ok = dets:sync(?MODULE), - ok = dets:close(?MODULE). + try + dets:sync(?MODULE), + dets:close(?MODULE) + %% see clear/1 + catch _:badarg -> + rabbit_log:error("Failed to clear global recovery terms: table no longer exists!", + []), + ok + end. read_global(DirBaseName) -> read(?MODULE, DirBaseName). @@ -163,8 +192,23 @@ open_table(VHost) -> {ram_file, true}, {auto_save, infinity}]). -flush(VHost) -> ok = dets:sync(VHost). +flush(VHost) -> + try + dets:sync(VHost) + %% see clear/1 + catch _:badarg -> + rabbit_log:error("Failed to sync recovery terms table for vhost ~s: the table no longer exists!", + [VHost]), + ok + end. close_table(VHost) -> - ok = flush(VHost), - ok = dets:close(VHost). + try + ok = flush(VHost), + ok = dets:close(VHost) + %% see clear/1 + catch _:badarg -> + rabbit_log:error("Failed to close recovery terms table for vhost ~s: the table no longer exists!", + [VHost]), + ok + end. diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 40967e316e..7c631ddc66 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -496,15 +496,24 @@ stop(VHost) -> start_msg_store(VHost, Refs, StartFunState) when is_list(Refs); Refs == undefined -> rabbit_log:info("Starting message stores for vhost '~s'~n", [VHost]), - {ok, _} = rabbit_vhost_msg_store:start(VHost, - ?TRANSIENT_MSG_STORE, - undefined, - ?EMPTY_START_FUN_STATE), - {ok, _} = rabbit_vhost_msg_store:start(VHost, - ?PERSISTENT_MSG_STORE, - Refs, - StartFunState), - rabbit_log:info("Message stores for vhost '~s' are started~n", [VHost]). + safely_start_msg_store(VHost, ?TRANSIENT_MSG_STORE, undefined, ?EMPTY_START_FUN_STATE), + safely_start_msg_store(VHost, ?PERSISTENT_MSG_STORE, Refs, StartFunState), + ok. + +safely_start_msg_store(VHost, Type, Refs, StartFunState) -> + case rabbit_vhost_msg_store:start(VHost, Type, Refs, StartFunState) of + {ok, _} -> + rabbit_log:info("Started message store of type ~s for vhost '~s'~n", [abbreviated_type(Type), VHost]); + {error, {no_such_vhost, VHost}} -> + rabbit_log:error("Failed to start message store of type ~s for vhost '~s': the vhost no longer exists!~n", + [Type, VHost]); + {error, Error} -> + rabbit_log:error("Failed to start message store of type ~s for vhost '~s': ~p~n", + [Type, VHost, Error]) + end. + +abbreviated_type(?TRANSIENT_MSG_STORE) -> transient; +abbreviated_type(?PERSISTENT_MSG_STORE) -> persistent. stop_msg_store(VHost) -> rabbit_vhost_msg_store:stop(VHost, ?TRANSIENT_MSG_STORE), diff --git a/src/rabbit_vhost_msg_store.erl b/src/rabbit_vhost_msg_store.erl index 482ad082b8..2e54a5f360 100644 --- a/src/rabbit_vhost_msg_store.erl +++ b/src/rabbit_vhost_msg_store.erl @@ -23,17 +23,33 @@ start(VHost, Type, ClientRefs, StartupFunState) when is_list(ClientRefs); ClientRefs == undefined -> - {ok, VHostSup} = rabbit_vhost_sup_sup:vhost_sup(VHost), - VHostDir = rabbit_vhost:msg_store_dir_path(VHost), - supervisor2:start_child(VHostSup, - {Type, {rabbit_msg_store, start_link, - [Type, VHostDir, ClientRefs, StartupFunState]}, - transient, ?WORKER_WAIT, worker, [rabbit_msg_store]}). + case rabbit_vhost_sup_sup:vhost_sup(VHost) of + {ok, VHostSup} -> + VHostDir = rabbit_vhost:msg_store_dir_path(VHost), + supervisor2:start_child(VHostSup, + {Type, {rabbit_msg_store, start_link, + [Type, VHostDir, ClientRefs, StartupFunState]}, + transient, ?WORKER_WAIT, worker, [rabbit_msg_store]}); + %% we can get here if a vhost is created and immediately + %% deleted, e.g. some integration tests do it + {error, {no_such_vhost, VHost}} = E -> + rabbit_log:error("Failed to start a message store for vhost ~s: vhost no longer exists!", + [VHost]), + E + end. stop(VHost, Type) -> - {ok, VHostSup} = rabbit_vhost_sup_sup:vhost_sup(VHost), - ok = supervisor2:terminate_child(VHostSup, Type), - ok = supervisor2:delete_child(VHostSup, Type). + case rabbit_vhost_sup_sup:vhost_sup(VHost) of + {ok, VHostSup} -> + ok = supervisor2:terminate_child(VHostSup, Type), + ok = supervisor2:delete_child(VHostSup, Type); + %% see start/4 + {error, {no_such_vhost, VHost}} -> + rabbit_log:error("Failed to stop a message store for vhost ~s: vhost no longer exists!", + [VHost]), + + ok + end. client_init(VHost, Type, Ref, MsgOnDiskFun, CloseFDsFun) -> with_vhost_store(VHost, Type, fun(StorePid) -> @@ -58,4 +74,4 @@ vhost_store_pid(VHost, Type) -> successfully_recovered_state(VHost, Type) -> with_vhost_store(VHost, Type, fun(StorePid) -> rabbit_msg_store:successfully_recovered_state(StorePid) - end).
\ No newline at end of file + end). diff --git a/src/rabbit_vhost_sup.erl b/src/rabbit_vhost_sup.erl index b8c7a649e5..bbf006fbd3 100644 --- a/src/rabbit_vhost_sup.erl +++ b/src/rabbit_vhost_sup.erl @@ -18,7 +18,8 @@ -include("rabbit.hrl"). -%% Supervisor is a per-vhost supervisor to contain queues and message stores +%% Each vhost gets an instance of this supervisor that supervises +%% message stores and queues (via rabbit_amqqueue_sup_sup). -behaviour(supervisor2). -export([init/1]). -export([start_link/1]). diff --git a/src/rabbit_vhost_sup_sup.erl b/src/rabbit_vhost_sup_sup.erl index e528d64e0e..1ea6204c69 100644 --- a/src/rabbit_vhost_sup_sup.erl +++ b/src/rabbit_vhost_sup_sup.erl @@ -25,7 +25,7 @@ -export([start_link/0, start/0]). -export([vhost_sup/1, vhost_sup/2, save_vhost_sup/3]). -export([delete_on_all_nodes/1]). --export([start_vhost/1, start_vhost/2, start_on_all_nodes/1]). +-export([start_vhost/1, start_vhost/2, start_on_all_nodes/1, vhost_restart_strategy/0]). %% Internal -export([stop_and_delete_vhost/1]). @@ -33,8 +33,10 @@ -record(vhost_sup, {vhost, vhost_sup_pid, wrapper_pid}). start() -> - case supervisor:start_child(rabbit_sup, {?MODULE, {?MODULE, start_link, []}, - permanent, infinity, supervisor, [?MODULE]}) of + case supervisor:start_child(rabbit_sup, {?MODULE, + {?MODULE, start_link, []}, + permanent, infinity, supervisor, + [?MODULE]}) of {ok, _} -> ok; {error, Err} -> {error, Err} end. @@ -43,17 +45,14 @@ start_link() -> supervisor2:start_link({local, ?MODULE}, ?MODULE, []). init([]) -> - VhostRestart = case application:get_env(rabbit, vhost_restart_strategy, stop_node) of - ignore -> transient; - stop_node -> permanent; - transient -> transient; - permanent -> permanent - end, - + %% This assumes that a single vhost termination should not shut down nodes + %% unless the operator opts in. + RestartStrategy = vhost_restart_strategy(), ets:new(?MODULE, [named_table, public, {keypos, #vhost_sup.vhost}]), + {ok, {{simple_one_for_one, 0, 5}, [{rabbit_vhost, {rabbit_vhost_sup_wrapper, start_link, []}, - VhostRestart, infinity, supervisor, + RestartStrategy, ?SUPERVISOR_WAIT, supervisor, [rabbit_vhost_sup_wrapper, rabbit_vhost_sup]}]}}. start_on_all_nodes(VHost) -> @@ -100,7 +99,7 @@ stop_and_delete_vhost(VHost) -> false -> ok; true -> rabbit_log:info("Stopping vhost supervisor ~p" - " for vhost ~p~n", + " for vhost '~s'~n", [VHostSupPid, VHost]), case supervisor2:terminate_child(?MODULE, WrapperPid) of ok -> @@ -169,3 +168,14 @@ vhost_sup_pid(VHost) -> end end. +vhost_restart_strategy() -> + %% This assumes that a single vhost termination should not shut down nodes + %% unless the operator opts in. + case application:get_env(rabbit, vhost_restart_strategy, continue) of + continue -> transient; + stop_node -> permanent; + transient -> transient; + permanent -> permanent; + %% old name of continue + ignore -> transient + end. diff --git a/src/rabbit_vhost_sup_watcher.erl b/src/rabbit_vhost_sup_watcher.erl index 3ce726621f..64497f8d4c 100644 --- a/src/rabbit_vhost_sup_watcher.erl +++ b/src/rabbit_vhost_sup_watcher.erl @@ -49,8 +49,8 @@ handle_info(check_vhost, VHost) -> case rabbit_vhost:exists(VHost) of true -> {noreply, VHost}; false -> - rabbit_log:error(" Vhost \"~p\" is gone." - " Stopping message store supervisor.", + rabbit_log:error("Virtual host '~s' is gone." + "Stopping message store supervisor.", [VHost]), {stop, normal, VHost} end; @@ -63,4 +63,4 @@ code_change(_OldVsn, VHost, _Extra) -> {ok, VHost}. interval() -> - application:get_env(kernel, net_ticktime, 60000) * ?TICKTIME_RATIO.
\ No newline at end of file + application:get_env(kernel, net_ticktime, 60000) * ?TICKTIME_RATIO. diff --git a/src/rabbit_vhost_sup_wrapper.erl b/src/rabbit_vhost_sup_wrapper.erl index 3396f71fa9..ab2dc60b6f 100644 --- a/src/rabbit_vhost_sup_wrapper.erl +++ b/src/rabbit_vhost_sup_wrapper.erl @@ -14,6 +14,12 @@ %% Copyright (c) 2017 Pivotal Software, Inc. All rights reserved. %% +%% This module is a wrapper around vhost supervisor to +%% provide exactly once restart semantics. +%% +%% rabbit_vhost_sup supervisor children are added dynamically, +%% and the one_for_all strategy would be questionable here + -module(rabbit_vhost_sup_wrapper). -include("rabbit.hrl"). @@ -26,15 +32,9 @@ start_link(VHost) -> supervisor2:start_link(?MODULE, [VHost]). -%% This module is a wrapper around vhost supervisor to -%% provide exactly once restart. - -%% rabbit_vhost_sup supervisor children are added dynamically, -%% so one_for_all strategy cannot be used. - init([VHost]) -> - %% Two restarts in 1 hour. One per message store. - {ok, {{one_for_all, 2, 3600000}, + %% 2 restarts in 5 minutes. One per message store. + {ok, {{one_for_all, 2, 5000}, [{rabbit_vhost_sup, {rabbit_vhost_sup_wrapper, start_vhost_sup, [VHost]}, permanent, infinity, supervisor, |
