summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2011-07-04 13:23:30 +0100
committerSimon MacMullen <simon@rabbitmq.com>2011-07-04 13:23:30 +0100
commita7badca0c6819fc255eb44474535ebcdc29d5003 (patch)
tree42104568045399a8057c3cb484e8361a68b882cd
parent9119004e6a136db4bddca9251c622436f3d87e64 (diff)
downloadrabbitmq-server-git-a7badca0c6819fc255eb44474535ebcdc29d5003.tar.gz
Use two supervisors (an overall one and the delegate underneath it) so that our behaviour is not affected by the users's choice of restart strategy.
-rw-r--r--src/mirrored_supervisor.erl86
-rw-r--r--src/rabbit_upgrade_functions.erl5
2 files changed, 49 insertions, 42 deletions
diff --git a/src/mirrored_supervisor.erl b/src/mirrored_supervisor.erl
index 357c21d59e..add2c1208b 100644
--- a/src/mirrored_supervisor.erl
+++ b/src/mirrored_supervisor.erl
@@ -24,7 +24,6 @@
-define(SUPERVISOR, supervisor2).
-define(GEN_SERVER, gen_server2).
--define(ID, ?MODULE).
-define(TABLE, mirrored_sup_childspec).
-define(TABLE_DEF,
@@ -41,16 +40,17 @@
-export([behaviour_info/1]).
-behaviour(?GEN_SERVER).
+-behaviour(?SUPERVISOR).
-export([init/1, handle_call/3, handle_info/2, terminate/2, code_change/3,
handle_cast/2]).
--export([start_internal/3]).
+-export([start_internal/2]).
-export([create_tables/0, table_definitions/0]).
--record(mirrored_sup_childspec, {id, sup_pid, childspec}).
+-record(mirrored_sup_childspec, {id, mirroring_pid, childspec}).
--record(state, {sup, group}).
+-record(state, {overall, group}).
%%----------------------------------------------------------------------------
@@ -59,20 +59,16 @@ start_link(_Group, _Mod, _Args) ->
exit(mirrored_supervisors_must_be_locally_named).
start_link({local, SupName}, Group, Mod, Args) ->
- {ok, SupPid} = ?SUPERVISOR:start_link({local, SupName}, Mod, Args),
- {ok, _Me} = ?SUPERVISOR:start_child(
- SupPid, {?ID, {?MODULE, start_internal,
- [SupName, Group, Args]},
- transient, 16#ffffffff, supervisor, [?MODULE]}),
- {ok, SupPid};
+ ?SUPERVISOR:start_link({local, SupName}, ?MODULE,
+ {overall, SupName, Group, Mod, Args});
start_link({global, _SupName}, _Group, _Mod, _Args) ->
exit(mirrored_supervisors_must_be_locally_named).
start_child(Sup, ChildSpec) -> call(Sup, {start_child, ChildSpec}).
delete_child(Sup, Name) -> call(Sup, {delete_child, Name}).
-restart_child(Sup, Name) -> call(Sup, {msg, restart_child, [Sup, Name]}).
-terminate_child(Sup, Name) -> call(Sup, {msg, terminate_child, [Sup, Name]}).
+restart_child(Sup, Name) -> call(Sup, {msg, restart_child, [Name]}).
+terminate_child(Sup, Name) -> call(Sup, {msg, terminate_child, [Name]}).
which_children(Sup) -> ?SUPERVISOR:which_children(Sup).
check_childspecs(ChildSpecs) -> ?SUPERVISOR:check_childspecs(ChildSpecs).
@@ -80,7 +76,7 @@ behaviour_info(callbacks) -> [{init,1}];
behaviour_info(_Other) -> undefined.
call(Sup, Msg) ->
- ?GEN_SERVER:call(child(Sup, ?ID), Msg, infinity).
+ ?GEN_SERVER:call(child(Sup, mirroring), Msg, infinity).
child(Sup, Name) ->
[Pid] = [Pid || {Name1, Pid, _, _} <- which_children(Sup), Name1 =:= Name],
@@ -88,12 +84,20 @@ child(Sup, Name) ->
%%----------------------------------------------------------------------------
-start_internal(Sup, Group, Args) ->
- ?GEN_SERVER:start_link(?MODULE, {Sup, Group, Args}, [{timeout, infinity}]).
+start_internal(Sup, Group) ->
+ ?GEN_SERVER:start_link(
+ ?MODULE, {mirroring, Sup, Group}, [{timeout, infinity}]).
%%----------------------------------------------------------------------------
-init({Sup, Group, _Args}) ->
+init({overall, SupName, Group, Mod, Args}) ->
+ Delegate = {delegate, {?SUPERVISOR, start_link, [Mod, Args]},
+ transient, 16#ffffffff, supervisor, [?SUPERVISOR]},
+ Mirroring = {mirroring, {?MODULE, start_internal, [SupName, Group]},
+ transient, 16#ffffffff, worker, [?MODULE]},
+ {ok, {{one_for_all, 0, 1}, [Delegate, Mirroring]}};
+
+init({mirroring, Sup, Group}) ->
pg2_fixed:create(Group),
[begin
gen_server2:call(Pid, {hello, self()}, infinity),
@@ -101,26 +105,28 @@ init({Sup, Group, _Args}) ->
end
|| Pid <- pg2_fixed:get_members(Group)],
ok = pg2_fixed:join(Group, self()),
- {ok, #state{sup = Sup, group = Group}}.
+ {ok, #state{overall = Sup, group = Group}}.
-handle_call({start_child, ChildSpec}, _From, State = #state{sup = Sup}) ->
+handle_call({start_child, ChildSpec}, _From,
+ State = #state{overall = Overall}) ->
{reply, case mnesia:transaction(fun() -> check_start(ChildSpec) end) of
- {atomic, start} -> start(Sup, ChildSpec);
+ {atomic, start} -> start(Overall, ChildSpec);
{atomic, Pid} -> {ok, Pid}
end, State};
-handle_call({delete_child, Id}, _From, State = #state{sup = Sup}) ->
+handle_call({delete_child, Id}, _From,
+ State = #state{overall = Overall}) ->
{atomic, ok} = mnesia:transaction(fun() -> delete(Id) end),
- {reply, stop(Sup, Id), State};
+ {reply, stop(Overall, Id), State};
-handle_call({msg, F, A}, _From, State) ->
- {reply, apply(?SUPERVISOR, F, A), State};
+handle_call({msg, F, A}, _From, State = #state{overall = Overall}) ->
+ {reply, apply(?SUPERVISOR, F, [child(Overall, delegate) | A]), State};
handle_call({hello, Pid}, _From, State) ->
erlang:monitor(process, Pid),
{reply, ok, State};
-handle_call(supervisor, _From, State = #state{sup = Sup}) ->
+handle_call(overall_supervisor, _From, State = #state{overall = Sup}) ->
{reply, Sup, State};
handle_call(Msg, _From, State) ->
@@ -130,7 +136,7 @@ handle_cast(Msg, State) ->
{stop, {unexpected_cast, Msg}, State}.
handle_info({'DOWN', _Ref, process, Pid, _Reason},
- State = #state{sup = Sup, group = Group}) ->
+ State = #state{overall = Overall, group = Group}) ->
%% TODO load balance this
%% We remove the dead pid here because pg2_fixed is slightly racy,
%% most of the time it will be gone before we get here but not
@@ -139,7 +145,7 @@ handle_info({'DOWN', _Ref, process, Pid, _Reason},
case lists:sort(pg2_fixed:get_members(Group)) -- [Pid] of
[Self | _] -> {atomic, ChildSpecs} =
mnesia:transaction(fun() -> update_all(Pid) end),
- [start(Sup, ChildSpec) || ChildSpec <- ChildSpecs];
+ [start(Overall, ChildSpec) || ChildSpec <- ChildSpecs];
_ -> ok
end,
{noreply, State};
@@ -159,36 +165,36 @@ check_start(ChildSpec) ->
case mnesia:wread({?TABLE, id(ChildSpec)}) of
[] -> write(ChildSpec),
start;
- [S] -> #mirrored_sup_childspec{id = Id,
- sup_pid = Pid} = S,
+ [S] -> #mirrored_sup_childspec{id = Id,
+ mirroring_pid = Pid} = S,
case supervisor(Pid) of
dead -> delete(ChildSpec),
write(ChildSpec),
start;
- Sup -> child(Sup, Id)
+ Sup -> child(child(Sup, delegate), Id)
end
end.
supervisor(Pid) ->
try
- gen_server:call(Pid, supervisor, infinity)
+ gen_server:call(Pid, overall_supervisor, infinity)
catch
exit:{noproc, _} -> dead
end.
write(ChildSpec) ->
- ok = mnesia:write(#mirrored_sup_childspec{id = id(ChildSpec),
- sup_pid = self(),
- childspec = ChildSpec}).
+ ok = mnesia:write(#mirrored_sup_childspec{id = id(ChildSpec),
+ mirroring_pid = self(),
+ childspec = ChildSpec}).
delete(Id) ->
ok = mnesia:delete({?TABLE, Id}).
-start(Sup, ChildSpec) ->
- apply(?SUPERVISOR, start_child, [Sup, ChildSpec]).
+start(Overall, ChildSpec) ->
+ apply(?SUPERVISOR, start_child, [child(Overall, delegate), ChildSpec]).
-stop(Sup, Id) ->
- apply(?SUPERVISOR, delete_child, [Sup, Id]).
+stop(Overall, Id) ->
+ apply(?SUPERVISOR, delete_child, [child(Overall, delegate), Id]).
id({Id, _, _, _, _, _}) -> Id.
@@ -198,9 +204,9 @@ update(ChildSpec) ->
ChildSpec.
update_all(OldPid) ->
- MatchHead = #mirrored_sup_childspec{sup_pid = OldPid,
- childspec = '$1',
- _ = '_'},
+ MatchHead = #mirrored_sup_childspec{mirroring_pid = OldPid,
+ childspec = '$1',
+ _ = '_'},
[update(C) || C <- mnesia:select(?TABLE, [{MatchHead, [], ['$1']}])].
%%----------------------------------------------------------------------------
diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl
index 15887af1d7..797e9fb971 100644
--- a/src/rabbit_upgrade_functions.erl
+++ b/src/rabbit_upgrade_functions.erl
@@ -157,8 +157,9 @@ gm() ->
{attributes, [name, version, members]}]).
mirrored_supervisor() ->
- create(mirrored_sup_childspec, [{record_name, mirrored_sup_childspec},
- {attributes, [id, sup_pid, childspec]}]).
+ create(mirrored_sup_childspec,
+ [{record_name, mirrored_sup_childspec},
+ {attributes, [id, mirroring_pid, childspec]}]).
%%--------------------------------------------------------------------