summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDaniil Fedotov <dfedotov@pivotal.io>2017-07-07 16:05:07 +0100
committerDaniil Fedotov <dfedotov@pivotal.io>2017-07-19 10:07:34 +0100
commit7a82b43bf12b737250957081d0b0d84b21b3bf72 (patch)
treed4af4be2174cb36d9e9fc24a565a771518798df6
parent49a6cff7e04172e252d958026fd787443227ba71 (diff)
downloadrabbitmq-server-git-7a82b43bf12b737250957081d0b0d84b21b3bf72.tar.gz
Change vhost supervision recovery process.
To make it possible to tell whether a vhost is alive, introduce a rabbit_vhost_process gen_server, which manages recovery and teardown of a vhost. The aliveness of this process can also be used to determine the vhost's state. Vhost process termination emits an event that closes all of the vhost's connections. Addresses [#145106713]
-rw-r--r--src/rabbit_connection_tracking.erl12
-rw-r--r--src/rabbit_connection_tracking_handler.erl9
-rw-r--r--src/rabbit_msg_store.erl2
-rw-r--r--src/rabbit_vhost.erl7
-rw-r--r--src/rabbit_vhost_process.erl (renamed from src/rabbit_vhost_sup_watcher.erl)42
-rw-r--r--src/rabbit_vhost_sup.erl7
-rw-r--r--src/rabbit_vhost_sup_sup.erl90
-rw-r--r--src/rabbit_vhost_sup_wrapper.erl27
8 files changed, 135 insertions, 61 deletions
diff --git a/src/rabbit_connection_tracking.erl b/src/rabbit_connection_tracking.erl
index f8c4c6541b..27c4bff810 100644
--- a/src/rabbit_connection_tracking.erl
+++ b/src/rabbit_connection_tracking.erl
@@ -34,7 +34,7 @@
delete_tracked_connections_table_for_node/1, delete_per_vhost_tracked_connections_table_for_node/1,
clear_tracked_connection_tables_for_this_node/0,
register_connection/1, unregister_connection/1,
- list/0, list/1, list_on_node/1, list_of_user/1,
+ list/0, list/1, list_on_node/1, list_on_node/2, list_of_user/1,
tracked_connection_from_connection_created/1,
tracked_connection_from_connection_state/1,
count_connections_in/1]).
@@ -217,6 +217,16 @@ list_on_node(Node) ->
catch exit:{aborted, {no_exists, _}} -> []
end.
+%% Tracked connections on Node that belong to VHost; [] when mnesia aborts with no_exists.
+-spec list_on_node(node(), rabbit_types:vhost()) -> [rabbit_types:tracked_connection()].
+list_on_node(Node, VHost) ->
+ try mnesia:dirty_match_object(
+ tracked_connection_table_name_for(Node),
+ #tracked_connection{vhost = VHost, _ = '_'})
+ catch exit:{aborted, {no_exists, _}} -> []
+ end.
+
+
-spec list_of_user(rabbit_types:username()) -> [rabbit_types:tracked_connection()].
list_of_user(Username) ->
diff --git a/src/rabbit_connection_tracking_handler.erl b/src/rabbit_connection_tracking_handler.erl
index f1b844c60c..3ae17677e0 100644
--- a/src/rabbit_connection_tracking_handler.erl
+++ b/src/rabbit_connection_tracking_handler.erl
@@ -82,6 +82,15 @@ handle_event(#event{type = vhost_deleted, props = Details}, State) ->
close_connections(rabbit_connection_tracking:list(VHost),
rabbit_misc:format("vhost '~s' is deleted", [VHost])),
{ok, State};
+handle_event(#event{type = vhost_down, props = Details}, State) ->
+ VHost = pget(name, Details),
+ Node = pget(node, Details),
+ rabbit_log_connection:info("Closing all connections in vhost '~s' at node '~s'"
+ " because the vhost database has stopped working",
+ [VHost, Node]),
+ close_connections(rabbit_connection_tracking:list_on_node(Node, VHost),
+ rabbit_misc:format("vhost '~s' is down", [VHost])),
+ {ok, State};
handle_event(#event{type = user_deleted, props = Details}, State) ->
Username = pget(name, Details),
rabbit_log_connection:info("Closing all connections from user '~s' because it's being deleted", [Username]),
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index 275a9127d1..fe78075d0f 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -1010,7 +1010,7 @@ terminate(_Reason, State = #msstate { index_state = IndexState,
ok;
{error, RTErr} ->
rabbit_log:error("Unable to save message store recovery terms"
- "for directory ~p~nError: ~p~n",
+ " for directory ~p~nError: ~p~n",
[Dir, RTErr])
end,
State3 #msstate { index_state = undefined,
diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl
index 7513c23925..85a967816d 100644
--- a/src/rabbit_vhost.erl
+++ b/src/rabbit_vhost.erl
@@ -26,6 +26,7 @@
-export([info/1, info/2, info_all/0, info_all/1, info_all/2, info_all/3]).
-export([dir/1, msg_store_dir_path/1, msg_store_dir_wildcard/0]).
-export([delete_storage/1]).
+-export([vhost_down/1]).
-spec add(rabbit_types:vhost(), rabbit_types:username()) -> rabbit_types:ok_or_error(any()).
-spec delete(rabbit_types:vhost(), rabbit_types:username()) -> rabbit_types:ok_or_error(any()).
@@ -144,6 +145,12 @@ delete(VHostPath, ActingUser) ->
rabbit_vhost_sup_sup:delete_on_all_nodes(VHostPath),
ok.
+vhost_down(VHostPath) ->
+ ok = rabbit_event:notify(vhost_down,
+ [{name, VHostPath},
+ {node, node()},
+ {user_who_performed_action, ?INTERNAL_USER}]).
+
delete_storage(VHost) ->
VhostDir = msg_store_dir_path(VHost),
rabbit_log:info("Deleting message store directory for vhost '~s' at '~s'~n", [VHost, VhostDir]),
diff --git a/src/rabbit_vhost_sup_watcher.erl b/src/rabbit_vhost_process.erl
index be2c5f20bb..c16a24eb07 100644
--- a/src/rabbit_vhost_sup_watcher.erl
+++ b/src/rabbit_vhost_process.erl
@@ -14,10 +14,21 @@
%% Copyright (c) 2017 Pivotal Software, Inc. All rights reserved.
%%
-%% This module implements a watcher process which should stop
-%% the parent supervisor if its vhost is missing from the mnesia DB
+%% This module implements a vhost identity process.
--module(rabbit_vhost_sup_watcher).
+%% On start this process will try to recover the vhost data and
+%% processes structure (queues and message stores).
+%% If recovered successfully, the process will save its PID
+%% to vhost process registry. If vhost process PID is in the registry and the
+%% process is alive - the vhost is considered running.
+
+%% On termination, the process will notify of vhost going down.
+
+%% The process will also check periodically if the vhost is still
+%% present in mnesia DB and stop the vhost supervision tree when it
+%% disappears.
+
+-module(rabbit_vhost_process).
-include("rabbit.hrl").
@@ -29,15 +40,26 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3]).
-
start_link(VHost) ->
gen_server2:start_link(?MODULE, [VHost], []).
init([VHost]) ->
- Interval = interval(),
- timer:send_interval(Interval, check_vhost),
- {ok, VHost}.
+ process_flag(trap_exit, true),
+ rabbit_log:debug("Recovering data for VHost ~p~n", [VHost]),
+ try
+ %% Recover the vhost data and save it to vhost registry.
+ ok = rabbit_vhost:recover(VHost),
+ rabbit_vhost_sup_sup:save_vhost_process(VHost, self()),
+ Interval = interval(),
+ timer:send_interval(Interval, check_vhost),
+ {ok, VHost}
+ catch _:Reason ->
+ rabbit_log:error("Unable to recover vhost ~p data. Reason ~p~n"
+ " Stacktrace ~p",
+ [VHost, Reason, erlang:get_stacktrace()]),
+ {stop, Reason}
+ end.
handle_call(_,_,VHost) ->
{reply, ok, VHost}.
@@ -64,7 +86,11 @@ handle_info(check_vhost, VHost) ->
handle_info(_, VHost) ->
{noreply, VHost}.
-terminate(_, _) -> ok.
+terminate(shutdown, VHost) ->
+ %% Notify that vhost is stopped.
+ rabbit_vhost:vhost_down(VHost);
+terminate(_, _VHost) ->
+ ok.
code_change(_OldVsn, VHost, _Extra) ->
{ok, VHost}.
diff --git a/src/rabbit_vhost_sup.erl b/src/rabbit_vhost_sup.erl
index bbf006fbd3..4e8688ac9a 100644
--- a/src/rabbit_vhost_sup.erl
+++ b/src/rabbit_vhost_sup.erl
@@ -28,8 +28,5 @@ start_link(VHost) ->
supervisor2:start_link(?MODULE, [VHost]).
init([VHost]) ->
- {ok, {{one_for_all, 0, 1},
- [{rabbit_vhost_sup_watcher,
- {rabbit_vhost_sup_watcher, start_link, [VHost]},
- intrinsic, ?WORKER_WAIT, worker,
- [rabbit_vhost_sup]}]}}.
+ %% Starts with no child specs; a routine start is logged at debug level, not error.
+ rabbit_log:debug("Starting VHost sup ~p~n", [VHost]),
+ {ok, {{one_for_all, 0, 1}, []}}.
diff --git a/src/rabbit_vhost_sup_sup.erl b/src/rabbit_vhost_sup_sup.erl
index 7ecac7a5d4..3d33b872ab 100644
--- a/src/rabbit_vhost_sup_sup.erl
+++ b/src/rabbit_vhost_sup_sup.erl
@@ -25,12 +25,15 @@
-export([start_link/0, start/0]).
-export([vhost_sup/1, vhost_sup/2, save_vhost_sup/3]).
-export([delete_on_all_nodes/1]).
--export([start_vhost/1, start_vhost/2, start_on_all_nodes/1, vhost_restart_strategy/0]).
+-export([start_on_all_nodes/1]).
+
+-export([save_vhost_process/2]).
+-export([is_vhost_alive/1]).
%% Internal
-export([stop_and_delete_vhost/1]).
--record(vhost_sup, {vhost, vhost_sup_pid, wrapper_pid}).
+-record(vhost_sup, {vhost, vhost_sup_pid, wrapper_pid, vhost_process_pid}).
start() ->
case supervisor:start_child(rabbit_sup, {?MODULE,
@@ -56,48 +59,18 @@ init([]) ->
[rabbit_vhost_sup_wrapper, rabbit_vhost_sup]}]}}.
start_on_all_nodes(VHost) ->
- [ {ok, _} = start_vhost(VHost, Node) || Node <- rabbit_nodes:all_running() ],
+ [ {ok, _} = vhost_sup(VHost, Node) || Node <- rabbit_nodes:all_running() ],
ok.
delete_on_all_nodes(VHost) ->
[ stop_and_delete_vhost(VHost, Node) || Node <- rabbit_nodes:all_running() ],
ok.
-start_vhost(VHost, Node) when Node == node(self()) ->
- start_vhost(VHost);
-start_vhost(VHost, Node) ->
- case rabbit_misc:rpc_call(Node, rabbit_vhost_sup_sup, start_vhost, [VHost]) of
- {ok, Pid} when is_pid(Pid) ->
- {ok, Pid};
- {badrpc, RpcErr} ->
- {error, RpcErr}
- end.
-
-start_vhost(VHost) ->
- case rabbit_vhost:exists(VHost) of
- false -> {error, {no_such_vhost, VHost}};
- true ->
- case vhost_sup_pid(VHost) of
- no_pid ->
- case supervisor2:start_child(?MODULE, [VHost]) of
- {ok, _} -> ok;
- {error, {already_started, _}} -> ok;
- Error ->
- rabbit_log:error("Could not start process tree "
- "for vhost '~s': ~p", [VHost, Error]),
- throw(Error)
- end,
- {ok, _} = vhost_sup_pid(VHost);
- {ok, Pid} when is_pid(Pid) ->
- {ok, Pid}
- end
- end.
-
stop_and_delete_vhost(VHost) ->
case get_vhost_sup(VHost) of
not_found -> ok;
#vhost_sup{wrapper_pid = WrapperPid,
- vhost_sup_pid = VHostSupPid} = VHostSup ->
+ vhost_sup_pid = VHostSupPid} ->
case is_process_alive(WrapperPid) of
false -> ok;
true ->
@@ -106,7 +79,7 @@ stop_and_delete_vhost(VHost) ->
[VHostSupPid, VHost]),
case supervisor2:terminate_child(?MODULE, WrapperPid) of
ok ->
- ets:delete_object(?MODULE, VHostSup),
+ ets:delete(?MODULE, VHost),
ok = rabbit_vhost:delete_storage(VHost);
Other ->
Other
@@ -128,7 +101,7 @@ stop_and_delete_vhost(VHost, Node) ->
{error, RpcErr}
end.
--spec vhost_sup(rabbit_types:vhost(), node()) -> {ok, pid()}.
+-spec vhost_sup(rabbit_types:vhost(), node()) -> {ok, pid()} | {error, {no_such_vhost, rabbit_types:vhost()} | term()}.
vhost_sup(VHost, Local) when Local == node(self()) ->
vhost_sup(VHost);
vhost_sup(VHost, Node) ->
@@ -139,9 +112,44 @@ vhost_sup(VHost, Node) ->
{error, RpcErr}
end.
--spec vhost_sup(rabbit_types:vhost()) -> {ok, pid()}.
+-spec vhost_sup(rabbit_types:vhost()) -> {ok, pid()} | {error, {no_such_vhost, rabbit_types:vhost()}}.
vhost_sup(VHost) ->
- start_vhost(VHost).
+ case rabbit_vhost:exists(VHost) of
+ false -> {error, {no_such_vhost, VHost}};
+ true ->
+ case vhost_sup_pid(VHost) of
+ no_pid ->
+ case supervisor2:start_child(?MODULE, [VHost]) of
+ {ok, _} -> ok;
+ {error, {already_started, _}} -> ok;
+ Error -> throw(Error)
+ end,
+ {ok, _} = vhost_sup_pid(VHost);
+ {ok, Pid} when is_pid(Pid) ->
+ {ok, Pid}
+ end
+ end.
+
+
+-spec is_vhost_alive(rabbit_types:vhost()) -> boolean().
+is_vhost_alive(VHost) ->
+%% A vhost is considered alive if its supervision tree is alive and
+%% saved in the ETS table
+ case get_vhost_sup(VHost) of
+ #vhost_sup{wrapper_pid = WrapperPid,
+ vhost_sup_pid = VHostSupPid,
+ vhost_process_pid = VHostProcessPid}
+ when is_pid(WrapperPid),
+ is_pid(VHostSupPid),
+ is_pid(VHostProcessPid) ->
+ is_process_alive(WrapperPid)
+ andalso
+ is_process_alive(VHostSupPid)
+ andalso
+ is_process_alive(VHostProcessPid);
+ _ -> false
+ end.
+
-spec save_vhost_sup(rabbit_types:vhost(), pid(), pid()) -> ok.
save_vhost_sup(VHost, WrapperPid, VHostPid) ->
@@ -150,6 +158,12 @@ save_vhost_sup(VHost, WrapperPid, VHostPid) ->
wrapper_pid = WrapperPid}),
ok.
+-spec save_vhost_process(rabbit_types:vhost(), pid()) -> ok.
+save_vhost_process(VHost, VHostProcessPid) ->
+ true = ets:update_element(?MODULE, VHost,
+ {#vhost_sup.vhost_process_pid, VHostProcessPid}),
+ ok.
+
-spec get_vhost_sup(rabbit_types:vhost()) -> #vhost_sup{}.
get_vhost_sup(VHost) ->
case ets:lookup(?MODULE, VHost) of
diff --git a/src/rabbit_vhost_sup_wrapper.erl b/src/rabbit_vhost_sup_wrapper.erl
index 8dbec30bff..8e23389bb9 100644
--- a/src/rabbit_vhost_sup_wrapper.erl
+++ b/src/rabbit_vhost_sup_wrapper.erl
@@ -27,24 +27,35 @@
-export([start_vhost_sup/1]).
start_link(VHost) ->
- supervisor2:start_link(?MODULE, [VHost]).
+ %% Using supervisor, because supervisor2 does not stop a started child when
+ %% another one fails to start. Bug?
+ supervisor:start_link(?MODULE, [VHost]).
init([VHost]) ->
%% 2 restarts in 5 minutes. One per message store.
{ok, {{one_for_all, 2, 300},
- [{rabbit_vhost_sup,
- {rabbit_vhost_sup_wrapper, start_vhost_sup, [VHost]},
- permanent, infinity, supervisor,
- [rabbit_vhost_sup]}]}}.
+ [
+ %% rabbit_vhost_sup is an empty supervisor container for
+ %% all data processes.
+ {rabbit_vhost_sup,
+ {rabbit_vhost_sup_wrapper, start_vhost_sup, [VHost]},
+ permanent, infinity, supervisor,
+ [rabbit_vhost_sup]},
+ %% rabbit_vhost_process is a vhost identity process, which
+ %% is responsible for data recovery and vhost aliveness status.
+ %% See the module comments for more info.
+ {rabbit_vhost_process,
+ {rabbit_vhost_process, start_link, [VHost]},
+ permanent, ?WORKER_WAIT, worker,
+ [rabbit_vhost_process]}]}}.
+
start_vhost_sup(VHost) ->
case rabbit_vhost_sup:start_link(VHost) of
{ok, Pid} ->
%% Save vhost sup record with wrapper pid and vhost sup pid.
ok = rabbit_vhost_sup_sup:save_vhost_sup(VHost, self(), Pid),
- %% We can start recover as soon as we have vhost_sup record saved
- ok = rabbit_vhost:recover(VHost),
{ok, Pid};
Other ->
Other
- end.
+ end. \ No newline at end of file