summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMichael Klishin <michael@novemberain.com>2017-07-08 01:49:28 +0300
committerGitHub <noreply@github.com>2017-07-08 01:49:28 +0300
commit99b3b0a290dc32c29887ea6b7ed880ce68bfd4e4 (patch)
tree6992af3d5a3cee85a5cd38e5d7e302baee146f5c /src
parent5fa999bbf6e319fa250007fe3b5f5c09c3d70691 (diff)
parentaa1496bb198900355291617038d40592998c4a1c (diff)
downloadrabbitmq-server-git-99b3b0a290dc32c29887ea6b7ed880ce68bfd4e4.tar.gz
Merge pull request #1283 from rabbitmq/rabbitmq-server-1280
Handle concurrent vhost creation and deletion better
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue.erl9
-rw-r--r--src/rabbit_amqqueue_sup_sup.erl33
-rw-r--r--src/rabbit_recovery_terms.erl78
-rw-r--r--src/rabbit_variable_queue.erl29
-rw-r--r--src/rabbit_vhost.erl27
-rw-r--r--src/rabbit_vhost_msg_store.erl36
-rw-r--r--src/rabbit_vhost_sup.erl3
-rw-r--r--src/rabbit_vhost_sup_sup.erl37
-rw-r--r--src/rabbit_vhost_sup_watcher.erl17
-rw-r--r--src/rabbit_vhost_sup_wrapper.erl13
10 files changed, 201 insertions, 81 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 4eead35c1d..81eb5edf6e 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -234,8 +234,13 @@ recover(VHost) ->
%% for further processing in recover_durable_queues.
{ok, OrderedRecoveryTerms} =
BQ:start(VHost, [QName || #amqqueue{name = QName} <- Queues]),
- {ok, _} = rabbit_amqqueue_sup_sup:start_for_vhost(VHost),
- recover_durable_queues(lists:zip(Queues, OrderedRecoveryTerms)).
+ case rabbit_amqqueue_sup_sup:start_for_vhost(VHost) of
+ {ok, _} ->
+ recover_durable_queues(lists:zip(Queues, OrderedRecoveryTerms));
+ {error, Reason} ->
+ rabbit_log:error("Failed to start queue supervisor for vhost '~s': ~s", [VHost, Reason]),
+ throw({error, Reason})
+ end.
stop(VHost) ->
ok = rabbit_amqqueue_sup_sup:stop_for_vhost(VHost),
diff --git a/src/rabbit_amqqueue_sup_sup.erl b/src/rabbit_amqqueue_sup_sup.erl
index 347dbbb48a..b5ef86255d 100644
--- a/src/rabbit_amqqueue_sup_sup.erl
+++ b/src/rabbit_amqqueue_sup_sup.erl
@@ -65,15 +65,30 @@ find_for_vhost(VHost, Node) ->
-spec start_for_vhost(rabbit_types:vhost()) -> {ok, pid()} | {error, term()}.
start_for_vhost(VHost) ->
- {ok, VHostSup} = rabbit_vhost_sup_sup:vhost_sup(VHost),
- supervisor2:start_child(
- VHostSup,
- {rabbit_amqqueue_sup_sup,
- {rabbit_amqqueue_sup_sup, start_link, []},
- transient, infinity, supervisor, [rabbit_amqqueue_sup_sup]}).
+ case rabbit_vhost_sup_sup:vhost_sup(VHost) of
+ {ok, VHostSup} ->
+ supervisor2:start_child(
+ VHostSup,
+ {rabbit_amqqueue_sup_sup,
+ {rabbit_amqqueue_sup_sup, start_link, []},
+ transient, infinity, supervisor, [rabbit_amqqueue_sup_sup]});
+ %% we can get here if a vhost is added and removed concurrently
+ %% e.g. some integration tests do it
+ {error, {no_such_vhost, VHost}} ->
+ rabbit_log:error("Failed to start a queue process supervisor for vhost ~s: vhost no longer exists!",
+ [VHost]),
+ {error, {no_such_vhost, VHost}}
+ end.
-spec stop_for_vhost(rabbit_types:vhost()) -> ok.
stop_for_vhost(VHost) ->
- {ok, VHostSup} = rabbit_vhost_sup_sup:vhost_sup(VHost),
- ok = supervisor2:terminate_child(VHostSup, rabbit_amqqueue_sup_sup),
- ok = supervisor2:delete_child(VHostSup, rabbit_amqqueue_sup_sup). \ No newline at end of file
+ case rabbit_vhost_sup_sup:vhost_sup(VHost) of
+ {ok, VHostSup} ->
+ ok = supervisor2:terminate_child(VHostSup, rabbit_amqqueue_sup_sup),
+ ok = supervisor2:delete_child(VHostSup, rabbit_amqqueue_sup_sup);
+ %% see start/1
+ {error, {no_such_vhost, VHost}} ->
+ rabbit_log:error("Failed to stop a queue process supervisor for vhost ~s: vhost no longer exists!",
+ [VHost]),
+ ok
+ end.
diff --git a/src/rabbit_recovery_terms.erl b/src/rabbit_recovery_terms.erl
index be9b1b6227..73fc9c7449 100644
--- a/src/rabbit_recovery_terms.erl
+++ b/src/rabbit_recovery_terms.erl
@@ -48,20 +48,35 @@
%%----------------------------------------------------------------------------
start(VHost) ->
- {ok, VHostSup} = rabbit_vhost_sup_sup:vhost_sup(VHost),
- {ok, _} = supervisor2:start_child(
- VHostSup,
- {?MODULE,
- {?MODULE, start_link, [VHost]},
- transient, ?WORKER_WAIT, worker,
- [?MODULE]}),
+ case rabbit_vhost_sup_sup:vhost_sup(VHost) of
+ {ok, VHostSup} ->
+ {ok, _} = supervisor2:start_child(
+ VHostSup,
+ {?MODULE,
+ {?MODULE, start_link, [VHost]},
+ transient, ?WORKER_WAIT, worker,
+ [?MODULE]});
+ %% we can get here if a vhost is added and removed concurrently
+ %% e.g. some integration tests do it
+ {error, {no_such_vhost, VHost}} ->
+ rabbit_log:error("Failed to start a recovery terms manager for vhost ~s: vhost no longer exists!",
+ [VHost])
+ end,
ok.
stop(VHost) ->
- {ok, VHostSup} = rabbit_vhost_sup_sup:vhost_sup(VHost),
- case supervisor:terminate_child(VHostSup, ?MODULE) of
- ok -> supervisor:delete_child(VHostSup, ?MODULE);
- E -> E
+ case rabbit_vhost_sup_sup:vhost_sup(VHost) of
+ {ok, VHostSup} ->
+ case supervisor:terminate_child(VHostSup, ?MODULE) of
+ ok -> supervisor:delete_child(VHostSup, ?MODULE);
+ E -> E
+ end;
+ %% see start/1
+ {error, {no_such_vhost, VHost}} ->
+ rabbit_log:error("Failed to stop a recovery terms manager for vhost ~s: vhost no longer exists!",
+ [VHost]),
+
+ ok
end.
store(VHost, DirBaseName, Terms) ->
@@ -74,7 +89,14 @@ read(VHost, DirBaseName) ->
end.
clear(VHost) ->
- ok = dets:delete_all_objects(VHost),
+ try
+ dets:delete_all_objects(VHost)
+ %% see start/1
+ catch _:badarg ->
+ rabbit_log:error("Failed to clear recovery terms for vhost ~s: table no longer exists!",
+ [VHost]),
+ ok
+ end,
flush(VHost).
start_link(VHost) ->
@@ -126,8 +148,15 @@ open_global_table() ->
ok.
close_global_table() ->
- ok = dets:sync(?MODULE),
- ok = dets:close(?MODULE).
+ try
+ dets:sync(?MODULE),
+ dets:close(?MODULE)
+ %% see clear/1
+ catch _:badarg ->
+ rabbit_log:error("Failed to clear global recovery terms: table no longer exists!",
+ []),
+ ok
+ end.
read_global(DirBaseName) ->
read(?MODULE, DirBaseName).
@@ -163,8 +192,23 @@ open_table(VHost) ->
{ram_file, true},
{auto_save, infinity}]).
-flush(VHost) -> ok = dets:sync(VHost).
+flush(VHost) ->
+ try
+ dets:sync(VHost)
+ %% see clear/1
+ catch _:badarg ->
+ rabbit_log:error("Failed to sync recovery terms table for vhost ~s: the table no longer exists!",
+ [VHost]),
+ ok
+ end.
close_table(VHost) ->
- ok = flush(VHost),
- ok = dets:close(VHost).
+ try
+ ok = flush(VHost),
+ ok = dets:close(VHost)
+ %% see clear/1
+ catch _:badarg ->
+ rabbit_log:error("Failed to close recovery terms table for vhost ~s: the table no longer exists!",
+ [VHost]),
+ ok
+ end.
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 40967e316e..dd08916e35 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -496,15 +496,26 @@ stop(VHost) ->
start_msg_store(VHost, Refs, StartFunState) when is_list(Refs); Refs == undefined ->
rabbit_log:info("Starting message stores for vhost '~s'~n", [VHost]),
- {ok, _} = rabbit_vhost_msg_store:start(VHost,
- ?TRANSIENT_MSG_STORE,
- undefined,
- ?EMPTY_START_FUN_STATE),
- {ok, _} = rabbit_vhost_msg_store:start(VHost,
- ?PERSISTENT_MSG_STORE,
- Refs,
- StartFunState),
- rabbit_log:info("Message stores for vhost '~s' are started~n", [VHost]).
+ do_start_msg_store(VHost, ?TRANSIENT_MSG_STORE, undefined, ?EMPTY_START_FUN_STATE),
+ do_start_msg_store(VHost, ?PERSISTENT_MSG_STORE, Refs, StartFunState),
+ ok.
+
+do_start_msg_store(VHost, Type, Refs, StartFunState) ->
+ case rabbit_vhost_msg_store:start(VHost, Type, Refs, StartFunState) of
+ {ok, _} ->
+ rabbit_log:info("Started message store of type ~s for vhost '~s'~n", [abbreviated_type(Type), VHost]);
+ {error, {no_such_vhost, VHost}} = Err ->
+ rabbit_log:error("Failed to start message store of type ~s for vhost '~s': the vhost no longer exists!~n",
+ [Type, VHost]),
+ exit(Err);
+ {error, Error} ->
+ rabbit_log:error("Failed to start message store of type ~s for vhost '~s': ~p~n",
+ [Type, VHost, Error]),
+ exit({error, Error})
+ end.
+
+abbreviated_type(?TRANSIENT_MSG_STORE) -> transient;
+abbreviated_type(?PERSISTENT_MSG_STORE) -> persistent.
stop_msg_store(VHost) ->
rabbit_vhost_msg_store:stop(VHost, ?TRANSIENT_MSG_STORE),
diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl
index 4dc2ec86d0..7513c23925 100644
--- a/src/rabbit_vhost.erl
+++ b/src/rabbit_vhost.erl
@@ -27,8 +27,8 @@
-export([dir/1, msg_store_dir_path/1, msg_store_dir_wildcard/0]).
-export([delete_storage/1]).
--spec add(rabbit_types:vhost(), rabbit_types:username()) -> 'ok'.
--spec delete(rabbit_types:vhost(), rabbit_types:username()) -> 'ok'.
+-spec add(rabbit_types:vhost(), rabbit_types:username()) -> rabbit_types:ok_or_error(any()).
+-spec delete(rabbit_types:vhost(), rabbit_types:username()) -> rabbit_types:ok_or_error(any()).
-spec update(rabbit_types:vhost(), rabbit_misc:thunk(A)) -> A.
-spec exists(rabbit_types:vhost()) -> boolean().
-spec list() -> [rabbit_types:vhost()].
@@ -104,10 +104,20 @@ add(VHostPath, ActingUser) ->
{<<"amq.rabbitmq.trace">>, topic, true}]],
ok
end),
- ok = rabbit_vhost_sup_sup:start_on_all_nodes(VHostPath),
- rabbit_event:notify(vhost_created, info(VHostPath)
- ++ [{user_who_performed_action, ActingUser}]),
- R.
+ case rabbit_vhost_sup_sup:start_on_all_nodes(VHostPath) of
+ ok ->
+ rabbit_event:notify(vhost_created, info(VHostPath)
+ ++ [{user_who_performed_action, ActingUser}]),
+ R;
+ {error, {no_such_vhost, VHostPath}} ->
+ Msg = rabbit_misc:format("failed to set up vhost '~s': it was concurrently deleted!",
+ [VHostPath]),
+ {error, Msg};
+ {error, Reason} ->
+ Msg = rabbit_misc:format("failed to set up vhost '~s': ~p",
+ [VHostPath, Reason]),
+ {error, Msg}
+ end.
delete(VHostPath, ActingUser) ->
%% FIXME: We are forced to delete the queues and exchanges outside
@@ -125,7 +135,10 @@ delete(VHostPath, ActingUser) ->
with(VHostPath, fun () -> internal_delete(VHostPath, ActingUser) end)),
ok = rabbit_event:notify(vhost_deleted, [{name, VHostPath},
{user_who_performed_action, ActingUser}]),
- [ok = Fun() || Fun <- Funs],
+ [case Fun() of
+ ok -> ok;
+ {error, {no_such_vhost, VHostPath}} -> ok
+ end || Fun <- Funs],
%% After vhost was deleted from mnesia DB, we try to stop vhost supervisors
%% on all the nodes.
rabbit_vhost_sup_sup:delete_on_all_nodes(VHostPath),
diff --git a/src/rabbit_vhost_msg_store.erl b/src/rabbit_vhost_msg_store.erl
index 482ad082b8..3c633875bc 100644
--- a/src/rabbit_vhost_msg_store.erl
+++ b/src/rabbit_vhost_msg_store.erl
@@ -23,17 +23,33 @@
start(VHost, Type, ClientRefs, StartupFunState) when is_list(ClientRefs);
ClientRefs == undefined ->
- {ok, VHostSup} = rabbit_vhost_sup_sup:vhost_sup(VHost),
- VHostDir = rabbit_vhost:msg_store_dir_path(VHost),
- supervisor2:start_child(VHostSup,
- {Type, {rabbit_msg_store, start_link,
- [Type, VHostDir, ClientRefs, StartupFunState]},
- transient, ?WORKER_WAIT, worker, [rabbit_msg_store]}).
+ case rabbit_vhost_sup_sup:vhost_sup(VHost) of
+ {ok, VHostSup} ->
+ VHostDir = rabbit_vhost:msg_store_dir_path(VHost),
+ supervisor2:start_child(VHostSup,
+ {Type, {rabbit_msg_store, start_link,
+ [Type, VHostDir, ClientRefs, StartupFunState]},
+ transient, ?WORKER_WAIT, worker, [rabbit_msg_store]});
+ %% we can get here if a vhost is added and removed concurrently
+ %% e.g. some integration tests do it
+ {error, {no_such_vhost, VHost}} = E ->
+ rabbit_log:error("Failed to start a message store for vhost ~s: vhost no longer exists!",
+ [VHost]),
+ E
+ end.
stop(VHost, Type) ->
- {ok, VHostSup} = rabbit_vhost_sup_sup:vhost_sup(VHost),
- ok = supervisor2:terminate_child(VHostSup, Type),
- ok = supervisor2:delete_child(VHostSup, Type).
+ case rabbit_vhost_sup_sup:vhost_sup(VHost) of
+ {ok, VHostSup} ->
+ ok = supervisor2:terminate_child(VHostSup, Type),
+ ok = supervisor2:delete_child(VHostSup, Type);
+ %% see start/4
+ {error, {no_such_vhost, VHost}} ->
+ rabbit_log:error("Failed to stop a message store for vhost ~s: vhost no longer exists!",
+ [VHost]),
+
+ ok
+ end.
client_init(VHost, Type, Ref, MsgOnDiskFun, CloseFDsFun) ->
with_vhost_store(VHost, Type, fun(StorePid) ->
@@ -58,4 +74,4 @@ vhost_store_pid(VHost, Type) ->
successfully_recovered_state(VHost, Type) ->
with_vhost_store(VHost, Type, fun(StorePid) ->
rabbit_msg_store:successfully_recovered_state(StorePid)
- end). \ No newline at end of file
+ end).
diff --git a/src/rabbit_vhost_sup.erl b/src/rabbit_vhost_sup.erl
index b8c7a649e5..bbf006fbd3 100644
--- a/src/rabbit_vhost_sup.erl
+++ b/src/rabbit_vhost_sup.erl
@@ -18,7 +18,8 @@
-include("rabbit.hrl").
-%% Supervisor is a per-vhost supervisor to contain queues and message stores
+%% Each vhost gets an instance of this supervisor that supervises
+%% message stores and queues (via rabbit_amqqueue_sup_sup).
-behaviour(supervisor2).
-export([init/1]).
-export([start_link/1]).
diff --git a/src/rabbit_vhost_sup_sup.erl b/src/rabbit_vhost_sup_sup.erl
index e528d64e0e..7ecac7a5d4 100644
--- a/src/rabbit_vhost_sup_sup.erl
+++ b/src/rabbit_vhost_sup_sup.erl
@@ -25,7 +25,7 @@
-export([start_link/0, start/0]).
-export([vhost_sup/1, vhost_sup/2, save_vhost_sup/3]).
-export([delete_on_all_nodes/1]).
--export([start_vhost/1, start_vhost/2, start_on_all_nodes/1]).
+-export([start_vhost/1, start_vhost/2, start_on_all_nodes/1, vhost_restart_strategy/0]).
%% Internal
-export([stop_and_delete_vhost/1]).
@@ -33,8 +33,10 @@
-record(vhost_sup, {vhost, vhost_sup_pid, wrapper_pid}).
start() ->
- case supervisor:start_child(rabbit_sup, {?MODULE, {?MODULE, start_link, []},
- permanent, infinity, supervisor, [?MODULE]}) of
+ case supervisor:start_child(rabbit_sup, {?MODULE,
+ {?MODULE, start_link, []},
+ permanent, infinity, supervisor,
+ [?MODULE]}) of
{ok, _} -> ok;
{error, Err} -> {error, Err}
end.
@@ -43,17 +45,14 @@ start_link() ->
supervisor2:start_link({local, ?MODULE}, ?MODULE, []).
init([]) ->
- VhostRestart = case application:get_env(rabbit, vhost_restart_strategy, stop_node) of
- ignore -> transient;
- stop_node -> permanent;
- transient -> transient;
- permanent -> permanent
- end,
-
+ %% This assumes that a single vhost termination should not shut down nodes
+ %% unless the operator opts in.
+ RestartStrategy = vhost_restart_strategy(),
ets:new(?MODULE, [named_table, public, {keypos, #vhost_sup.vhost}]),
+
{ok, {{simple_one_for_one, 0, 5},
[{rabbit_vhost, {rabbit_vhost_sup_wrapper, start_link, []},
- VhostRestart, infinity, supervisor,
+ RestartStrategy, ?SUPERVISOR_WAIT, supervisor,
[rabbit_vhost_sup_wrapper, rabbit_vhost_sup]}]}}.
start_on_all_nodes(VHost) ->
@@ -83,7 +82,10 @@ start_vhost(VHost) ->
case supervisor2:start_child(?MODULE, [VHost]) of
{ok, _} -> ok;
{error, {already_started, _}} -> ok;
- Error -> throw(Error)
+ Error ->
+ rabbit_log:error("Could not start process tree "
+ "for vhost '~s': ~p", [VHost, Error]),
+ throw(Error)
end,
{ok, _} = vhost_sup_pid(VHost);
{ok, Pid} when is_pid(Pid) ->
@@ -100,7 +102,7 @@ stop_and_delete_vhost(VHost) ->
false -> ok;
true ->
rabbit_log:info("Stopping vhost supervisor ~p"
- " for vhost ~p~n",
+ " for vhost '~s'~n",
[VHostSupPid, VHost]),
case supervisor2:terminate_child(?MODULE, WrapperPid) of
ok ->
@@ -169,3 +171,12 @@ vhost_sup_pid(VHost) ->
end
end.
+vhost_restart_strategy() ->
+ %% This assumes that a single vhost termination should not shut down nodes
+ %% unless the operator opts in.
+ case application:get_env(rabbit, vhost_restart_strategy, continue) of
+ continue -> transient;
+ stop_node -> permanent;
+ transient -> transient;
+ permanent -> permanent
+ end.
diff --git a/src/rabbit_vhost_sup_watcher.erl b/src/rabbit_vhost_sup_watcher.erl
index 3ce726621f..be2c5f20bb 100644
--- a/src/rabbit_vhost_sup_watcher.erl
+++ b/src/rabbit_vhost_sup_watcher.erl
@@ -49,10 +49,17 @@ handle_info(check_vhost, VHost) ->
case rabbit_vhost:exists(VHost) of
true -> {noreply, VHost};
false ->
- rabbit_log:error(" Vhost \"~p\" is gone."
- " Stopping message store supervisor.",
- [VHost]),
- {stop, normal, VHost}
+ rabbit_log:warning("Virtual host '~s' is gone. "
+ "Stopping its top level supervisor.",
+ [VHost]),
+ %% Stop vhost's top supervisor in a one-off process to avoid a deadlock:
+ %% us (a child process) waiting for supervisor shutdown and our supervisor(s)
+ %% waiting for us to shutdown.
+ spawn(
+ fun() ->
+ rabbit_vhost_sup_sup:stop_and_delete_vhost(VHost)
+ end),
+ {noreply, VHost}
end;
handle_info(_, VHost) ->
{noreply, VHost}.
@@ -63,4 +70,4 @@ code_change(_OldVsn, VHost, _Extra) ->
{ok, VHost}.
interval() ->
- application:get_env(kernel, net_ticktime, 60000) * ?TICKTIME_RATIO. \ No newline at end of file
+ application:get_env(kernel, net_ticktime, 60000) * ?TICKTIME_RATIO.
diff --git a/src/rabbit_vhost_sup_wrapper.erl b/src/rabbit_vhost_sup_wrapper.erl
index 3396f71fa9..8dbec30bff 100644
--- a/src/rabbit_vhost_sup_wrapper.erl
+++ b/src/rabbit_vhost_sup_wrapper.erl
@@ -14,6 +14,9 @@
%% Copyright (c) 2017 Pivotal Software, Inc. All rights reserved.
%%
+%% This module is a wrapper around vhost supervisor to
+%% provide exactly once restart semantics.
+
-module(rabbit_vhost_sup_wrapper).
-include("rabbit.hrl").
@@ -26,15 +29,9 @@
start_link(VHost) ->
supervisor2:start_link(?MODULE, [VHost]).
-%% This module is a wrapper around vhost supervisor to
-%% provide exactly once restart.
-
-%% rabbit_vhost_sup supervisor children are added dynamically,
-%% so one_for_all strategy cannot be used.
-
init([VHost]) ->
- %% Two restarts in 1 hour. One per message store.
- {ok, {{one_for_all, 2, 3600000},
+ %% 2 restarts in 5 minutes. One per message store.
+ {ok, {{one_for_all, 2, 300},
[{rabbit_vhost_sup,
{rabbit_vhost_sup_wrapper, start_vhost_sup, [VHost]},
permanent, infinity, supervisor,