diff options
| author | Michael Klishin <michael@clojurewerkz.org> | 2017-07-07 16:08:09 -0700 |
|---|---|---|
| committer | Michael Klishin <michael@clojurewerkz.org> | 2017-07-07 16:08:09 -0700 |
| commit | caac1a65deb143a8750821174bfbb8a66a45d7c7 (patch) | |
| tree | 871bd7919bdde79df258b2589861c30c273f2e55 | |
| parent | b5e0206c7ba8cac637088e30427cfaecd520ca35 (diff) | |
| parent | 99b3b0a290dc32c29887ea6b7ed880ce68bfd4e4 (diff) | |
| download | rabbitmq-server-git-caac1a65deb143a8750821174bfbb8a66a45d7c7.tar.gz | |
Merge branch 'master' into rabbitmq-management-421
| -rw-r--r-- | Makefile | 10 | ||||
| -rw-r--r-- | docs/rabbitmqctl.8 | 33 | ||||
| -rw-r--r-- | priv/schema/rabbit.schema | 6 | ||||
| -rw-r--r-- | src/rabbit_amqqueue.erl | 9 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_sup_sup.erl | 33 | ||||
| -rw-r--r-- | src/rabbit_nodes.erl | 150 | ||||
| -rw-r--r-- | src/rabbit_parameter_validation.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_queue_index.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_recovery_terms.erl | 78 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 29 | ||||
| -rw-r--r-- | src/rabbit_vhost.erl | 27 | ||||
| -rw-r--r-- | src/rabbit_vhost_msg_store.erl | 36 | ||||
| -rw-r--r-- | src/rabbit_vhost_sup.erl | 3 | ||||
| -rw-r--r-- | src/rabbit_vhost_sup_sup.erl | 37 | ||||
| -rw-r--r-- | src/rabbit_vhost_sup_watcher.erl | 17 | ||||
| -rw-r--r-- | src/rabbit_vhost_sup_wrapper.erl | 13 | ||||
| -rw-r--r-- | test/sync_detection_SUITE.erl | 2 |
17 files changed, 231 insertions, 258 deletions
@@ -112,17 +112,19 @@ define PROJECT_ENV {passphrase, undefined} ]}, - %% rabbitmq-server-973 + %% rabbitmq-server#973 {queue_explicit_gc_run_operation_threshold, 1000}, {lazy_queue_explicit_gc_run_operation_threshold, 1000}, {background_gc_enabled, false}, {background_gc_target_interval, 60000}, - %% rabbitmq-server-589 + %% rabbitmq-server#589 {proxy_protocol, false}, {disk_monitor_failure_retries, 10}, {disk_monitor_failure_retry_interval, 120000}, - %% either "stop_node" or "ignore" - {vhost_restart_strategy, stop_node} + %% either "stop_node" or "continue". + %% by default we choose to not terminate the entire node if one + %% vhost had to shut down, see server#1158 and server#1280 + {vhost_restart_strategy, continue} ] endef diff --git a/docs/rabbitmqctl.8 b/docs/rabbitmqctl.8 index e9bb7fd401..7a8497a89b 100644 --- a/docs/rabbitmqctl.8 +++ b/docs/rabbitmqctl.8 @@ -1811,34 +1811,14 @@ floating point number. Values lower than 1.0 can be dangerous and should be used carefully. .El .\" ------------------------------------ -.It Cm encode Oo Fl -decode Oc Oo Ar value Oc Oo Ar passphrase Oc Oo Fl -list-ciphers Oc Oo Fl -list-hashes Oc Oo Fl -cipher Ar cipher Oc Oo Fl -hash Ar hash Oc Op Fl -iterations Ar iterations +.It Cm encode Ar value Ar passphrase Oo Fl -cipher Ar cipher Oc Oo Fl -hash Ar hash Oc Op Fl -iterations Ar iterations .Bl -tag -width Ds -.It Fl -decode -Flag to decrypt the input value. -.Pp -For example: -.sp -.Dl rabbitmqctl encode --decode '{encrypted,<<"...">>}' mypassphrase .It Ar value Ar passphrase -Value to encrypt/decrypt and passphrase. +Value to encrypt and passphrase. .Pp For example: .sp .Dl rabbitmqctl encode '<<"guest">>' mypassphrase -.sp -.Dl rabbitmqctl encode --decode '{encrypted,<<"...">>}' mypassphrase -.It Fl -list-ciphers -Flag to list the supported ciphers. -.Pp -For example: -.sp -.Dl rabbitmqctl encode --list-ciphers -.It Fl -list-hashes -Flag to list the supported hash algorithms. -.Pp -For example: -.sp -.Dl rabbitmqctl encode --list-hashes .It Fl -cipher Ar cipher Fl -hash Ar hash Fl -iterations Ar iterations Options to specify the encryption settings. They can be used independently. @@ -1848,7 +1828,7 @@ For example: .Dl rabbitmqctl encode --cipher blowfish_cfb64 --hash sha256 --iterations 10000 '<<"guest">>' mypassphrase .El .\" ------------------------------------ -.It Cm decode Ar value Ar passphrase +.It Cm decode Ar value Ar passphrase Oo Fl -cipher Ar cipher Oc Oo Fl -hash Ar hash Oc Op Fl -iterations Ar iterations .Bl -tag -width Ds .It Ar value Ar passphrase Value to decrypt (as produced by the encode command) and passphrase. @@ -1856,6 +1836,13 @@ Value to decrypt (as produced by the encode command) and passphrase. For example: .sp .Dl rabbitmqctl decode '{encrypted, <<"...">>}' mypassphrase +.It Fl -cipher Ar cipher Fl -hash Ar hash Fl -iterations Ar iterations +Options to specify the decryption settings. +They can be used independently. +.Pp +For example: +.sp +.Dl rabbitmqctl decode --cipher blowfish_cfb64 --hash sha256 --iterations 10000 '{encrypted,<<"...">>} mypassphrase .El .\" ------------------------------------ .It Cm list_hashes diff --git a/priv/schema/rabbit.schema b/priv/schema/rabbit.schema index c503548187..70351f116c 100644 --- a/priv/schema/rabbit.schema +++ b/priv/schema/rabbit.schema @@ -976,11 +976,11 @@ end}. {mapping, "proxy_protocol", "rabbit.proxy_protocol", [{datatype, {enum, [true, false]}}]}. -%% Whether to stop the rabbit application if VHost data -%% cannot be recovered. +%% Whether to stop the rabbit application if a vhost has +%% to terminate for any reason. {mapping, "vhost_restart_strategy", "rabbit.vhost_restart_strategy", - [{datatype, {enum, [stop_node, ignore]}}]}. + [{datatype, {enum, [stop_node, continue, transient, persistent]}}]}. % ========================== % Lager section diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 4eead35c1d..81eb5edf6e 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -234,8 +234,13 @@ recover(VHost) -> %% for further processing in recover_durable_queues. {ok, OrderedRecoveryTerms} = BQ:start(VHost, [QName || #amqqueue{name = QName} <- Queues]), - {ok, _} = rabbit_amqqueue_sup_sup:start_for_vhost(VHost), - recover_durable_queues(lists:zip(Queues, OrderedRecoveryTerms)). + case rabbit_amqqueue_sup_sup:start_for_vhost(VHost) of + {ok, _} -> + recover_durable_queues(lists:zip(Queues, OrderedRecoveryTerms)); + {error, Reason} -> + rabbit_log:error("Failed to start queue supervisor for vhost '~s': ~s", [VHost, Reason]), + throw({error, Reason}) + end. stop(VHost) -> ok = rabbit_amqqueue_sup_sup:stop_for_vhost(VHost), diff --git a/src/rabbit_amqqueue_sup_sup.erl b/src/rabbit_amqqueue_sup_sup.erl index 347dbbb48a..b5ef86255d 100644 --- a/src/rabbit_amqqueue_sup_sup.erl +++ b/src/rabbit_amqqueue_sup_sup.erl @@ -65,15 +65,30 @@ find_for_vhost(VHost, Node) -> -spec start_for_vhost(rabbit_types:vhost()) -> {ok, pid()} | {error, term()}. start_for_vhost(VHost) -> - {ok, VHostSup} = rabbit_vhost_sup_sup:vhost_sup(VHost), - supervisor2:start_child( - VHostSup, - {rabbit_amqqueue_sup_sup, - {rabbit_amqqueue_sup_sup, start_link, []}, - transient, infinity, supervisor, [rabbit_amqqueue_sup_sup]}). + case rabbit_vhost_sup_sup:vhost_sup(VHost) of + {ok, VHostSup} -> + supervisor2:start_child( + VHostSup, + {rabbit_amqqueue_sup_sup, + {rabbit_amqqueue_sup_sup, start_link, []}, + transient, infinity, supervisor, [rabbit_amqqueue_sup_sup]}); + %% we can get here if a vhost is added and removed concurrently + %% e.g. some integration tests do it + {error, {no_such_vhost, VHost}} -> + rabbit_log:error("Failed to start a queue process supervisor for vhost ~s: vhost no longer exists!", + [VHost]), + {error, {no_such_vhost, VHost}} + end. -spec stop_for_vhost(rabbit_types:vhost()) -> ok. stop_for_vhost(VHost) -> - {ok, VHostSup} = rabbit_vhost_sup_sup:vhost_sup(VHost), - ok = supervisor2:terminate_child(VHostSup, rabbit_amqqueue_sup_sup), - ok = supervisor2:delete_child(VHostSup, rabbit_amqqueue_sup_sup).
\ No newline at end of file + case rabbit_vhost_sup_sup:vhost_sup(VHost) of + {ok, VHostSup} -> + ok = supervisor2:terminate_child(VHostSup, rabbit_amqqueue_sup_sup), + ok = supervisor2:delete_child(VHostSup, rabbit_amqqueue_sup_sup); + %% see start/1 + {error, {no_such_vhost, VHost}} -> + rabbit_log:error("Failed to stop a queue process supervisor for vhost ~s: vhost no longer exists!", + [VHost]), + ok + end. diff --git a/src/rabbit_nodes.erl b/src/rabbit_nodes.erl index 850ec48bdc..6b5ab499cc 100644 --- a/src/rabbit_nodes.erl +++ b/src/rabbit_nodes.erl @@ -24,10 +24,6 @@ -include_lib("kernel/include/inet.hrl"). --define(EPMD_TIMEOUT, 30000). --define(TCP_DIAGNOSTIC_TIMEOUT, 5000). --define(ERROR_LOGGER_HANDLER, rabbit_error_logger_handler). - %%---------------------------------------------------------------------------- %% Specs %%---------------------------------------------------------------------------- @@ -45,135 +41,10 @@ %%---------------------------------------------------------------------------- names(Hostname) -> - Self = self(), - Ref = make_ref(), - {Pid, MRef} = spawn_monitor( - fun () -> Self ! {Ref, net_adm:names(Hostname)} end), - timer:exit_after(?EPMD_TIMEOUT, Pid, timeout), - receive - {Ref, Names} -> erlang:demonitor(MRef, [flush]), - Names; - {'DOWN', MRef, process, Pid, Reason} -> {error, Reason} - end. + rabbit_nodes_common:names(Hostname). diagnostics(Nodes) -> - verbose_erlang_distribution(true), - NodeDiags = [{"~nDIAGNOSTICS~n===========~n~n" - "attempted to contact: ~p~n", [Nodes]}] ++ - [diagnostics_node(Node) || Node <- Nodes] ++ - current_node_details(), - verbose_erlang_distribution(false), - rabbit_misc:format_many(lists:flatten(NodeDiags)). - -verbose_erlang_distribution(true) -> - net_kernel:verbose(1), - error_logger:add_report_handler(?ERROR_LOGGER_HANDLER); -verbose_erlang_distribution(false) -> - net_kernel:verbose(0), - error_logger:delete_report_handler(?ERROR_LOGGER_HANDLER). - -current_node_details() -> - [{"~ncurrent node details:~n- node name: ~w", [node()]}, - case init:get_argument(home) of - {ok, [[Home]]} -> {"- home dir: ~s", [Home]}; - Other -> {"- no home dir: ~p", [Other]} - end, - {"- cookie hash: ~s", [cookie_hash()]}]. - -diagnostics_node(Node) -> - {Name, Host} = parts(Node), - [{"~s:", [Node]} | - case names(Host) of - {error, Reason} -> - [{" * unable to connect to epmd (port ~s) on ~s: ~s~n", - [epmd_port(), Host, rabbit_misc:format_inet_error(Reason)]}]; - {ok, NamePorts} -> - [{" * connected to epmd (port ~s) on ~s", - [epmd_port(), Host]}] ++ - case net_adm:ping(Node) of - pong -> dist_working_diagnostics(Node); - pang -> dist_broken_diagnostics(Name, Host, NamePorts) - end - end]. - -epmd_port() -> - case init:get_argument(epmd_port) of - {ok, [[Port | _] | _]} when is_list(Port) -> Port; - error -> "4369" - end. - -dist_working_diagnostics(Node) -> - case is_process_running(Node, rabbit) of - true -> [{" * node ~s up, 'rabbit' application running", [Node]}]; - false -> [{" * node ~s up, 'rabbit' application not running~n" - " * running applications on ~s: ~p~n" - " * suggestion: start_app on ~s", - [Node, Node, remote_apps(Node), Node]}] - end. - -remote_apps(Node) -> - %% We want a timeout here because really, we don't trust the node, - %% the last thing we want to do is hang. - case rpc:call(Node, application, which_applications, [5000]) of - {badrpc, _} = E -> E; - Apps -> [App || {App, _, _} <- Apps] - end. - -dist_broken_diagnostics(Name, Host, NamePorts) -> - case [{N, P} || {N, P} <- NamePorts, N =:= Name] of - [] -> - {SelfName, SelfHost} = parts(node()), - Others = [list_to_atom(N) || {N, _} <- NamePorts, - N =/= case SelfHost of - Host -> SelfName; - _ -> never_matches - end], - OthersDiag = case Others of - [] -> [{" no other nodes on ~s", - [Host]}]; - _ -> [{" other nodes on ~s: ~p", - [Host, Others]}] - end, - [{" * epmd reports: node '~s' not running at all", [Name]}, - OthersDiag, {" * suggestion: start the node", []}]; - [{Name, Port}] -> - [{" * epmd reports node '~s' running on port ~b", [Name, Port]} | - case diagnose_connect(Host, Port) of - ok -> - connection_succeeded_diagnostics(); - {error, Reason} -> - [{" * can't establish TCP connection, reason: ~s~n" - " * suggestion: blocked by firewall?", - [rabbit_misc:format_inet_error(Reason)]}] - end] - end. - -connection_succeeded_diagnostics() -> - case gen_event:call(error_logger, ?ERROR_LOGGER_HANDLER, get_connection_report) of - [] -> - [{" * TCP connection succeeded but Erlang distribution " - "failed~n" - " * suggestion: hostname mismatch?~n" - " * suggestion: is the cookie set correctly?~n" - " * suggestion: is the Erlang distribution using TLS?", []}]; - Report -> - [{" * TCP connection succeeded but Erlang distribution " - "failed~n", []}] - ++ Report - end. - -diagnose_connect(Host, Port) -> - case inet:gethostbyname(Host) of - {ok, #hostent{h_addrtype = Family}} -> - case gen_tcp:connect(Host, Port, [Family], - ?TCP_DIAGNOSTIC_TIMEOUT) of - {ok, Socket} -> gen_tcp:close(Socket), - ok; - {error, _} = E -> E - end; - {error, _} = E -> - E - end. + rabbit_nodes_common:diagnostics(Nodes). make(NodeStr) -> rabbit_nodes_common:make(NodeStr). @@ -182,30 +53,23 @@ parts(NodeStr) -> rabbit_nodes_common:parts(NodeStr). cookie_hash() -> - base64:encode_to_string(erlang:md5(atom_to_list(erlang:get_cookie()))). + rabbit_nodes_common:cookie_hash(). is_running(Node, Application) -> - case rpc:call(Node, rabbit_misc, which_applications, []) of - {badrpc, _} -> false; - Apps -> proplists:is_defined(Application, Apps) - end. + rabbit_nodes_common:is_running(Node, Application). is_process_running(Node, Process) -> - case rpc:call(Node, erlang, whereis, [Process]) of - {badrpc, _} -> false; - undefined -> false; - P when is_pid(P) -> true - end. + rabbit_nodes_common:is_process_running(Node, Process). cluster_name() -> rabbit_runtime_parameters:value_global( cluster_name, cluster_name_default()). cluster_name_default() -> - {ID, _} = rabbit_nodes:parts(node()), + {ID, _} = parts(node()), {ok, Host} = inet:gethostname(), {ok, #hostent{h_name = FQDN}} = inet:gethostbyname(Host), - list_to_binary(atom_to_list(rabbit_nodes:make({ID, FQDN}))). + list_to_binary(atom_to_list(make({ID, FQDN}))). set_cluster_name(Name, Username) -> %% Cluster name should be binary diff --git a/src/rabbit_parameter_validation.erl b/src/rabbit_parameter_validation.erl index d93104d02c..f28e1281c3 100644 --- a/src/rabbit_parameter_validation.erl +++ b/src/rabbit_parameter_validation.erl @@ -22,13 +22,13 @@ number(_Name, Term) when is_number(Term) -> ok; number(Name, Term) -> - {error, "~s should be number, actually was ~p", [Name, Term]}. + {error, "~s should be a number, actually was ~p", [Name, Term]}. integer(_Name, Term) when is_integer(Term) -> ok; integer(Name, Term) -> - {error, "~s should be number, actually was ~p", [Name, Term]}. + {error, "~s should be a number, actually was ~p", [Name, Term]}. binary(_Name, Term) when is_binary(Term) -> ok; diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 7416ede4b8..e9776f47ad 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -565,7 +565,7 @@ queue_name_to_dir_name(#resource { kind = queue, rabbit_misc:format("~.36B", [Num]). queue_name_to_dir_name_legacy(Name = #resource { kind = queue }) -> - <<Num:128>> = erlang:md5(term_to_binary_compat:queue_name_to_binary(Name)), + <<Num:128>> = erlang:md5(term_to_binary_compat:term_to_binary_1(Name)), rabbit_misc:format("~.36B", [Num]). queues_base_dir() -> diff --git a/src/rabbit_recovery_terms.erl b/src/rabbit_recovery_terms.erl index be9b1b6227..73fc9c7449 100644 --- a/src/rabbit_recovery_terms.erl +++ b/src/rabbit_recovery_terms.erl @@ -48,20 +48,35 @@ %%---------------------------------------------------------------------------- start(VHost) -> - {ok, VHostSup} = rabbit_vhost_sup_sup:vhost_sup(VHost), - {ok, _} = supervisor2:start_child( - VHostSup, - {?MODULE, - {?MODULE, start_link, [VHost]}, - transient, ?WORKER_WAIT, worker, - [?MODULE]}), + case rabbit_vhost_sup_sup:vhost_sup(VHost) of + {ok, VHostSup} -> + {ok, _} = supervisor2:start_child( + VHostSup, + {?MODULE, + {?MODULE, start_link, [VHost]}, + transient, ?WORKER_WAIT, worker, + [?MODULE]}); + %% we can get here if a vhost is added and removed concurrently + %% e.g. some integration tests do it + {error, {no_such_vhost, VHost}} -> + rabbit_log:error("Failed to start a recovery terms manager for vhost ~s: vhost no longer exists!", + [VHost]) + end, ok. stop(VHost) -> - {ok, VHostSup} = rabbit_vhost_sup_sup:vhost_sup(VHost), - case supervisor:terminate_child(VHostSup, ?MODULE) of - ok -> supervisor:delete_child(VHostSup, ?MODULE); - E -> E + case rabbit_vhost_sup_sup:vhost_sup(VHost) of + {ok, VHostSup} -> + case supervisor:terminate_child(VHostSup, ?MODULE) of + ok -> supervisor:delete_child(VHostSup, ?MODULE); + E -> E + end; + %% see start/1 + {error, {no_such_vhost, VHost}} -> + rabbit_log:error("Failed to stop a recovery terms manager for vhost ~s: vhost no longer exists!", + [VHost]), + + ok end. store(VHost, DirBaseName, Terms) -> @@ -74,7 +89,14 @@ read(VHost, DirBaseName) -> end. clear(VHost) -> - ok = dets:delete_all_objects(VHost), + try + dets:delete_all_objects(VHost) + %% see start/1 + catch _:badarg -> + rabbit_log:error("Failed to clear recovery terms for vhost ~s: table no longer exists!", + [VHost]), + ok + end, flush(VHost). start_link(VHost) -> @@ -126,8 +148,15 @@ open_global_table() -> ok. close_global_table() -> - ok = dets:sync(?MODULE), - ok = dets:close(?MODULE). + try + dets:sync(?MODULE), + dets:close(?MODULE) + %% see clear/1 + catch _:badarg -> + rabbit_log:error("Failed to clear global recovery terms: table no longer exists!", + []), + ok + end. read_global(DirBaseName) -> read(?MODULE, DirBaseName). @@ -163,8 +192,23 @@ open_table(VHost) -> {ram_file, true}, {auto_save, infinity}]). -flush(VHost) -> ok = dets:sync(VHost). +flush(VHost) -> + try + dets:sync(VHost) + %% see clear/1 + catch _:badarg -> + rabbit_log:error("Failed to sync recovery terms table for vhost ~s: the table no longer exists!", + [VHost]), + ok + end. close_table(VHost) -> - ok = flush(VHost), - ok = dets:close(VHost). + try + ok = flush(VHost), + ok = dets:close(VHost) + %% see clear/1 + catch _:badarg -> + rabbit_log:error("Failed to close recovery terms table for vhost ~s: the table no longer exists!", + [VHost]), + ok + end. diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 40967e316e..dd08916e35 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -496,15 +496,26 @@ stop(VHost) -> start_msg_store(VHost, Refs, StartFunState) when is_list(Refs); Refs == undefined -> rabbit_log:info("Starting message stores for vhost '~s'~n", [VHost]), - {ok, _} = rabbit_vhost_msg_store:start(VHost, - ?TRANSIENT_MSG_STORE, - undefined, - ?EMPTY_START_FUN_STATE), - {ok, _} = rabbit_vhost_msg_store:start(VHost, - ?PERSISTENT_MSG_STORE, - Refs, - StartFunState), - rabbit_log:info("Message stores for vhost '~s' are started~n", [VHost]). + do_start_msg_store(VHost, ?TRANSIENT_MSG_STORE, undefined, ?EMPTY_START_FUN_STATE), + do_start_msg_store(VHost, ?PERSISTENT_MSG_STORE, Refs, StartFunState), + ok. + +do_start_msg_store(VHost, Type, Refs, StartFunState) -> + case rabbit_vhost_msg_store:start(VHost, Type, Refs, StartFunState) of + {ok, _} -> + rabbit_log:info("Started message store of type ~s for vhost '~s'~n", [abbreviated_type(Type), VHost]); + {error, {no_such_vhost, VHost}} = Err -> + rabbit_log:error("Failed to start message store of type ~s for vhost '~s': the vhost no longer exists!~n", + [Type, VHost]), + exit(Err); + {error, Error} -> + rabbit_log:error("Failed to start message store of type ~s for vhost '~s': ~p~n", + [Type, VHost, Error]), + exit({error, Error}) + end. + +abbreviated_type(?TRANSIENT_MSG_STORE) -> transient; +abbreviated_type(?PERSISTENT_MSG_STORE) -> persistent. stop_msg_store(VHost) -> rabbit_vhost_msg_store:stop(VHost, ?TRANSIENT_MSG_STORE), diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl index 4dc2ec86d0..7513c23925 100644 --- a/src/rabbit_vhost.erl +++ b/src/rabbit_vhost.erl @@ -27,8 +27,8 @@ -export([dir/1, msg_store_dir_path/1, msg_store_dir_wildcard/0]). -export([delete_storage/1]). --spec add(rabbit_types:vhost(), rabbit_types:username()) -> 'ok'. --spec delete(rabbit_types:vhost(), rabbit_types:username()) -> 'ok'. +-spec add(rabbit_types:vhost(), rabbit_types:username()) -> rabbit_types:ok_or_error(any()). +-spec delete(rabbit_types:vhost(), rabbit_types:username()) -> rabbit_types:ok_or_error(any()). -spec update(rabbit_types:vhost(), rabbit_misc:thunk(A)) -> A. -spec exists(rabbit_types:vhost()) -> boolean(). -spec list() -> [rabbit_types:vhost()]. @@ -104,10 +104,20 @@ add(VHostPath, ActingUser) -> {<<"amq.rabbitmq.trace">>, topic, true}]], ok end), - ok = rabbit_vhost_sup_sup:start_on_all_nodes(VHostPath), - rabbit_event:notify(vhost_created, info(VHostPath) - ++ [{user_who_performed_action, ActingUser}]), - R. + case rabbit_vhost_sup_sup:start_on_all_nodes(VHostPath) of + ok -> + rabbit_event:notify(vhost_created, info(VHostPath) + ++ [{user_who_performed_action, ActingUser}]), + R; + {error, {no_such_vhost, VHostPath}} -> + Msg = rabbit_misc:format("failed to set up vhost '~s': it was concurrently deleted!", + [VHostPath]), + {error, Msg}; + {error, Reason} -> + Msg = rabbit_misc:format("failed to set up vhost '~s': ~p", + [VHostPath, Reason]), + {error, Msg} + end. delete(VHostPath, ActingUser) -> %% FIXME: We are forced to delete the queues and exchanges outside @@ -125,7 +135,10 @@ delete(VHostPath, ActingUser) -> with(VHostPath, fun () -> internal_delete(VHostPath, ActingUser) end)), ok = rabbit_event:notify(vhost_deleted, [{name, VHostPath}, {user_who_performed_action, ActingUser}]), - [ok = Fun() || Fun <- Funs], + [case Fun() of + ok -> ok; + {error, {no_such_vhost, VHostPath}} -> ok + end || Fun <- Funs], %% After vhost was deleted from mnesia DB, we try to stop vhost supervisors %% on all the nodes. rabbit_vhost_sup_sup:delete_on_all_nodes(VHostPath), diff --git a/src/rabbit_vhost_msg_store.erl b/src/rabbit_vhost_msg_store.erl index 482ad082b8..3c633875bc 100644 --- a/src/rabbit_vhost_msg_store.erl +++ b/src/rabbit_vhost_msg_store.erl @@ -23,17 +23,33 @@ start(VHost, Type, ClientRefs, StartupFunState) when is_list(ClientRefs); ClientRefs == undefined -> - {ok, VHostSup} = rabbit_vhost_sup_sup:vhost_sup(VHost), - VHostDir = rabbit_vhost:msg_store_dir_path(VHost), - supervisor2:start_child(VHostSup, - {Type, {rabbit_msg_store, start_link, - [Type, VHostDir, ClientRefs, StartupFunState]}, - transient, ?WORKER_WAIT, worker, [rabbit_msg_store]}). + case rabbit_vhost_sup_sup:vhost_sup(VHost) of + {ok, VHostSup} -> + VHostDir = rabbit_vhost:msg_store_dir_path(VHost), + supervisor2:start_child(VHostSup, + {Type, {rabbit_msg_store, start_link, + [Type, VHostDir, ClientRefs, StartupFunState]}, + transient, ?WORKER_WAIT, worker, [rabbit_msg_store]}); + %% we can get here if a vhost is added and removed concurrently + %% e.g. some integration tests do it + {error, {no_such_vhost, VHost}} = E -> + rabbit_log:error("Failed to start a message store for vhost ~s: vhost no longer exists!", + [VHost]), + E + end. stop(VHost, Type) -> - {ok, VHostSup} = rabbit_vhost_sup_sup:vhost_sup(VHost), - ok = supervisor2:terminate_child(VHostSup, Type), - ok = supervisor2:delete_child(VHostSup, Type). + case rabbit_vhost_sup_sup:vhost_sup(VHost) of + {ok, VHostSup} -> + ok = supervisor2:terminate_child(VHostSup, Type), + ok = supervisor2:delete_child(VHostSup, Type); + %% see start/4 + {error, {no_such_vhost, VHost}} -> + rabbit_log:error("Failed to stop a message store for vhost ~s: vhost no longer exists!", + [VHost]), + + ok + end. client_init(VHost, Type, Ref, MsgOnDiskFun, CloseFDsFun) -> with_vhost_store(VHost, Type, fun(StorePid) -> @@ -58,4 +74,4 @@ vhost_store_pid(VHost, Type) -> successfully_recovered_state(VHost, Type) -> with_vhost_store(VHost, Type, fun(StorePid) -> rabbit_msg_store:successfully_recovered_state(StorePid) - end).
\ No newline at end of file + end). diff --git a/src/rabbit_vhost_sup.erl b/src/rabbit_vhost_sup.erl index b8c7a649e5..bbf006fbd3 100644 --- a/src/rabbit_vhost_sup.erl +++ b/src/rabbit_vhost_sup.erl @@ -18,7 +18,8 @@ -include("rabbit.hrl"). -%% Supervisor is a per-vhost supervisor to contain queues and message stores +%% Each vhost gets an instance of this supervisor that supervises +%% message stores and queues (via rabbit_amqqueue_sup_sup). -behaviour(supervisor2). -export([init/1]). -export([start_link/1]). diff --git a/src/rabbit_vhost_sup_sup.erl b/src/rabbit_vhost_sup_sup.erl index e528d64e0e..7ecac7a5d4 100644 --- a/src/rabbit_vhost_sup_sup.erl +++ b/src/rabbit_vhost_sup_sup.erl @@ -25,7 +25,7 @@ -export([start_link/0, start/0]). -export([vhost_sup/1, vhost_sup/2, save_vhost_sup/3]). -export([delete_on_all_nodes/1]). --export([start_vhost/1, start_vhost/2, start_on_all_nodes/1]). +-export([start_vhost/1, start_vhost/2, start_on_all_nodes/1, vhost_restart_strategy/0]). %% Internal -export([stop_and_delete_vhost/1]). @@ -33,8 +33,10 @@ -record(vhost_sup, {vhost, vhost_sup_pid, wrapper_pid}). start() -> - case supervisor:start_child(rabbit_sup, {?MODULE, {?MODULE, start_link, []}, - permanent, infinity, supervisor, [?MODULE]}) of + case supervisor:start_child(rabbit_sup, {?MODULE, + {?MODULE, start_link, []}, + permanent, infinity, supervisor, + [?MODULE]}) of {ok, _} -> ok; {error, Err} -> {error, Err} end. @@ -43,17 +45,14 @@ start_link() -> supervisor2:start_link({local, ?MODULE}, ?MODULE, []). init([]) -> - VhostRestart = case application:get_env(rabbit, vhost_restart_strategy, stop_node) of - ignore -> transient; - stop_node -> permanent; - transient -> transient; - permanent -> permanent - end, - + %% This assumes that a single vhost termination should not shut down nodes + %% unless the operator opts in. + RestartStrategy = vhost_restart_strategy(), ets:new(?MODULE, [named_table, public, {keypos, #vhost_sup.vhost}]), + {ok, {{simple_one_for_one, 0, 5}, [{rabbit_vhost, {rabbit_vhost_sup_wrapper, start_link, []}, - VhostRestart, infinity, supervisor, + RestartStrategy, ?SUPERVISOR_WAIT, supervisor, [rabbit_vhost_sup_wrapper, rabbit_vhost_sup]}]}}. start_on_all_nodes(VHost) -> @@ -83,7 +82,10 @@ start_vhost(VHost) -> case supervisor2:start_child(?MODULE, [VHost]) of {ok, _} -> ok; {error, {already_started, _}} -> ok; - Error -> throw(Error) + Error -> + rabbit_log:error("Could not start process tree " + "for vhost '~s': ~p", [VHost, Error]), + throw(Error) end, {ok, _} = vhost_sup_pid(VHost); {ok, Pid} when is_pid(Pid) -> @@ -100,7 +102,7 @@ stop_and_delete_vhost(VHost) -> false -> ok; true -> rabbit_log:info("Stopping vhost supervisor ~p" - " for vhost ~p~n", + " for vhost '~s'~n", [VHostSupPid, VHost]), case supervisor2:terminate_child(?MODULE, WrapperPid) of ok -> @@ -169,3 +171,12 @@ vhost_sup_pid(VHost) -> end end. +vhost_restart_strategy() -> + %% This assumes that a single vhost termination should not shut down nodes + %% unless the operator opts in. + case application:get_env(rabbit, vhost_restart_strategy, continue) of + continue -> transient; + stop_node -> permanent; + transient -> transient; + permanent -> permanent + end. diff --git a/src/rabbit_vhost_sup_watcher.erl b/src/rabbit_vhost_sup_watcher.erl index 3ce726621f..be2c5f20bb 100644 --- a/src/rabbit_vhost_sup_watcher.erl +++ b/src/rabbit_vhost_sup_watcher.erl @@ -49,10 +49,17 @@ handle_info(check_vhost, VHost) -> case rabbit_vhost:exists(VHost) of true -> {noreply, VHost}; false -> - rabbit_log:error(" Vhost \"~p\" is gone." - " Stopping message store supervisor.", - [VHost]), - {stop, normal, VHost} + rabbit_log:warning("Virtual host '~s' is gone. " + "Stopping its top level supervisor.", + [VHost]), + %% Stop vhost's top supervisor in a one-off process to avoid a deadlock: + %% us (a child process) waiting for supervisor shutdown and our supervisor(s) + %% waiting for us to shutdown. + spawn( + fun() -> + rabbit_vhost_sup_sup:stop_and_delete_vhost(VHost) + end), + {noreply, VHost} end; handle_info(_, VHost) -> {noreply, VHost}. @@ -63,4 +70,4 @@ code_change(_OldVsn, VHost, _Extra) -> {ok, VHost}. interval() -> - application:get_env(kernel, net_ticktime, 60000) * ?TICKTIME_RATIO.
\ No newline at end of file + application:get_env(kernel, net_ticktime, 60000) * ?TICKTIME_RATIO. diff --git a/src/rabbit_vhost_sup_wrapper.erl b/src/rabbit_vhost_sup_wrapper.erl index 3396f71fa9..8dbec30bff 100644 --- a/src/rabbit_vhost_sup_wrapper.erl +++ b/src/rabbit_vhost_sup_wrapper.erl @@ -14,6 +14,9 @@ %% Copyright (c) 2017 Pivotal Software, Inc. All rights reserved. %% +%% This module is a wrapper around vhost supervisor to +%% provide exactly once restart semantics. + -module(rabbit_vhost_sup_wrapper). -include("rabbit.hrl"). @@ -26,15 +29,9 @@ start_link(VHost) -> supervisor2:start_link(?MODULE, [VHost]). -%% This module is a wrapper around vhost supervisor to -%% provide exactly once restart. - -%% rabbit_vhost_sup supervisor children are added dynamically, -%% so one_for_all strategy cannot be used. - init([VHost]) -> - %% Two restarts in 1 hour. One per message store. - {ok, {{one_for_all, 2, 3600000}, + %% 2 restarts in 5 minutes. One per message store. + {ok, {{one_for_all, 2, 300}, [{rabbit_vhost_sup, {rabbit_vhost_sup_wrapper, start_vhost_sup, [VHost]}, permanent, infinity, supervisor, diff --git a/test/sync_detection_SUITE.erl b/test/sync_detection_SUITE.erl index f62cb846da..c606e050dd 100644 --- a/test/sync_detection_SUITE.erl +++ b/test/sync_detection_SUITE.erl @@ -213,7 +213,7 @@ slave_pids(Node, Queue) -> %% The mnesia synchronization takes a while, but we don't want to wait for the %% test to fail, since the timetrap is quite high. wait_for_sync_status(Status, Node, Queue) -> - Max = 10000 / ?LOOP_RECURSION_DELAY, + Max = 30000 / ?LOOP_RECURSION_DELAY, wait_for_sync_status(0, Max, Status, Node, Queue). wait_for_sync_status(N, Max, Status, Node, Queue) when N >= Max -> |
