summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichael Klishin <michael@clojurewerkz.org>2017-07-07 16:08:09 -0700
committerMichael Klishin <michael@clojurewerkz.org>2017-07-07 16:08:09 -0700
commitcaac1a65deb143a8750821174bfbb8a66a45d7c7 (patch)
tree871bd7919bdde79df258b2589861c30c273f2e55
parentb5e0206c7ba8cac637088e30427cfaecd520ca35 (diff)
parent99b3b0a290dc32c29887ea6b7ed880ce68bfd4e4 (diff)
downloadrabbitmq-server-git-caac1a65deb143a8750821174bfbb8a66a45d7c7.tar.gz
Merge branch 'master' into rabbitmq-management-421
-rw-r--r--Makefile10
-rw-r--r--docs/rabbitmqctl.833
-rw-r--r--priv/schema/rabbit.schema6
-rw-r--r--src/rabbit_amqqueue.erl9
-rw-r--r--src/rabbit_amqqueue_sup_sup.erl33
-rw-r--r--src/rabbit_nodes.erl150
-rw-r--r--src/rabbit_parameter_validation.erl4
-rw-r--r--src/rabbit_queue_index.erl2
-rw-r--r--src/rabbit_recovery_terms.erl78
-rw-r--r--src/rabbit_variable_queue.erl29
-rw-r--r--src/rabbit_vhost.erl27
-rw-r--r--src/rabbit_vhost_msg_store.erl36
-rw-r--r--src/rabbit_vhost_sup.erl3
-rw-r--r--src/rabbit_vhost_sup_sup.erl37
-rw-r--r--src/rabbit_vhost_sup_watcher.erl17
-rw-r--r--src/rabbit_vhost_sup_wrapper.erl13
-rw-r--r--test/sync_detection_SUITE.erl2
17 files changed, 231 insertions, 258 deletions
diff --git a/Makefile b/Makefile
index 61c8fa4b53..9aeaf80c72 100644
--- a/Makefile
+++ b/Makefile
@@ -112,17 +112,19 @@ define PROJECT_ENV
{passphrase, undefined}
]},
- %% rabbitmq-server-973
+ %% rabbitmq-server#973
{queue_explicit_gc_run_operation_threshold, 1000},
{lazy_queue_explicit_gc_run_operation_threshold, 1000},
{background_gc_enabled, false},
{background_gc_target_interval, 60000},
- %% rabbitmq-server-589
+ %% rabbitmq-server#589
{proxy_protocol, false},
{disk_monitor_failure_retries, 10},
{disk_monitor_failure_retry_interval, 120000},
- %% either "stop_node" or "ignore"
- {vhost_restart_strategy, stop_node}
+ %% either "stop_node" or "continue".
+ %% by default we choose to not terminate the entire node if one
+ %% vhost had to shut down, see server#1158 and server#1280
+ {vhost_restart_strategy, continue}
]
endef
diff --git a/docs/rabbitmqctl.8 b/docs/rabbitmqctl.8
index e9bb7fd401..7a8497a89b 100644
--- a/docs/rabbitmqctl.8
+++ b/docs/rabbitmqctl.8
@@ -1811,34 +1811,14 @@ floating point number.
Values lower than 1.0 can be dangerous and should be used carefully.
.El
.\" ------------------------------------
-.It Cm encode Oo Fl -decode Oc Oo Ar value Oc Oo Ar passphrase Oc Oo Fl -list-ciphers Oc Oo Fl -list-hashes Oc Oo Fl -cipher Ar cipher Oc Oo Fl -hash Ar hash Oc Op Fl -iterations Ar iterations
+.It Cm encode Ar value Ar passphrase Oo Fl -cipher Ar cipher Oc Oo Fl -hash Ar hash Oc Op Fl -iterations Ar iterations
.Bl -tag -width Ds
-.It Fl -decode
-Flag to decrypt the input value.
-.Pp
-For example:
-.sp
-.Dl rabbitmqctl encode --decode '{encrypted,<<"...">>}' mypassphrase
.It Ar value Ar passphrase
-Value to encrypt/decrypt and passphrase.
+Value to encrypt and passphrase.
.Pp
For example:
.sp
.Dl rabbitmqctl encode '<<"guest">>' mypassphrase
-.sp
-.Dl rabbitmqctl encode --decode '{encrypted,<<"...">>}' mypassphrase
-.It Fl -list-ciphers
-Flag to list the supported ciphers.
-.Pp
-For example:
-.sp
-.Dl rabbitmqctl encode --list-ciphers
-.It Fl -list-hashes
-Flag to list the supported hash algorithms.
-.Pp
-For example:
-.sp
-.Dl rabbitmqctl encode --list-hashes
.It Fl -cipher Ar cipher Fl -hash Ar hash Fl -iterations Ar iterations
Options to specify the encryption settings.
They can be used independently.
@@ -1848,7 +1828,7 @@ For example:
.Dl rabbitmqctl encode --cipher blowfish_cfb64 --hash sha256 --iterations 10000 '<<"guest">>' mypassphrase
.El
.\" ------------------------------------
-.It Cm decode Ar value Ar passphrase
+.It Cm decode Ar value Ar passphrase Oo Fl -cipher Ar cipher Oc Oo Fl -hash Ar hash Oc Op Fl -iterations Ar iterations
.Bl -tag -width Ds
.It Ar value Ar passphrase
Value to decrypt (as produced by the encode command) and passphrase.
@@ -1856,6 +1836,13 @@ Value to decrypt (as produced by the encode command) and passphrase.
For example:
.sp
.Dl rabbitmqctl decode '{encrypted, <<"...">>}' mypassphrase
+.It Fl -cipher Ar cipher Fl -hash Ar hash Fl -iterations Ar iterations
+Options to specify the decryption settings.
+They can be used independently.
+.Pp
+For example:
+.sp
+.Dl rabbitmqctl decode --cipher blowfish_cfb64 --hash sha256 --iterations 10000 '{encrypted,<<"...">>} mypassphrase
.El
.\" ------------------------------------
.It Cm list_hashes
diff --git a/priv/schema/rabbit.schema b/priv/schema/rabbit.schema
index c503548187..70351f116c 100644
--- a/priv/schema/rabbit.schema
+++ b/priv/schema/rabbit.schema
@@ -976,11 +976,11 @@ end}.
{mapping, "proxy_protocol", "rabbit.proxy_protocol",
[{datatype, {enum, [true, false]}}]}.
-%% Whether to stop the rabbit application if VHost data
-%% cannot be recovered.
+%% Whether to stop the rabbit application if a vhost has
+%% to terminate for any reason.
{mapping, "vhost_restart_strategy", "rabbit.vhost_restart_strategy",
- [{datatype, {enum, [stop_node, ignore]}}]}.
+ [{datatype, {enum, [stop_node, continue, transient, persistent]}}]}.
% ==========================
% Lager section
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 4eead35c1d..81eb5edf6e 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -234,8 +234,13 @@ recover(VHost) ->
%% for further processing in recover_durable_queues.
{ok, OrderedRecoveryTerms} =
BQ:start(VHost, [QName || #amqqueue{name = QName} <- Queues]),
- {ok, _} = rabbit_amqqueue_sup_sup:start_for_vhost(VHost),
- recover_durable_queues(lists:zip(Queues, OrderedRecoveryTerms)).
+ case rabbit_amqqueue_sup_sup:start_for_vhost(VHost) of
+ {ok, _} ->
+ recover_durable_queues(lists:zip(Queues, OrderedRecoveryTerms));
+ {error, Reason} ->
+ rabbit_log:error("Failed to start queue supervisor for vhost '~s': ~s", [VHost, Reason]),
+ throw({error, Reason})
+ end.
stop(VHost) ->
ok = rabbit_amqqueue_sup_sup:stop_for_vhost(VHost),
diff --git a/src/rabbit_amqqueue_sup_sup.erl b/src/rabbit_amqqueue_sup_sup.erl
index 347dbbb48a..b5ef86255d 100644
--- a/src/rabbit_amqqueue_sup_sup.erl
+++ b/src/rabbit_amqqueue_sup_sup.erl
@@ -65,15 +65,30 @@ find_for_vhost(VHost, Node) ->
-spec start_for_vhost(rabbit_types:vhost()) -> {ok, pid()} | {error, term()}.
start_for_vhost(VHost) ->
- {ok, VHostSup} = rabbit_vhost_sup_sup:vhost_sup(VHost),
- supervisor2:start_child(
- VHostSup,
- {rabbit_amqqueue_sup_sup,
- {rabbit_amqqueue_sup_sup, start_link, []},
- transient, infinity, supervisor, [rabbit_amqqueue_sup_sup]}).
+ case rabbit_vhost_sup_sup:vhost_sup(VHost) of
+ {ok, VHostSup} ->
+ supervisor2:start_child(
+ VHostSup,
+ {rabbit_amqqueue_sup_sup,
+ {rabbit_amqqueue_sup_sup, start_link, []},
+ transient, infinity, supervisor, [rabbit_amqqueue_sup_sup]});
+ %% we can get here if a vhost is added and removed concurrently
+ %% e.g. some integration tests do it
+ {error, {no_such_vhost, VHost}} ->
+ rabbit_log:error("Failed to start a queue process supervisor for vhost ~s: vhost no longer exists!",
+ [VHost]),
+ {error, {no_such_vhost, VHost}}
+ end.
-spec stop_for_vhost(rabbit_types:vhost()) -> ok.
stop_for_vhost(VHost) ->
- {ok, VHostSup} = rabbit_vhost_sup_sup:vhost_sup(VHost),
- ok = supervisor2:terminate_child(VHostSup, rabbit_amqqueue_sup_sup),
- ok = supervisor2:delete_child(VHostSup, rabbit_amqqueue_sup_sup). \ No newline at end of file
+ case rabbit_vhost_sup_sup:vhost_sup(VHost) of
+ {ok, VHostSup} ->
+ ok = supervisor2:terminate_child(VHostSup, rabbit_amqqueue_sup_sup),
+ ok = supervisor2:delete_child(VHostSup, rabbit_amqqueue_sup_sup);
+ %% see start/1
+ {error, {no_such_vhost, VHost}} ->
+ rabbit_log:error("Failed to stop a queue process supervisor for vhost ~s: vhost no longer exists!",
+ [VHost]),
+ ok
+ end.
diff --git a/src/rabbit_nodes.erl b/src/rabbit_nodes.erl
index 850ec48bdc..6b5ab499cc 100644
--- a/src/rabbit_nodes.erl
+++ b/src/rabbit_nodes.erl
@@ -24,10 +24,6 @@
-include_lib("kernel/include/inet.hrl").
--define(EPMD_TIMEOUT, 30000).
--define(TCP_DIAGNOSTIC_TIMEOUT, 5000).
--define(ERROR_LOGGER_HANDLER, rabbit_error_logger_handler).
-
%%----------------------------------------------------------------------------
%% Specs
%%----------------------------------------------------------------------------
@@ -45,135 +41,10 @@
%%----------------------------------------------------------------------------
names(Hostname) ->
- Self = self(),
- Ref = make_ref(),
- {Pid, MRef} = spawn_monitor(
- fun () -> Self ! {Ref, net_adm:names(Hostname)} end),
- timer:exit_after(?EPMD_TIMEOUT, Pid, timeout),
- receive
- {Ref, Names} -> erlang:demonitor(MRef, [flush]),
- Names;
- {'DOWN', MRef, process, Pid, Reason} -> {error, Reason}
- end.
+ rabbit_nodes_common:names(Hostname).
diagnostics(Nodes) ->
- verbose_erlang_distribution(true),
- NodeDiags = [{"~nDIAGNOSTICS~n===========~n~n"
- "attempted to contact: ~p~n", [Nodes]}] ++
- [diagnostics_node(Node) || Node <- Nodes] ++
- current_node_details(),
- verbose_erlang_distribution(false),
- rabbit_misc:format_many(lists:flatten(NodeDiags)).
-
-verbose_erlang_distribution(true) ->
- net_kernel:verbose(1),
- error_logger:add_report_handler(?ERROR_LOGGER_HANDLER);
-verbose_erlang_distribution(false) ->
- net_kernel:verbose(0),
- error_logger:delete_report_handler(?ERROR_LOGGER_HANDLER).
-
-current_node_details() ->
- [{"~ncurrent node details:~n- node name: ~w", [node()]},
- case init:get_argument(home) of
- {ok, [[Home]]} -> {"- home dir: ~s", [Home]};
- Other -> {"- no home dir: ~p", [Other]}
- end,
- {"- cookie hash: ~s", [cookie_hash()]}].
-
-diagnostics_node(Node) ->
- {Name, Host} = parts(Node),
- [{"~s:", [Node]} |
- case names(Host) of
- {error, Reason} ->
- [{" * unable to connect to epmd (port ~s) on ~s: ~s~n",
- [epmd_port(), Host, rabbit_misc:format_inet_error(Reason)]}];
- {ok, NamePorts} ->
- [{" * connected to epmd (port ~s) on ~s",
- [epmd_port(), Host]}] ++
- case net_adm:ping(Node) of
- pong -> dist_working_diagnostics(Node);
- pang -> dist_broken_diagnostics(Name, Host, NamePorts)
- end
- end].
-
-epmd_port() ->
- case init:get_argument(epmd_port) of
- {ok, [[Port | _] | _]} when is_list(Port) -> Port;
- error -> "4369"
- end.
-
-dist_working_diagnostics(Node) ->
- case is_process_running(Node, rabbit) of
- true -> [{" * node ~s up, 'rabbit' application running", [Node]}];
- false -> [{" * node ~s up, 'rabbit' application not running~n"
- " * running applications on ~s: ~p~n"
- " * suggestion: start_app on ~s",
- [Node, Node, remote_apps(Node), Node]}]
- end.
-
-remote_apps(Node) ->
- %% We want a timeout here because really, we don't trust the node,
- %% the last thing we want to do is hang.
- case rpc:call(Node, application, which_applications, [5000]) of
- {badrpc, _} = E -> E;
- Apps -> [App || {App, _, _} <- Apps]
- end.
-
-dist_broken_diagnostics(Name, Host, NamePorts) ->
- case [{N, P} || {N, P} <- NamePorts, N =:= Name] of
- [] ->
- {SelfName, SelfHost} = parts(node()),
- Others = [list_to_atom(N) || {N, _} <- NamePorts,
- N =/= case SelfHost of
- Host -> SelfName;
- _ -> never_matches
- end],
- OthersDiag = case Others of
- [] -> [{" no other nodes on ~s",
- [Host]}];
- _ -> [{" other nodes on ~s: ~p",
- [Host, Others]}]
- end,
- [{" * epmd reports: node '~s' not running at all", [Name]},
- OthersDiag, {" * suggestion: start the node", []}];
- [{Name, Port}] ->
- [{" * epmd reports node '~s' running on port ~b", [Name, Port]} |
- case diagnose_connect(Host, Port) of
- ok ->
- connection_succeeded_diagnostics();
- {error, Reason} ->
- [{" * can't establish TCP connection, reason: ~s~n"
- " * suggestion: blocked by firewall?",
- [rabbit_misc:format_inet_error(Reason)]}]
- end]
- end.
-
-connection_succeeded_diagnostics() ->
- case gen_event:call(error_logger, ?ERROR_LOGGER_HANDLER, get_connection_report) of
- [] ->
- [{" * TCP connection succeeded but Erlang distribution "
- "failed~n"
- " * suggestion: hostname mismatch?~n"
- " * suggestion: is the cookie set correctly?~n"
- " * suggestion: is the Erlang distribution using TLS?", []}];
- Report ->
- [{" * TCP connection succeeded but Erlang distribution "
- "failed~n", []}]
- ++ Report
- end.
-
-diagnose_connect(Host, Port) ->
- case inet:gethostbyname(Host) of
- {ok, #hostent{h_addrtype = Family}} ->
- case gen_tcp:connect(Host, Port, [Family],
- ?TCP_DIAGNOSTIC_TIMEOUT) of
- {ok, Socket} -> gen_tcp:close(Socket),
- ok;
- {error, _} = E -> E
- end;
- {error, _} = E ->
- E
- end.
+ rabbit_nodes_common:diagnostics(Nodes).
make(NodeStr) ->
rabbit_nodes_common:make(NodeStr).
@@ -182,30 +53,23 @@ parts(NodeStr) ->
rabbit_nodes_common:parts(NodeStr).
cookie_hash() ->
- base64:encode_to_string(erlang:md5(atom_to_list(erlang:get_cookie()))).
+ rabbit_nodes_common:cookie_hash().
is_running(Node, Application) ->
- case rpc:call(Node, rabbit_misc, which_applications, []) of
- {badrpc, _} -> false;
- Apps -> proplists:is_defined(Application, Apps)
- end.
+ rabbit_nodes_common:is_running(Node, Application).
is_process_running(Node, Process) ->
- case rpc:call(Node, erlang, whereis, [Process]) of
- {badrpc, _} -> false;
- undefined -> false;
- P when is_pid(P) -> true
- end.
+ rabbit_nodes_common:is_process_running(Node, Process).
cluster_name() ->
rabbit_runtime_parameters:value_global(
cluster_name, cluster_name_default()).
cluster_name_default() ->
- {ID, _} = rabbit_nodes:parts(node()),
+ {ID, _} = parts(node()),
{ok, Host} = inet:gethostname(),
{ok, #hostent{h_name = FQDN}} = inet:gethostbyname(Host),
- list_to_binary(atom_to_list(rabbit_nodes:make({ID, FQDN}))).
+ list_to_binary(atom_to_list(make({ID, FQDN}))).
set_cluster_name(Name, Username) ->
%% Cluster name should be binary
diff --git a/src/rabbit_parameter_validation.erl b/src/rabbit_parameter_validation.erl
index d93104d02c..f28e1281c3 100644
--- a/src/rabbit_parameter_validation.erl
+++ b/src/rabbit_parameter_validation.erl
@@ -22,13 +22,13 @@ number(_Name, Term) when is_number(Term) ->
ok;
number(Name, Term) ->
- {error, "~s should be number, actually was ~p", [Name, Term]}.
+ {error, "~s should be a number, actually was ~p", [Name, Term]}.
integer(_Name, Term) when is_integer(Term) ->
ok;
integer(Name, Term) ->
- {error, "~s should be number, actually was ~p", [Name, Term]}.
+ {error, "~s should be a number, actually was ~p", [Name, Term]}.
binary(_Name, Term) when is_binary(Term) ->
ok;
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 7416ede4b8..e9776f47ad 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -565,7 +565,7 @@ queue_name_to_dir_name(#resource { kind = queue,
rabbit_misc:format("~.36B", [Num]).
queue_name_to_dir_name_legacy(Name = #resource { kind = queue }) ->
- <<Num:128>> = erlang:md5(term_to_binary_compat:queue_name_to_binary(Name)),
+ <<Num:128>> = erlang:md5(term_to_binary_compat:term_to_binary_1(Name)),
rabbit_misc:format("~.36B", [Num]).
queues_base_dir() ->
diff --git a/src/rabbit_recovery_terms.erl b/src/rabbit_recovery_terms.erl
index be9b1b6227..73fc9c7449 100644
--- a/src/rabbit_recovery_terms.erl
+++ b/src/rabbit_recovery_terms.erl
@@ -48,20 +48,35 @@
%%----------------------------------------------------------------------------
start(VHost) ->
- {ok, VHostSup} = rabbit_vhost_sup_sup:vhost_sup(VHost),
- {ok, _} = supervisor2:start_child(
- VHostSup,
- {?MODULE,
- {?MODULE, start_link, [VHost]},
- transient, ?WORKER_WAIT, worker,
- [?MODULE]}),
+ case rabbit_vhost_sup_sup:vhost_sup(VHost) of
+ {ok, VHostSup} ->
+ {ok, _} = supervisor2:start_child(
+ VHostSup,
+ {?MODULE,
+ {?MODULE, start_link, [VHost]},
+ transient, ?WORKER_WAIT, worker,
+ [?MODULE]});
+ %% we can get here if a vhost is added and removed concurrently
+ %% e.g. some integration tests do it
+ {error, {no_such_vhost, VHost}} ->
+ rabbit_log:error("Failed to start a recovery terms manager for vhost ~s: vhost no longer exists!",
+ [VHost])
+ end,
ok.
stop(VHost) ->
- {ok, VHostSup} = rabbit_vhost_sup_sup:vhost_sup(VHost),
- case supervisor:terminate_child(VHostSup, ?MODULE) of
- ok -> supervisor:delete_child(VHostSup, ?MODULE);
- E -> E
+ case rabbit_vhost_sup_sup:vhost_sup(VHost) of
+ {ok, VHostSup} ->
+ case supervisor:terminate_child(VHostSup, ?MODULE) of
+ ok -> supervisor:delete_child(VHostSup, ?MODULE);
+ E -> E
+ end;
+ %% see start/1
+ {error, {no_such_vhost, VHost}} ->
+ rabbit_log:error("Failed to stop a recovery terms manager for vhost ~s: vhost no longer exists!",
+ [VHost]),
+
+ ok
end.
store(VHost, DirBaseName, Terms) ->
@@ -74,7 +89,14 @@ read(VHost, DirBaseName) ->
end.
clear(VHost) ->
- ok = dets:delete_all_objects(VHost),
+ try
+ dets:delete_all_objects(VHost)
+ %% see start/1
+ catch _:badarg ->
+ rabbit_log:error("Failed to clear recovery terms for vhost ~s: table no longer exists!",
+ [VHost]),
+ ok
+ end,
flush(VHost).
start_link(VHost) ->
@@ -126,8 +148,15 @@ open_global_table() ->
ok.
close_global_table() ->
- ok = dets:sync(?MODULE),
- ok = dets:close(?MODULE).
+ try
+ dets:sync(?MODULE),
+ dets:close(?MODULE)
+ %% see clear/1
+ catch _:badarg ->
+ rabbit_log:error("Failed to clear global recovery terms: table no longer exists!",
+ []),
+ ok
+ end.
read_global(DirBaseName) ->
read(?MODULE, DirBaseName).
@@ -163,8 +192,23 @@ open_table(VHost) ->
{ram_file, true},
{auto_save, infinity}]).
-flush(VHost) -> ok = dets:sync(VHost).
+flush(VHost) ->
+ try
+ dets:sync(VHost)
+ %% see clear/1
+ catch _:badarg ->
+ rabbit_log:error("Failed to sync recovery terms table for vhost ~s: the table no longer exists!",
+ [VHost]),
+ ok
+ end.
close_table(VHost) ->
- ok = flush(VHost),
- ok = dets:close(VHost).
+ try
+ ok = flush(VHost),
+ ok = dets:close(VHost)
+ %% see clear/1
+ catch _:badarg ->
+ rabbit_log:error("Failed to close recovery terms table for vhost ~s: the table no longer exists!",
+ [VHost]),
+ ok
+ end.
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 40967e316e..dd08916e35 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -496,15 +496,26 @@ stop(VHost) ->
start_msg_store(VHost, Refs, StartFunState) when is_list(Refs); Refs == undefined ->
rabbit_log:info("Starting message stores for vhost '~s'~n", [VHost]),
- {ok, _} = rabbit_vhost_msg_store:start(VHost,
- ?TRANSIENT_MSG_STORE,
- undefined,
- ?EMPTY_START_FUN_STATE),
- {ok, _} = rabbit_vhost_msg_store:start(VHost,
- ?PERSISTENT_MSG_STORE,
- Refs,
- StartFunState),
- rabbit_log:info("Message stores for vhost '~s' are started~n", [VHost]).
+ do_start_msg_store(VHost, ?TRANSIENT_MSG_STORE, undefined, ?EMPTY_START_FUN_STATE),
+ do_start_msg_store(VHost, ?PERSISTENT_MSG_STORE, Refs, StartFunState),
+ ok.
+
+do_start_msg_store(VHost, Type, Refs, StartFunState) ->
+ case rabbit_vhost_msg_store:start(VHost, Type, Refs, StartFunState) of
+ {ok, _} ->
+ rabbit_log:info("Started message store of type ~s for vhost '~s'~n", [abbreviated_type(Type), VHost]);
+ {error, {no_such_vhost, VHost}} = Err ->
+ rabbit_log:error("Failed to start message store of type ~s for vhost '~s': the vhost no longer exists!~n",
+ [Type, VHost]),
+ exit(Err);
+ {error, Error} ->
+ rabbit_log:error("Failed to start message store of type ~s for vhost '~s': ~p~n",
+ [Type, VHost, Error]),
+ exit({error, Error})
+ end.
+
+abbreviated_type(?TRANSIENT_MSG_STORE) -> transient;
+abbreviated_type(?PERSISTENT_MSG_STORE) -> persistent.
stop_msg_store(VHost) ->
rabbit_vhost_msg_store:stop(VHost, ?TRANSIENT_MSG_STORE),
diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl
index 4dc2ec86d0..7513c23925 100644
--- a/src/rabbit_vhost.erl
+++ b/src/rabbit_vhost.erl
@@ -27,8 +27,8 @@
-export([dir/1, msg_store_dir_path/1, msg_store_dir_wildcard/0]).
-export([delete_storage/1]).
--spec add(rabbit_types:vhost(), rabbit_types:username()) -> 'ok'.
--spec delete(rabbit_types:vhost(), rabbit_types:username()) -> 'ok'.
+-spec add(rabbit_types:vhost(), rabbit_types:username()) -> rabbit_types:ok_or_error(any()).
+-spec delete(rabbit_types:vhost(), rabbit_types:username()) -> rabbit_types:ok_or_error(any()).
-spec update(rabbit_types:vhost(), rabbit_misc:thunk(A)) -> A.
-spec exists(rabbit_types:vhost()) -> boolean().
-spec list() -> [rabbit_types:vhost()].
@@ -104,10 +104,20 @@ add(VHostPath, ActingUser) ->
{<<"amq.rabbitmq.trace">>, topic, true}]],
ok
end),
- ok = rabbit_vhost_sup_sup:start_on_all_nodes(VHostPath),
- rabbit_event:notify(vhost_created, info(VHostPath)
- ++ [{user_who_performed_action, ActingUser}]),
- R.
+ case rabbit_vhost_sup_sup:start_on_all_nodes(VHostPath) of
+ ok ->
+ rabbit_event:notify(vhost_created, info(VHostPath)
+ ++ [{user_who_performed_action, ActingUser}]),
+ R;
+ {error, {no_such_vhost, VHostPath}} ->
+ Msg = rabbit_misc:format("failed to set up vhost '~s': it was concurrently deleted!",
+ [VHostPath]),
+ {error, Msg};
+ {error, Reason} ->
+ Msg = rabbit_misc:format("failed to set up vhost '~s': ~p",
+ [VHostPath, Reason]),
+ {error, Msg}
+ end.
delete(VHostPath, ActingUser) ->
%% FIXME: We are forced to delete the queues and exchanges outside
@@ -125,7 +135,10 @@ delete(VHostPath, ActingUser) ->
with(VHostPath, fun () -> internal_delete(VHostPath, ActingUser) end)),
ok = rabbit_event:notify(vhost_deleted, [{name, VHostPath},
{user_who_performed_action, ActingUser}]),
- [ok = Fun() || Fun <- Funs],
+ [case Fun() of
+ ok -> ok;
+ {error, {no_such_vhost, VHostPath}} -> ok
+ end || Fun <- Funs],
%% After vhost was deleted from mnesia DB, we try to stop vhost supervisors
%% on all the nodes.
rabbit_vhost_sup_sup:delete_on_all_nodes(VHostPath),
diff --git a/src/rabbit_vhost_msg_store.erl b/src/rabbit_vhost_msg_store.erl
index 482ad082b8..3c633875bc 100644
--- a/src/rabbit_vhost_msg_store.erl
+++ b/src/rabbit_vhost_msg_store.erl
@@ -23,17 +23,33 @@
start(VHost, Type, ClientRefs, StartupFunState) when is_list(ClientRefs);
ClientRefs == undefined ->
- {ok, VHostSup} = rabbit_vhost_sup_sup:vhost_sup(VHost),
- VHostDir = rabbit_vhost:msg_store_dir_path(VHost),
- supervisor2:start_child(VHostSup,
- {Type, {rabbit_msg_store, start_link,
- [Type, VHostDir, ClientRefs, StartupFunState]},
- transient, ?WORKER_WAIT, worker, [rabbit_msg_store]}).
+ case rabbit_vhost_sup_sup:vhost_sup(VHost) of
+ {ok, VHostSup} ->
+ VHostDir = rabbit_vhost:msg_store_dir_path(VHost),
+ supervisor2:start_child(VHostSup,
+ {Type, {rabbit_msg_store, start_link,
+ [Type, VHostDir, ClientRefs, StartupFunState]},
+ transient, ?WORKER_WAIT, worker, [rabbit_msg_store]});
+ %% we can get here if a vhost is added and removed concurrently
+ %% e.g. some integration tests do it
+ {error, {no_such_vhost, VHost}} = E ->
+ rabbit_log:error("Failed to start a message store for vhost ~s: vhost no longer exists!",
+ [VHost]),
+ E
+ end.
stop(VHost, Type) ->
- {ok, VHostSup} = rabbit_vhost_sup_sup:vhost_sup(VHost),
- ok = supervisor2:terminate_child(VHostSup, Type),
- ok = supervisor2:delete_child(VHostSup, Type).
+ case rabbit_vhost_sup_sup:vhost_sup(VHost) of
+ {ok, VHostSup} ->
+ ok = supervisor2:terminate_child(VHostSup, Type),
+ ok = supervisor2:delete_child(VHostSup, Type);
+ %% see start/4
+ {error, {no_such_vhost, VHost}} ->
+ rabbit_log:error("Failed to stop a message store for vhost ~s: vhost no longer exists!",
+ [VHost]),
+
+ ok
+ end.
client_init(VHost, Type, Ref, MsgOnDiskFun, CloseFDsFun) ->
with_vhost_store(VHost, Type, fun(StorePid) ->
@@ -58,4 +74,4 @@ vhost_store_pid(VHost, Type) ->
successfully_recovered_state(VHost, Type) ->
with_vhost_store(VHost, Type, fun(StorePid) ->
rabbit_msg_store:successfully_recovered_state(StorePid)
- end). \ No newline at end of file
+ end).
diff --git a/src/rabbit_vhost_sup.erl b/src/rabbit_vhost_sup.erl
index b8c7a649e5..bbf006fbd3 100644
--- a/src/rabbit_vhost_sup.erl
+++ b/src/rabbit_vhost_sup.erl
@@ -18,7 +18,8 @@
-include("rabbit.hrl").
-%% Supervisor is a per-vhost supervisor to contain queues and message stores
+%% Each vhost gets an instance of this supervisor that supervises
+%% message stores and queues (via rabbit_amqqueue_sup_sup).
-behaviour(supervisor2).
-export([init/1]).
-export([start_link/1]).
diff --git a/src/rabbit_vhost_sup_sup.erl b/src/rabbit_vhost_sup_sup.erl
index e528d64e0e..7ecac7a5d4 100644
--- a/src/rabbit_vhost_sup_sup.erl
+++ b/src/rabbit_vhost_sup_sup.erl
@@ -25,7 +25,7 @@
-export([start_link/0, start/0]).
-export([vhost_sup/1, vhost_sup/2, save_vhost_sup/3]).
-export([delete_on_all_nodes/1]).
--export([start_vhost/1, start_vhost/2, start_on_all_nodes/1]).
+-export([start_vhost/1, start_vhost/2, start_on_all_nodes/1, vhost_restart_strategy/0]).
%% Internal
-export([stop_and_delete_vhost/1]).
@@ -33,8 +33,10 @@
-record(vhost_sup, {vhost, vhost_sup_pid, wrapper_pid}).
start() ->
- case supervisor:start_child(rabbit_sup, {?MODULE, {?MODULE, start_link, []},
- permanent, infinity, supervisor, [?MODULE]}) of
+ case supervisor:start_child(rabbit_sup, {?MODULE,
+ {?MODULE, start_link, []},
+ permanent, infinity, supervisor,
+ [?MODULE]}) of
{ok, _} -> ok;
{error, Err} -> {error, Err}
end.
@@ -43,17 +45,14 @@ start_link() ->
supervisor2:start_link({local, ?MODULE}, ?MODULE, []).
init([]) ->
- VhostRestart = case application:get_env(rabbit, vhost_restart_strategy, stop_node) of
- ignore -> transient;
- stop_node -> permanent;
- transient -> transient;
- permanent -> permanent
- end,
-
+ %% This assumes that a single vhost termination should not shut down nodes
+ %% unless the operator opts in.
+ RestartStrategy = vhost_restart_strategy(),
ets:new(?MODULE, [named_table, public, {keypos, #vhost_sup.vhost}]),
+
{ok, {{simple_one_for_one, 0, 5},
[{rabbit_vhost, {rabbit_vhost_sup_wrapper, start_link, []},
- VhostRestart, infinity, supervisor,
+ RestartStrategy, ?SUPERVISOR_WAIT, supervisor,
[rabbit_vhost_sup_wrapper, rabbit_vhost_sup]}]}}.
start_on_all_nodes(VHost) ->
@@ -83,7 +82,10 @@ start_vhost(VHost) ->
case supervisor2:start_child(?MODULE, [VHost]) of
{ok, _} -> ok;
{error, {already_started, _}} -> ok;
- Error -> throw(Error)
+ Error ->
+ rabbit_log:error("Could not start process tree "
+ "for vhost '~s': ~p", [VHost, Error]),
+ throw(Error)
end,
{ok, _} = vhost_sup_pid(VHost);
{ok, Pid} when is_pid(Pid) ->
@@ -100,7 +102,7 @@ stop_and_delete_vhost(VHost) ->
false -> ok;
true ->
rabbit_log:info("Stopping vhost supervisor ~p"
- " for vhost ~p~n",
+ " for vhost '~s'~n",
[VHostSupPid, VHost]),
case supervisor2:terminate_child(?MODULE, WrapperPid) of
ok ->
@@ -169,3 +171,12 @@ vhost_sup_pid(VHost) ->
end
end.
+vhost_restart_strategy() ->
+ %% This assumes that a single vhost termination should not shut down nodes
+ %% unless the operator opts in.
+ case application:get_env(rabbit, vhost_restart_strategy, continue) of
+ continue -> transient;
+ stop_node -> permanent;
+ transient -> transient;
+ permanent -> permanent
+ end.
diff --git a/src/rabbit_vhost_sup_watcher.erl b/src/rabbit_vhost_sup_watcher.erl
index 3ce726621f..be2c5f20bb 100644
--- a/src/rabbit_vhost_sup_watcher.erl
+++ b/src/rabbit_vhost_sup_watcher.erl
@@ -49,10 +49,17 @@ handle_info(check_vhost, VHost) ->
case rabbit_vhost:exists(VHost) of
true -> {noreply, VHost};
false ->
- rabbit_log:error(" Vhost \"~p\" is gone."
- " Stopping message store supervisor.",
- [VHost]),
- {stop, normal, VHost}
+ rabbit_log:warning("Virtual host '~s' is gone. "
+ "Stopping its top level supervisor.",
+ [VHost]),
+ %% Stop vhost's top supervisor in a one-off process to avoid a deadlock:
+ %% us (a child process) waiting for supervisor shutdown and our supervisor(s)
+ %% waiting for us to shutdown.
+ spawn(
+ fun() ->
+ rabbit_vhost_sup_sup:stop_and_delete_vhost(VHost)
+ end),
+ {noreply, VHost}
end;
handle_info(_, VHost) ->
{noreply, VHost}.
@@ -63,4 +70,4 @@ code_change(_OldVsn, VHost, _Extra) ->
{ok, VHost}.
interval() ->
- application:get_env(kernel, net_ticktime, 60000) * ?TICKTIME_RATIO. \ No newline at end of file
+ application:get_env(kernel, net_ticktime, 60000) * ?TICKTIME_RATIO.
diff --git a/src/rabbit_vhost_sup_wrapper.erl b/src/rabbit_vhost_sup_wrapper.erl
index 3396f71fa9..8dbec30bff 100644
--- a/src/rabbit_vhost_sup_wrapper.erl
+++ b/src/rabbit_vhost_sup_wrapper.erl
@@ -14,6 +14,9 @@
%% Copyright (c) 2017 Pivotal Software, Inc. All rights reserved.
%%
+%% This module is a wrapper around vhost supervisor to
+%% provide exactly once restart semantics.
+
-module(rabbit_vhost_sup_wrapper).
-include("rabbit.hrl").
@@ -26,15 +29,9 @@
start_link(VHost) ->
supervisor2:start_link(?MODULE, [VHost]).
-%% This module is a wrapper around vhost supervisor to
-%% provide exactly once restart.
-
-%% rabbit_vhost_sup supervisor children are added dynamically,
-%% so one_for_all strategy cannot be used.
-
init([VHost]) ->
- %% Two restarts in 1 hour. One per message store.
- {ok, {{one_for_all, 2, 3600000},
+ %% 2 restarts in 5 minutes. One per message store.
+ {ok, {{one_for_all, 2, 300},
[{rabbit_vhost_sup,
{rabbit_vhost_sup_wrapper, start_vhost_sup, [VHost]},
permanent, infinity, supervisor,
diff --git a/test/sync_detection_SUITE.erl b/test/sync_detection_SUITE.erl
index f62cb846da..c606e050dd 100644
--- a/test/sync_detection_SUITE.erl
+++ b/test/sync_detection_SUITE.erl
@@ -213,7 +213,7 @@ slave_pids(Node, Queue) ->
%% The mnesia synchronization takes a while, but we don't want to wait for the
%% test to fail, since the timetrap is quite high.
wait_for_sync_status(Status, Node, Queue) ->
- Max = 10000 / ?LOOP_RECURSION_DELAY,
+ Max = 30000 / ?LOOP_RECURSION_DELAY,
wait_for_sync_status(0, Max, Status, Node, Queue).
wait_for_sync_status(N, Max, Status, Node, Queue) when N >= Max ->