diff options
| author | Daniil Fedotov <dfedotov@pivotal.io> | 2017-03-21 17:26:53 +0000 |
|---|---|---|
| committer | Daniil Fedotov <dfedotov@pivotal.io> | 2017-04-12 12:13:43 +0100 |
| commit | 3716131a9285b1a6ee07017d9028203d84a6c5b6 (patch) | |
| tree | ab6b811e42ac7e6f1a502514db97cf70bbb2672e /src | |
| parent | dd2a79da74e7fa1d02b4e277ec119a69dbb5c3b7 (diff) | |
| download | rabbitmq-server-git-3716131a9285b1a6ee07017d9028203d84a6c5b6.tar.gz | |
Use wrapper supervisor one level above vhost supervisors
Wrapper supervisor makes it possible to make vhosts restartable
exactly N times without interfering with each other.
Because vhost should call recovery every time it's restarted,
and recovery includes dynamically adding message stores,
it's impossible to restart it using one_for_all.
So vhost supervisor will just fail if it's child fails
and vhost supervisor wrapper will restart it with recovery.
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_vhost.erl | 1 | ||||
| -rw-r--r-- | src/rabbit_vhost_sup_sup.erl | 134 | ||||
| -rw-r--r-- | src/rabbit_vhost_sup_wrapper.erl | 52 | ||||
| -rw-r--r-- | src/rabbit_vm.erl | 25 |
4 files changed, 154 insertions, 58 deletions
diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl index 6d046021fd..7d132acf44 100644 --- a/src/rabbit_vhost.erl +++ b/src/rabbit_vhost.erl @@ -106,6 +106,7 @@ add(VHostPath, ActingUser) -> {<<"amq.rabbitmq.trace">>, topic, true}]], ok end), + ok = rabbit_vhost_sup_sup:start_on_all_nodes(VHostPath), rabbit_event:notify(vhost_created, info(VHostPath) ++ [{user_who_performed_action, ActingUser}]), R. diff --git a/src/rabbit_vhost_sup_sup.erl b/src/rabbit_vhost_sup_sup.erl index 90f7be503e..919d7ca7fd 100644 --- a/src/rabbit_vhost_sup_sup.erl +++ b/src/rabbit_vhost_sup_sup.erl @@ -23,8 +23,14 @@ -export([init/1]). -export([start_link/0, start/0]). --export([vhost_sup/1, vhost_sup/2]). --export([start_vhost/1, stop_and_delete_vhost/1, delete_on_all_nodes/1]). +-export([vhost_sup/1, vhost_sup/2, save_vhost_sup/3]). +-export([delete_on_all_nodes/1]). +-export([start_vhost/1, start_vhost/2, start_on_all_nodes/1]). + +%% Internal +-export([stop_and_delete_vhost/1]). + +-record(vhost_sup, {vhost, vhost_sup_pid, wrapper_pid}). start() -> rabbit_sup:start_supervisor_child(?MODULE). @@ -33,39 +39,67 @@ start_link() -> supervisor2:start_link({local, ?MODULE}, ?MODULE, []). init([]) -> - ets:new(?MODULE, [named_table, public]), - {ok, {{simple_one_for_one, 1, 5}, - [{rabbit_vhost, {rabbit_vhost_sup_sup, start_vhost, []}, - transient, infinity, supervisor, - [rabbit_vhost_sup_sup, rabbit_vhost_sup]}]}}. + ets:new(?MODULE, [named_table, public, {keypos, #vhost_sup.vhost}]), + {ok, {{simple_one_for_one, 0, 5}, + [{rabbit_vhost, {rabbit_vhost_sup_wrapper, start_link, []}, + permanent, infinity, supervisor, + [rabbit_vhost_sup_wrapper, rabbit_vhost_sup]}]}}. + +start_on_all_nodes(VHost) -> + [ {ok, _} = start_vhost(VHost, Node) || Node <- rabbit_nodes:all_running() ], + ok. -start_vhost(VHost) -> - case rabbit_vhost_sup:start_link(VHost) of - {ok, Pid} -> - ok = save_vhost_pid(VHost, Pid), - ok = rabbit_vhost:recover(VHost), +delete_on_all_nodes(VHost) -> + [ stop_and_delete_vhost(VHost, Node) || Node <- rabbit_nodes:all_running() ], + ok. + +start_vhost(VHost, Node) when Node == node(self()) -> + start_vhost(VHost); +start_vhost(VHost, Node) -> + case rabbit_misc:rpc_call(Node, rabbit_vhost_sup_sup, start_vhost, [VHost]) of + {ok, Pid} when is_pid(Pid) -> {ok, Pid}; - Other -> - Other + {badrpc, RpcErr} -> + {error, RpcErr} end. -stop_and_delete_vhost(VHost) -> - case vhost_pid(VHost) of - no_pid -> ok; - Pid when is_pid(Pid) -> - rabbit_log:info("Stopping vhost supervisor ~p for vhost ~p~n", - [Pid, VHost]), - case supervisor2:terminate_child(?MODULE, Pid) of - ok -> - ok = rabbit_vhost:delete_storage(VHost); - Other -> - Other +start_vhost(VHost) -> + case rabbit_vhost:exists(VHost) of + false -> {error, {no_such_vhost, VHost}}; + true -> + case vhost_sup_pid(VHost) of + no_pid -> + case supervisor2:start_child(?MODULE, [VHost]) of + {ok, _} -> ok; + {error, {already_started, _}} -> ok; + Error -> throw(Error) + end, + {ok, _} = vhost_sup_pid(VHost); + {ok, Pid} when is_pid(Pid) -> + {ok, Pid} end end. -delete_on_all_nodes(VHost) -> - [ stop_and_delete_vhost(VHost, Node) || Node <- rabbit_nodes:all_running() ], - ok. +stop_and_delete_vhost(VHost) -> + case get_vhost_sup(VHost) of + not_found -> ok; + #vhost_sup{wrapper_pid = WrapperPid, + vhost_sup_pid = VHostSupPid} = VHostSup -> + case is_process_alive(WrapperPid) of + false -> ok; + true -> + rabbit_log:info("Stopping vhost supervisor ~p" + " for vhost ~p~n", + [VHostSupPid, VHost]), + case supervisor2:terminate_child(?MODULE, WrapperPid) of + ok -> + ets:delete_object(?MODULE, VHostSup), + ok = rabbit_vhost:delete_storage(VHost); + Other -> + Other + end + end + end. %% We take an optimistic approach whan stopping a remote VHost supervisor. stop_and_delete_vhost(VHost, Node) when Node == node(self()) -> @@ -81,6 +115,7 @@ stop_and_delete_vhost(VHost, Node) -> {error, RpcErr} end. +-spec vhost_sup(rabbit_types:vhost(), node()) -> {ok, pid()}. vhost_sup(VHost, Local) when Local == node(self()) -> vhost_sup(VHost); vhost_sup(VHost, Node) -> @@ -93,34 +128,33 @@ vhost_sup(VHost, Node) -> -spec vhost_sup(rabbit_types:vhost()) -> {ok, pid()}. vhost_sup(VHost) -> - case rabbit_vhost:exists(VHost) of - false -> {error, {no_such_vhost, VHost}}; - true -> - case vhost_pid(VHost) of - no_pid -> - case supervisor2:start_child(?MODULE, [VHost]) of - {ok, Pid} -> {ok, Pid}; - {error, {already_started, Pid}} -> {ok, Pid}; - Error -> throw(Error) - end; - Pid when is_pid(Pid) -> - {ok, Pid} - end - end. + start_vhost(VHost). -save_vhost_pid(VHost, Pid) -> - true = ets:insert(?MODULE, {VHost, Pid}), +-spec save_vhost_sup(rabbit_types:vhost(), pid(), pid()) -> ok. +save_vhost_sup(VHost, WrapperPid, VHostPid) -> + true = ets:insert(?MODULE, #vhost_sup{vhost = VHost, + vhost_sup_pid = VHostPid, + wrapper_pid = WrapperPid}), ok. --spec vhost_pid(rabbit_types:vhost()) -> no_pid | pid(). -vhost_pid(VHost) -> +-spec get_vhost_sup(rabbit_types:vhost()) -> #vhost_sup{}. +get_vhost_sup(VHost) -> case ets:lookup(?MODULE, VHost) of - [] -> no_pid; - [{VHost, Pid}] -> + [] -> not_found; + [#vhost_sup{} = VHostSup] -> VHostSup + end. + +-spec vhost_sup_pid(rabbit_types:vhost()) -> no_pid | {ok, pid()}. +vhost_sup_pid(VHost) -> + case get_vhost_sup(VHost) of + not_found -> + no_pid; + #vhost_sup{vhost_sup_pid = Pid} = VHostSup -> case erlang:is_process_alive(Pid) of - true -> Pid; + true -> {ok, Pid}; false -> - ets:delete_object(?MODULE, {VHost, Pid}), + ets:delete_object(?MODULE, VHostSup), no_pid end end. + diff --git a/src/rabbit_vhost_sup_wrapper.erl b/src/rabbit_vhost_sup_wrapper.erl new file mode 100644 index 0000000000..f287436447 --- /dev/null +++ b/src/rabbit_vhost_sup_wrapper.erl @@ -0,0 +1,52 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2017 Pivotal Software, Inc. All rights reserved. +%% + +-module(rabbit_vhost_sup_wrapper). + +-include("rabbit.hrl"). + +-behaviour(supervisor2). +-export([init/1]). +-export([start_link/1]). +-export([start_vhost_sup/1]). + +start_link(VHost) -> + supervisor2:start_link(?MODULE, [VHost]). + +%% This module is a wrapper around vhost supervisor to +%% provide exactly once restart. + +%% rabbit_vhost_sup supervisor children are added dynamically, +%% so one_for_all strategy cannot be used. + +init([VHost]) -> + {ok, {{one_for_all, 1, 10000000}, + [{rabbit_vhost_sup, + {rabbit_vhost_sup_wrapper, start_vhost_sup, [VHost]}, + intrinsic, infinity, supervisor, + [rabbit_vhost_sup]}]}}. + +start_vhost_sup(VHost) -> + case rabbit_vhost_sup:start_link(VHost) of + {ok, Pid} -> + %% Save vhost sup record with wrapper pid and vhost sup pid. + ok = rabbit_vhost_sup_sup:save_vhost_sup(VHost, self(), Pid), + %% We can start recover as soon as we have vhost_sup record saved + ok = rabbit_vhost:recover(VHost), + {ok, Pid}; + Other -> + Other + end.
\ No newline at end of file diff --git a/src/rabbit_vm.erl b/src/rabbit_vm.erl index 1039ed659c..b65536c0d4 100644 --- a/src/rabbit_vm.erl +++ b/src/rabbit_vm.erl @@ -161,14 +161,23 @@ msg_stores() -> all_vhosts_children(msg_store_persistent). all_vhosts_children(Name) -> - lists:filter_map( - fun({_, VHostSup, _, _}) -> - case supervisor2:find_child(VHostSup, Name) of - [QSup] -> {true, QSup}; - [] -> false - end - end, - supervisor:which_children(rabbit_vhost_sup_sup)). + case whereis(rabbit_vhost_sup_sup) of + undefined -> []; + Pid when is_pid(Pid) -> + lists:filtermap( + fun({_, VHostSupWrapper, _, _}) -> + case supervisor2:find_child(VHostSupWrapper, + rabbit_vhost_sup) of + [] -> false; + [VHostSup] -> + case supervisor2:find_child(VHostSup, Name) of + [QSup] -> {true, QSup}; + [] -> false + end + end + end, + supervisor:which_children(rabbit_vhost_sup_sup)) + end. interesting_sups0() -> MsgIndexProcs = msg_stores(), |
