summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMichael Klishin <michael@novemberain.com>2017-08-01 03:10:38 +0300
committerGitHub <noreply@github.com>2017-08-01 03:10:38 +0300
commitc58a15e7893bae019418c486d971046e879e0385 (patch)
treeeaa6bd05ba4ad8d5bfb2daa20df4794106d135d5 /src
parent843199f62259bea5a27793cd2ded68804f30c1a6 (diff)
parenta65b6d7e10e7880d909fff7214c39165f56d64c4 (diff)
downloadrabbitmq-server-git-c58a15e7893bae019418c486d971046e879e0385.tar.gz
Merge pull request #1315 from rabbitmq/rabbitmq-server-1310
Check if vhost supervisor is running when starting mirrors
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue_sup_sup.erl6
-rw-r--r--src/rabbit_connection_tracking_handler.erl19
-rw-r--r--src/rabbit_mirror_queue_misc.erl20
-rw-r--r--src/rabbit_recovery_terms.erl4
-rw-r--r--src/rabbit_vhost_limit.erl7
-rw-r--r--src/rabbit_vhost_msg_store.erl6
-rw-r--r--src/rabbit_vhost_sup_sup.erl60
-rw-r--r--src/rabbit_vhost_sup_wrapper.erl7
8 files changed, 83 insertions, 46 deletions
diff --git a/src/rabbit_amqqueue_sup_sup.erl b/src/rabbit_amqqueue_sup_sup.erl
index b5ef86255d..f0bcbd7c60 100644
--- a/src/rabbit_amqqueue_sup_sup.erl
+++ b/src/rabbit_amqqueue_sup_sup.erl
@@ -57,7 +57,7 @@ find_for_vhost(VHost) ->
-spec find_for_vhost(rabbit_types:vhost(), atom()) -> {ok, pid()} | {error, term()}.
find_for_vhost(VHost, Node) ->
- {ok, VHostSup} = rabbit_vhost_sup_sup:vhost_sup(VHost, Node),
+ {ok, VHostSup} = rabbit_vhost_sup_sup:get_vhost_sup(VHost, Node),
case supervisor2:find_child(VHostSup, rabbit_amqqueue_sup_sup) of
[QSup] -> {ok, QSup};
Result -> {error, {queue_supervisor_not_found, Result}}
@@ -65,7 +65,7 @@ find_for_vhost(VHost, Node) ->
-spec start_for_vhost(rabbit_types:vhost()) -> {ok, pid()} | {error, term()}.
start_for_vhost(VHost) ->
- case rabbit_vhost_sup_sup:vhost_sup(VHost) of
+ case rabbit_vhost_sup_sup:get_vhost_sup(VHost) of
{ok, VHostSup} ->
supervisor2:start_child(
VHostSup,
@@ -82,7 +82,7 @@ start_for_vhost(VHost) ->
-spec stop_for_vhost(rabbit_types:vhost()) -> ok.
stop_for_vhost(VHost) ->
- case rabbit_vhost_sup_sup:vhost_sup(VHost) of
+ case rabbit_vhost_sup_sup:get_vhost_sup(VHost) of
{ok, VHostSup} ->
ok = supervisor2:terminate_child(VHostSup, rabbit_amqqueue_sup_sup),
ok = supervisor2:delete_child(VHostSup, rabbit_amqqueue_sup_sup);
diff --git a/src/rabbit_connection_tracking_handler.erl b/src/rabbit_connection_tracking_handler.erl
index 3ae17677e0..ca13700da0 100644
--- a/src/rabbit_connection_tracking_handler.erl
+++ b/src/rabbit_connection_tracking_handler.erl
@@ -82,11 +82,14 @@ handle_event(#event{type = vhost_deleted, props = Details}, State) ->
close_connections(rabbit_connection_tracking:list(VHost),
rabbit_misc:format("vhost '~s' is deleted", [VHost])),
{ok, State};
+%% Note: under normal circumstances this will be called immediately
+%% after the vhost_deleted above. Therefore we should be careful about
+%% what we log and be more defensive.
handle_event(#event{type = vhost_down, props = Details}, State) ->
VHost = pget(name, Details),
Node = pget(node, Details),
- rabbit_log_connection:info("Closing all connections in vhost '~s' at node '~s'"
- " because the vhost database has stopped working",
+ rabbit_log_connection:info("Closing all connections in vhost '~s' on node '~s'"
+ " because the vhost is stopping",
[VHost, Node]),
close_connections(rabbit_connection_tracking:list_on_node(Node, VHost),
rabbit_misc:format("vhost '~s' is down", [VHost])),
@@ -131,7 +134,17 @@ close_connections(Tracked, Message, Delay) ->
ok.
close_connection(#tracked_connection{pid = Pid, type = network}, Message) ->
- rabbit_networking:close_connection(Pid, Message);
+ try
+ rabbit_networking:close_connection(Pid, Message)
+ catch error:{not_a_connection, _} ->
+ %% could have been closed concurrently, or the input
+ %% is bogus. In any case, we should not terminate
+ ok;
+ _:Err ->
+ %% ignore, don't terminate
+ rabbit_log:warning("Could not close connection ~p: ~p", [Pid, Err]),
+ ok
+ end;
close_connection(#tracked_connection{pid = Pid, type = direct}, Message) ->
%% Do an RPC call to the node running the direct client.
Node = node(Pid),
diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl
index 59522da4a9..dc23095fb1 100644
--- a/src/rabbit_mirror_queue_misc.erl
+++ b/src/rabbit_mirror_queue_misc.erl
@@ -228,11 +228,21 @@ add_mirror(QName, MirrorNode, SyncMode) ->
rabbit_misc:with_exit_handler(
rabbit_misc:const(ok),
fun () ->
- SPid = rabbit_amqqueue_sup_sup:start_queue_process(
- MirrorNode, Q, slave),
- log_info(QName, "Adding mirror on node ~p: ~p~n",
- [MirrorNode, SPid]),
- rabbit_mirror_queue_slave:go(SPid, SyncMode)
+ #amqqueue{name = #resource{virtual_host = VHost}} = Q,
+ case rabbit_vhost_sup_sup:get_vhost_sup(VHost, MirrorNode) of
+ {ok, _} ->
+ SPid = rabbit_amqqueue_sup_sup:start_queue_process(
+ MirrorNode, Q, slave),
+ log_info(QName, "Adding mirror on node ~p: ~p~n",
+ [MirrorNode, SPid]),
+ rabbit_mirror_queue_slave:go(SPid, SyncMode);
+ {error, Error} ->
+ log_warning(QName,
+ "Unable to start queue mirror on node '~p'. "
+ "Target virtual host is not running: ~p~n",
+ [MirrorNode, Error]),
+ ok
+ end
end);
{error, not_found} = E ->
E
diff --git a/src/rabbit_recovery_terms.erl b/src/rabbit_recovery_terms.erl
index 73fc9c7449..b73f3add7c 100644
--- a/src/rabbit_recovery_terms.erl
+++ b/src/rabbit_recovery_terms.erl
@@ -48,7 +48,7 @@
%%----------------------------------------------------------------------------
start(VHost) ->
- case rabbit_vhost_sup_sup:vhost_sup(VHost) of
+ case rabbit_vhost_sup_sup:get_vhost_sup(VHost) of
{ok, VHostSup} ->
{ok, _} = supervisor2:start_child(
VHostSup,
@@ -65,7 +65,7 @@ start(VHost) ->
ok.
stop(VHost) ->
- case rabbit_vhost_sup_sup:vhost_sup(VHost) of
+ case rabbit_vhost_sup_sup:get_vhost_sup(VHost) of
{ok, VHostSup} ->
case supervisor:terminate_child(VHostSup, ?MODULE) of
ok -> supervisor:delete_child(VHostSup, ?MODULE);
diff --git a/src/rabbit_vhost_limit.erl b/src/rabbit_vhost_limit.erl
index 7b797e46b2..9d8a6795b4 100644
--- a/src/rabbit_vhost_limit.erl
+++ b/src/rabbit_vhost_limit.erl
@@ -55,7 +55,12 @@ notify(VHost, <<"vhost-limits">>, <<"limits">>, Limits, ActingUser) ->
notify_clear(VHost, <<"vhost-limits">>, <<"limits">>, ActingUser) ->
rabbit_event:notify(vhost_limits_cleared, [{name, <<"limits">>},
{user_who_performed_action, ActingUser}]),
- update_vhost(VHost, undefined).
+ %% If the function is called as a part of vhost deletion, the vhost can
+ %% be already deleted.
+ case rabbit_vhost:exists(VHost) of
+ true -> update_vhost(VHost, undefined);
+ false -> ok
+ end.
connection_limit(VirtualHost) ->
get_limit(VirtualHost, <<"max-connections">>).
diff --git a/src/rabbit_vhost_msg_store.erl b/src/rabbit_vhost_msg_store.erl
index 3c633875bc..b9af37c258 100644
--- a/src/rabbit_vhost_msg_store.erl
+++ b/src/rabbit_vhost_msg_store.erl
@@ -23,7 +23,7 @@
start(VHost, Type, ClientRefs, StartupFunState) when is_list(ClientRefs);
ClientRefs == undefined ->
- case rabbit_vhost_sup_sup:vhost_sup(VHost) of
+ case rabbit_vhost_sup_sup:get_vhost_sup(VHost) of
{ok, VHostSup} ->
VHostDir = rabbit_vhost:msg_store_dir_path(VHost),
supervisor2:start_child(VHostSup,
@@ -39,7 +39,7 @@ start(VHost, Type, ClientRefs, StartupFunState) when is_list(ClientRefs);
end.
stop(VHost, Type) ->
- case rabbit_vhost_sup_sup:vhost_sup(VHost) of
+ case rabbit_vhost_sup_sup:get_vhost_sup(VHost) of
{ok, VHostSup} ->
ok = supervisor2:terminate_child(VHostSup, Type),
ok = supervisor2:delete_child(VHostSup, Type);
@@ -65,7 +65,7 @@ with_vhost_store(VHost, Type, Fun) ->
end.
vhost_store_pid(VHost, Type) ->
- {ok, VHostSup} = rabbit_vhost_sup_sup:vhost_sup(VHost),
+ {ok, VHostSup} = rabbit_vhost_sup_sup:get_vhost_sup(VHost),
case supervisor2:find_child(VHostSup, Type) of
[Pid] -> Pid;
[] -> no_pid
diff --git a/src/rabbit_vhost_sup_sup.erl b/src/rabbit_vhost_sup_sup.erl
index 1d5db93fda..93c26d4e0f 100644
--- a/src/rabbit_vhost_sup_sup.erl
+++ b/src/rabbit_vhost_sup_sup.erl
@@ -23,7 +23,7 @@
-export([init/1]).
-export([start_link/0, start/0]).
--export([init_vhost/1, vhost_sup/1, vhost_sup/2, save_vhost_sup/3]).
+-export([init_vhost/1, get_vhost_sup/1, get_vhost_sup/2, save_vhost_sup/3]).
-export([delete_on_all_nodes/1]).
-export([start_on_all_nodes/1]).
@@ -72,7 +72,7 @@ delete_on_all_nodes(VHost) ->
ok.
stop_and_delete_vhost(VHost) ->
- case get_vhost_sup(VHost) of
+ StopResult = case lookup_vhost_sup_record(VHost) of
not_found -> ok;
#vhost_sup{wrapper_pid = WrapperPid,
vhost_sup_pid = VHostSupPid} ->
@@ -84,13 +84,15 @@ stop_and_delete_vhost(VHost) ->
[VHostSupPid, VHost]),
case supervisor2:terminate_child(?MODULE, WrapperPid) of
ok ->
- ets:delete(?MODULE, VHost),
- ok = rabbit_vhost:delete_storage(VHost);
+ true = ets:delete(?MODULE, VHost),
+ ok;
Other ->
Other
end
end
- end.
+ end,
+ ok = rabbit_vhost:delete_storage(VHost),
+ StopResult.
%% We take an optimistic approach when stopping a remote VHost supervisor.
stop_and_delete_vhost(VHost, Node) when Node == node(self()) ->
@@ -106,7 +108,7 @@ stop_and_delete_vhost(VHost, Node) ->
{error, RpcErr}
end.
--spec init_vhost(rabbit_types:vhost()) -> ok.
+-spec init_vhost(rabbit_types:vhost()) -> ok | {error, {no_such_vhost, rabbit_types:vhost()}}.
init_vhost(VHost) ->
case start_vhost(VHost) of
{ok, _} -> ok;
@@ -130,30 +132,32 @@ init_vhost(VHost) ->
end
end.
--spec vhost_sup(rabbit_types:vhost(), node()) -> {ok, pid()} | {error, {no_such_vhost, rabbit_types:vhost()} | term()}.
-vhost_sup(VHost, Node) ->
- case rabbit_misc:rpc_call(Node, rabbit_vhost_sup_sup, vhost_sup, [VHost]) of
+-type vhost_error() :: {no_such_vhost, rabbit_types:vhost()} |
+ {vhost_supervisor_not_running, rabbit_types:vhost()}.
+
+-spec get_vhost_sup(rabbit_types:vhost(), node()) -> {ok, pid()} | {error, vhost_error() | term()}.
+get_vhost_sup(VHost, Node) ->
+ case rabbit_misc:rpc_call(Node, rabbit_vhost_sup_sup, get_vhost_sup, [VHost]) of
{ok, Pid} when is_pid(Pid) ->
{ok, Pid};
+ {error, Err} ->
+ {error, Err};
{badrpc, RpcErr} ->
{error, RpcErr}
end.
--spec vhost_sup(rabbit_types:vhost()) -> {ok, pid()} | {error, {no_such_vhost, rabbit_types:vhost()}}.
-vhost_sup(VHost) ->
- case vhost_sup_pid(VHost) of
- no_pid ->
- case start_vhost(VHost) of
- {ok, Pid} ->
- true = is_vhost_alive(VHost),
- {ok, Pid};
- {error, {no_such_vhost, VHost}} ->
- {error, {no_such_vhost, VHost}};
- Error ->
- throw(Error)
- end;
- {ok, Pid} when is_pid(Pid) ->
- {ok, Pid}
+-spec get_vhost_sup(rabbit_types:vhost()) -> {ok, pid()} | {error, vhost_error()}.
+get_vhost_sup(VHost) ->
+ case rabbit_vhost:exists(VHost) of
+ false ->
+ {error, {no_such_vhost, VHost}};
+ true ->
+ case vhost_sup_pid(VHost) of
+ no_pid ->
+ {error, {vhost_supervisor_not_running, VHost}};
+ {ok, Pid} when is_pid(Pid) ->
+ {ok, Pid}
+ end
end.
-spec start_vhost(rabbit_types:vhost(), node()) -> {ok, pid()} | {error, term()}.
@@ -181,7 +185,7 @@ start_vhost(VHost) ->
is_vhost_alive(VHost) ->
%% A vhost is considered alive if its supervision tree is alive and
%% saved in the ETS table
- case get_vhost_sup(VHost) of
+ case lookup_vhost_sup_record(VHost) of
#vhost_sup{wrapper_pid = WrapperPid,
vhost_sup_pid = VHostSupPid,
vhost_process_pid = VHostProcessPid}
@@ -210,8 +214,8 @@ save_vhost_process(VHost, VHostProcessPid) ->
{#vhost_sup.vhost_process_pid, VHostProcessPid}),
ok.
--spec get_vhost_sup(rabbit_types:vhost()) -> #vhost_sup{}.
-get_vhost_sup(VHost) ->
+-spec lookup_vhost_sup_record(rabbit_types:vhost()) -> #vhost_sup{} | not_found.
+lookup_vhost_sup_record(VHost) ->
case ets:lookup(?MODULE, VHost) of
[] -> not_found;
[#vhost_sup{} = VHostSup] -> VHostSup
@@ -219,7 +223,7 @@ get_vhost_sup(VHost) ->
-spec vhost_sup_pid(rabbit_types:vhost()) -> no_pid | {ok, pid()}.
vhost_sup_pid(VHost) ->
- case get_vhost_sup(VHost) of
+ case lookup_vhost_sup_record(VHost) of
not_found ->
no_pid;
#vhost_sup{vhost_sup_pid = Pid} = VHostSup ->
diff --git a/src/rabbit_vhost_sup_wrapper.erl b/src/rabbit_vhost_sup_wrapper.erl
index 8e23389bb9..4ae68cdd75 100644
--- a/src/rabbit_vhost_sup_wrapper.erl
+++ b/src/rabbit_vhost_sup_wrapper.erl
@@ -29,7 +29,12 @@
start_link(VHost) ->
%% Using supervisor, because supervisor2 does not stop a started child when
%% another one fails to start. Bug?
- supervisor:start_link(?MODULE, [VHost]).
+ case rabbit_vhost_sup_sup:get_vhost_sup(VHost) of
+ {ok, Pid} ->
+ {error, {already_started, Pid}};
+ {error, _} ->
+ supervisor:start_link(?MODULE, [VHost])
+ end.
init([VHost]) ->
%% 2 restarts in 5 minutes. One per message store.