summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDaniil Fedotov <dfedotov@pivotal.io>2017-03-21 17:26:53 +0000
committerDaniil Fedotov <dfedotov@pivotal.io>2017-04-12 12:13:43 +0100
commit3716131a9285b1a6ee07017d9028203d84a6c5b6 (patch)
treeab6b811e42ac7e6f1a502514db97cf70bbb2672e /src
parentdd2a79da74e7fa1d02b4e277ec119a69dbb5c3b7 (diff)
downloadrabbitmq-server-git-3716131a9285b1a6ee07017d9028203d84a6c5b6.tar.gz
Use wrapper supervisor one level above vhost supervisors
Wrapper supervisor makes it possible to make vhosts restartable exactly N times without interfering with each other. Because vhost should call recovery every time it's restarted, and recovery includes dynamically adding message stores, it's impossible to restart it using one_for_all. So vhost supervisor will just fail if it's child fails and vhost supervisor wrapper will restart it with recovery.
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_vhost.erl1
-rw-r--r--src/rabbit_vhost_sup_sup.erl134
-rw-r--r--src/rabbit_vhost_sup_wrapper.erl52
-rw-r--r--src/rabbit_vm.erl25
4 files changed, 154 insertions, 58 deletions
diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl
index 6d046021fd..7d132acf44 100644
--- a/src/rabbit_vhost.erl
+++ b/src/rabbit_vhost.erl
@@ -106,6 +106,7 @@ add(VHostPath, ActingUser) ->
{<<"amq.rabbitmq.trace">>, topic, true}]],
ok
end),
+ ok = rabbit_vhost_sup_sup:start_on_all_nodes(VHostPath),
rabbit_event:notify(vhost_created, info(VHostPath)
++ [{user_who_performed_action, ActingUser}]),
R.
diff --git a/src/rabbit_vhost_sup_sup.erl b/src/rabbit_vhost_sup_sup.erl
index 90f7be503e..919d7ca7fd 100644
--- a/src/rabbit_vhost_sup_sup.erl
+++ b/src/rabbit_vhost_sup_sup.erl
@@ -23,8 +23,14 @@
-export([init/1]).
-export([start_link/0, start/0]).
--export([vhost_sup/1, vhost_sup/2]).
--export([start_vhost/1, stop_and_delete_vhost/1, delete_on_all_nodes/1]).
+-export([vhost_sup/1, vhost_sup/2, save_vhost_sup/3]).
+-export([delete_on_all_nodes/1]).
+-export([start_vhost/1, start_vhost/2, start_on_all_nodes/1]).
+
+%% Internal
+-export([stop_and_delete_vhost/1]).
+
+-record(vhost_sup, {vhost, vhost_sup_pid, wrapper_pid}).
start() ->
rabbit_sup:start_supervisor_child(?MODULE).
@@ -33,39 +39,67 @@ start_link() ->
supervisor2:start_link({local, ?MODULE}, ?MODULE, []).
init([]) ->
- ets:new(?MODULE, [named_table, public]),
- {ok, {{simple_one_for_one, 1, 5},
- [{rabbit_vhost, {rabbit_vhost_sup_sup, start_vhost, []},
- transient, infinity, supervisor,
- [rabbit_vhost_sup_sup, rabbit_vhost_sup]}]}}.
+ ets:new(?MODULE, [named_table, public, {keypos, #vhost_sup.vhost}]),
+ {ok, {{simple_one_for_one, 0, 5},
+ [{rabbit_vhost, {rabbit_vhost_sup_wrapper, start_link, []},
+ permanent, infinity, supervisor,
+ [rabbit_vhost_sup_wrapper, rabbit_vhost_sup]}]}}.
+
+start_on_all_nodes(VHost) ->
+ [ {ok, _} = start_vhost(VHost, Node) || Node <- rabbit_nodes:all_running() ],
+ ok.
-start_vhost(VHost) ->
- case rabbit_vhost_sup:start_link(VHost) of
- {ok, Pid} ->
- ok = save_vhost_pid(VHost, Pid),
- ok = rabbit_vhost:recover(VHost),
+delete_on_all_nodes(VHost) ->
+ [ stop_and_delete_vhost(VHost, Node) || Node <- rabbit_nodes:all_running() ],
+ ok.
+
+start_vhost(VHost, Node) when Node == node(self()) ->
+ start_vhost(VHost);
+start_vhost(VHost, Node) ->
+ case rabbit_misc:rpc_call(Node, rabbit_vhost_sup_sup, start_vhost, [VHost]) of
+ {ok, Pid} when is_pid(Pid) ->
{ok, Pid};
- Other ->
- Other
+ {badrpc, RpcErr} ->
+ {error, RpcErr}
end.
-stop_and_delete_vhost(VHost) ->
- case vhost_pid(VHost) of
- no_pid -> ok;
- Pid when is_pid(Pid) ->
- rabbit_log:info("Stopping vhost supervisor ~p for vhost ~p~n",
- [Pid, VHost]),
- case supervisor2:terminate_child(?MODULE, Pid) of
- ok ->
- ok = rabbit_vhost:delete_storage(VHost);
- Other ->
- Other
+start_vhost(VHost) ->
+ case rabbit_vhost:exists(VHost) of
+ false -> {error, {no_such_vhost, VHost}};
+ true ->
+ case vhost_sup_pid(VHost) of
+ no_pid ->
+ case supervisor2:start_child(?MODULE, [VHost]) of
+ {ok, _} -> ok;
+ {error, {already_started, _}} -> ok;
+ Error -> throw(Error)
+ end,
+ {ok, _} = vhost_sup_pid(VHost);
+ {ok, Pid} when is_pid(Pid) ->
+ {ok, Pid}
end
end.
-delete_on_all_nodes(VHost) ->
- [ stop_and_delete_vhost(VHost, Node) || Node <- rabbit_nodes:all_running() ],
- ok.
+stop_and_delete_vhost(VHost) ->
+ case get_vhost_sup(VHost) of
+ not_found -> ok;
+ #vhost_sup{wrapper_pid = WrapperPid,
+ vhost_sup_pid = VHostSupPid} = VHostSup ->
+ case is_process_alive(WrapperPid) of
+ false -> ok;
+ true ->
+ rabbit_log:info("Stopping vhost supervisor ~p"
+ " for vhost ~p~n",
+ [VHostSupPid, VHost]),
+ case supervisor2:terminate_child(?MODULE, WrapperPid) of
+ ok ->
+ ets:delete_object(?MODULE, VHostSup),
+ ok = rabbit_vhost:delete_storage(VHost);
+ Other ->
+ Other
+ end
+ end
+ end.
%% We take an optimistic approach whan stopping a remote VHost supervisor.
stop_and_delete_vhost(VHost, Node) when Node == node(self()) ->
@@ -81,6 +115,7 @@ stop_and_delete_vhost(VHost, Node) ->
{error, RpcErr}
end.
+-spec vhost_sup(rabbit_types:vhost(), node()) -> {ok, pid()}.
vhost_sup(VHost, Local) when Local == node(self()) ->
vhost_sup(VHost);
vhost_sup(VHost, Node) ->
@@ -93,34 +128,33 @@ vhost_sup(VHost, Node) ->
-spec vhost_sup(rabbit_types:vhost()) -> {ok, pid()}.
vhost_sup(VHost) ->
- case rabbit_vhost:exists(VHost) of
- false -> {error, {no_such_vhost, VHost}};
- true ->
- case vhost_pid(VHost) of
- no_pid ->
- case supervisor2:start_child(?MODULE, [VHost]) of
- {ok, Pid} -> {ok, Pid};
- {error, {already_started, Pid}} -> {ok, Pid};
- Error -> throw(Error)
- end;
- Pid when is_pid(Pid) ->
- {ok, Pid}
- end
- end.
+ start_vhost(VHost).
-save_vhost_pid(VHost, Pid) ->
- true = ets:insert(?MODULE, {VHost, Pid}),
+-spec save_vhost_sup(rabbit_types:vhost(), pid(), pid()) -> ok.
+save_vhost_sup(VHost, WrapperPid, VHostPid) ->
+ true = ets:insert(?MODULE, #vhost_sup{vhost = VHost,
+ vhost_sup_pid = VHostPid,
+ wrapper_pid = WrapperPid}),
ok.
--spec vhost_pid(rabbit_types:vhost()) -> no_pid | pid().
-vhost_pid(VHost) ->
+-spec get_vhost_sup(rabbit_types:vhost()) -> #vhost_sup{}.
+get_vhost_sup(VHost) ->
case ets:lookup(?MODULE, VHost) of
- [] -> no_pid;
- [{VHost, Pid}] ->
+ [] -> not_found;
+ [#vhost_sup{} = VHostSup] -> VHostSup
+ end.
+
+-spec vhost_sup_pid(rabbit_types:vhost()) -> no_pid | {ok, pid()}.
+vhost_sup_pid(VHost) ->
+ case get_vhost_sup(VHost) of
+ not_found ->
+ no_pid;
+ #vhost_sup{vhost_sup_pid = Pid} = VHostSup ->
case erlang:is_process_alive(Pid) of
- true -> Pid;
+ true -> {ok, Pid};
false ->
- ets:delete_object(?MODULE, {VHost, Pid}),
+ ets:delete_object(?MODULE, VHostSup),
no_pid
end
end.
+
diff --git a/src/rabbit_vhost_sup_wrapper.erl b/src/rabbit_vhost_sup_wrapper.erl
new file mode 100644
index 0000000000..f287436447
--- /dev/null
+++ b/src/rabbit_vhost_sup_wrapper.erl
@@ -0,0 +1,52 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2017 Pivotal Software, Inc. All rights reserved.
+%%
+
+-module(rabbit_vhost_sup_wrapper).
+
+-include("rabbit.hrl").
+
+-behaviour(supervisor2).
+-export([init/1]).
+-export([start_link/1]).
+-export([start_vhost_sup/1]).
+
+start_link(VHost) ->
+ supervisor2:start_link(?MODULE, [VHost]).
+
+%% This module is a wrapper around vhost supervisor to
+%% provide exactly once restart.
+
+%% rabbit_vhost_sup supervisor children are added dynamically,
+%% so one_for_all strategy cannot be used.
+
+init([VHost]) ->
+ {ok, {{one_for_all, 1, 10000000},
+ [{rabbit_vhost_sup,
+ {rabbit_vhost_sup_wrapper, start_vhost_sup, [VHost]},
+ intrinsic, infinity, supervisor,
+ [rabbit_vhost_sup]}]}}.
+
+start_vhost_sup(VHost) ->
+ case rabbit_vhost_sup:start_link(VHost) of
+ {ok, Pid} ->
+ %% Save vhost sup record with wrapper pid and vhost sup pid.
+ ok = rabbit_vhost_sup_sup:save_vhost_sup(VHost, self(), Pid),
+ %% We can start recover as soon as we have vhost_sup record saved
+ ok = rabbit_vhost:recover(VHost),
+ {ok, Pid};
+ Other ->
+ Other
+ end. \ No newline at end of file
diff --git a/src/rabbit_vm.erl b/src/rabbit_vm.erl
index 1039ed659c..b65536c0d4 100644
--- a/src/rabbit_vm.erl
+++ b/src/rabbit_vm.erl
@@ -161,14 +161,23 @@ msg_stores() ->
all_vhosts_children(msg_store_persistent).
all_vhosts_children(Name) ->
- lists:filter_map(
- fun({_, VHostSup, _, _}) ->
- case supervisor2:find_child(VHostSup, Name) of
- [QSup] -> {true, QSup};
- [] -> false
- end
- end,
- supervisor:which_children(rabbit_vhost_sup_sup)).
+ case whereis(rabbit_vhost_sup_sup) of
+ undefined -> [];
+ Pid when is_pid(Pid) ->
+ lists:filtermap(
+ fun({_, VHostSupWrapper, _, _}) ->
+ case supervisor2:find_child(VHostSupWrapper,
+ rabbit_vhost_sup) of
+ [] -> false;
+ [VHostSup] ->
+ case supervisor2:find_child(VHostSup, Name) of
+ [QSup] -> {true, QSup};
+ [] -> false
+ end
+ end
+ end,
+ supervisor:which_children(rabbit_vhost_sup_sup))
+ end.
interesting_sups0() ->
MsgIndexProcs = msg_stores(),