diff options
| -rw-r--r-- | src/mirrored_supervisor.erl | 86 | ||||
| -rw-r--r-- | src/rabbit_upgrade_functions.erl | 5 |
2 files changed, 49 insertions, 42 deletions
diff --git a/src/mirrored_supervisor.erl b/src/mirrored_supervisor.erl index 357c21d59e..add2c1208b 100644 --- a/src/mirrored_supervisor.erl +++ b/src/mirrored_supervisor.erl @@ -24,7 +24,6 @@ -define(SUPERVISOR, supervisor2). -define(GEN_SERVER, gen_server2). --define(ID, ?MODULE). -define(TABLE, mirrored_sup_childspec). -define(TABLE_DEF, @@ -41,16 +40,17 @@ -export([behaviour_info/1]). -behaviour(?GEN_SERVER). +-behaviour(?SUPERVISOR). -export([init/1, handle_call/3, handle_info/2, terminate/2, code_change/3, handle_cast/2]). --export([start_internal/3]). +-export([start_internal/2]). -export([create_tables/0, table_definitions/0]). --record(mirrored_sup_childspec, {id, sup_pid, childspec}). +-record(mirrored_sup_childspec, {id, mirroring_pid, childspec}). --record(state, {sup, group}). +-record(state, {overall, group}). %%---------------------------------------------------------------------------- @@ -59,20 +59,16 @@ start_link(_Group, _Mod, _Args) -> exit(mirrored_supervisors_must_be_locally_named). start_link({local, SupName}, Group, Mod, Args) -> - {ok, SupPid} = ?SUPERVISOR:start_link({local, SupName}, Mod, Args), - {ok, _Me} = ?SUPERVISOR:start_child( - SupPid, {?ID, {?MODULE, start_internal, - [SupName, Group, Args]}, - transient, 16#ffffffff, supervisor, [?MODULE]}), - {ok, SupPid}; + ?SUPERVISOR:start_link({local, SupName}, ?MODULE, + {overall, SupName, Group, Mod, Args}); start_link({global, _SupName}, _Group, _Mod, _Args) -> exit(mirrored_supervisors_must_be_locally_named). start_child(Sup, ChildSpec) -> call(Sup, {start_child, ChildSpec}). delete_child(Sup, Name) -> call(Sup, {delete_child, Name}). -restart_child(Sup, Name) -> call(Sup, {msg, restart_child, [Sup, Name]}). -terminate_child(Sup, Name) -> call(Sup, {msg, terminate_child, [Sup, Name]}). +restart_child(Sup, Name) -> call(Sup, {msg, restart_child, [Name]}). +terminate_child(Sup, Name) -> call(Sup, {msg, terminate_child, [Name]}). which_children(Sup) -> ?SUPERVISOR:which_children(Sup). check_childspecs(ChildSpecs) -> ?SUPERVISOR:check_childspecs(ChildSpecs). @@ -80,7 +76,7 @@ behaviour_info(callbacks) -> [{init,1}]; behaviour_info(_Other) -> undefined. call(Sup, Msg) -> - ?GEN_SERVER:call(child(Sup, ?ID), Msg, infinity). + ?GEN_SERVER:call(child(Sup, mirroring), Msg, infinity). child(Sup, Name) -> [Pid] = [Pid || {Name1, Pid, _, _} <- which_children(Sup), Name1 =:= Name], @@ -88,12 +84,20 @@ child(Sup, Name) -> %%---------------------------------------------------------------------------- -start_internal(Sup, Group, Args) -> - ?GEN_SERVER:start_link(?MODULE, {Sup, Group, Args}, [{timeout, infinity}]). +start_internal(Sup, Group) -> + ?GEN_SERVER:start_link( + ?MODULE, {mirroring, Sup, Group}, [{timeout, infinity}]). %%---------------------------------------------------------------------------- -init({Sup, Group, _Args}) -> +init({overall, SupName, Group, Mod, Args}) -> + Delegate = {delegate, {?SUPERVISOR, start_link, [Mod, Args]}, + transient, 16#ffffffff, supervisor, [?SUPERVISOR]}, + Mirroring = {mirroring, {?MODULE, start_internal, [SupName, Group]}, + transient, 16#ffffffff, worker, [?MODULE]}, + {ok, {{one_for_all, 0, 1}, [Delegate, Mirroring]}}; + +init({mirroring, Sup, Group}) -> pg2_fixed:create(Group), [begin gen_server2:call(Pid, {hello, self()}, infinity), @@ -101,26 +105,28 @@ init({Sup, Group, _Args}) -> end || Pid <- pg2_fixed:get_members(Group)], ok = pg2_fixed:join(Group, self()), - {ok, #state{sup = Sup, group = Group}}. + {ok, #state{overall = Sup, group = Group}}. -handle_call({start_child, ChildSpec}, _From, State = #state{sup = Sup}) -> +handle_call({start_child, ChildSpec}, _From, + State = #state{overall = Overall}) -> {reply, case mnesia:transaction(fun() -> check_start(ChildSpec) end) of - {atomic, start} -> start(Sup, ChildSpec); + {atomic, start} -> start(Overall, ChildSpec); {atomic, Pid} -> {ok, Pid} end, State}; -handle_call({delete_child, Id}, _From, State = #state{sup = Sup}) -> +handle_call({delete_child, Id}, _From, + State = #state{overall = Overall}) -> {atomic, ok} = mnesia:transaction(fun() -> delete(Id) end), - {reply, stop(Sup, Id), State}; + {reply, stop(Overall, Id), State}; -handle_call({msg, F, A}, _From, State) -> - {reply, apply(?SUPERVISOR, F, A), State}; +handle_call({msg, F, A}, _From, State = #state{overall = Overall}) -> + {reply, apply(?SUPERVISOR, F, [child(Overall, delegate) | A]), State}; handle_call({hello, Pid}, _From, State) -> erlang:monitor(process, Pid), {reply, ok, State}; -handle_call(supervisor, _From, State = #state{sup = Sup}) -> +handle_call(overall_supervisor, _From, State = #state{overall = Sup}) -> {reply, Sup, State}; handle_call(Msg, _From, State) -> @@ -130,7 +136,7 @@ handle_cast(Msg, State) -> {stop, {unexpected_cast, Msg}, State}. handle_info({'DOWN', _Ref, process, Pid, _Reason}, - State = #state{sup = Sup, group = Group}) -> + State = #state{overall = Overall, group = Group}) -> %% TODO load balance this %% We remove the dead pid here because pg2_fixed is slightly racy, %% most of the time it will be gone before we get here but not @@ -139,7 +145,7 @@ handle_info({'DOWN', _Ref, process, Pid, _Reason}, case lists:sort(pg2_fixed:get_members(Group)) -- [Pid] of [Self | _] -> {atomic, ChildSpecs} = mnesia:transaction(fun() -> update_all(Pid) end), - [start(Sup, ChildSpec) || ChildSpec <- ChildSpecs]; + [start(Overall, ChildSpec) || ChildSpec <- ChildSpecs]; _ -> ok end, {noreply, State}; @@ -159,36 +165,36 @@ check_start(ChildSpec) -> case mnesia:wread({?TABLE, id(ChildSpec)}) of [] -> write(ChildSpec), start; - [S] -> #mirrored_sup_childspec{id = Id, - sup_pid = Pid} = S, + [S] -> #mirrored_sup_childspec{id = Id, + mirroring_pid = Pid} = S, case supervisor(Pid) of dead -> delete(ChildSpec), write(ChildSpec), start; - Sup -> child(Sup, Id) + Sup -> child(child(Sup, delegate), Id) end end. supervisor(Pid) -> try - gen_server:call(Pid, supervisor, infinity) + gen_server:call(Pid, overall_supervisor, infinity) catch exit:{noproc, _} -> dead end. write(ChildSpec) -> - ok = mnesia:write(#mirrored_sup_childspec{id = id(ChildSpec), - sup_pid = self(), - childspec = ChildSpec}). + ok = mnesia:write(#mirrored_sup_childspec{id = id(ChildSpec), + mirroring_pid = self(), + childspec = ChildSpec}). delete(Id) -> ok = mnesia:delete({?TABLE, Id}). -start(Sup, ChildSpec) -> - apply(?SUPERVISOR, start_child, [Sup, ChildSpec]). +start(Overall, ChildSpec) -> + apply(?SUPERVISOR, start_child, [child(Overall, delegate), ChildSpec]). -stop(Sup, Id) -> - apply(?SUPERVISOR, delete_child, [Sup, Id]). +stop(Overall, Id) -> + apply(?SUPERVISOR, delete_child, [child(Overall, delegate), Id]). id({Id, _, _, _, _, _}) -> Id. @@ -198,9 +204,9 @@ update(ChildSpec) -> ChildSpec. update_all(OldPid) -> - MatchHead = #mirrored_sup_childspec{sup_pid = OldPid, - childspec = '$1', - _ = '_'}, + MatchHead = #mirrored_sup_childspec{mirroring_pid = OldPid, + childspec = '$1', + _ = '_'}, [update(C) || C <- mnesia:select(?TABLE, [{MatchHead, [], ['$1']}])]. %%---------------------------------------------------------------------------- diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl index 15887af1d7..797e9fb971 100644 --- a/src/rabbit_upgrade_functions.erl +++ b/src/rabbit_upgrade_functions.erl @@ -157,8 +157,9 @@ gm() -> {attributes, [name, version, members]}]). mirrored_supervisor() -> - create(mirrored_sup_childspec, [{record_name, mirrored_sup_childspec}, - {attributes, [id, sup_pid, childspec]}]). + create(mirrored_sup_childspec, + [{record_name, mirrored_sup_childspec}, + {attributes, [id, mirroring_pid, childspec]}]). %%-------------------------------------------------------------------- |
