diff options
| author | Simon MacMullen <simon@rabbitmq.com> | 2011-07-14 12:35:34 +0100 |
|---|---|---|
| committer | Simon MacMullen <simon@rabbitmq.com> | 2011-07-14 12:35:34 +0100 |
| commit | e3c9470ef13e0a82bd6853461aa86d8e42c34a6e (patch) | |
| tree | 0711ef01de17a37996e561aca98d31d517e7d5ef | |
| parent | 1639445b0cb3a53944261b0dd0bf9d71b0ac1106 (diff) | |
| download | rabbitmq-server-git-e3c9470ef13e0a82bd6853461aa86d8e42c34a6e.tar.gz | |
Shut down the whole supervisor group if the delegate shuts down due to its children having surpassed their restart limits.
| -rw-r--r-- | src/mirrored_supervisor.erl | 116 | ||||
| -rw-r--r-- | src/mirrored_supervisor_tests.erl | 50 |
2 files changed, 114 insertions, 52 deletions
diff --git a/src/mirrored_supervisor.erl b/src/mirrored_supervisor.erl index 1e84bf0a12..4af4709f0f 100644 --- a/src/mirrored_supervisor.erl +++ b/src/mirrored_supervisor.erl @@ -127,7 +127,11 @@ -record(mirrored_sup_childspec, {id, mirroring_pid, childspec}). --record(state, {overall, group, initial_childspecs}). +-record(state, {overall, + delegate, + group, + initial_childspecs, + peer_monitors = sets:new()}). %%---------------------------------------------------------------------------- @@ -241,12 +245,12 @@ start_link0(Prefix, Group, Mod, Args) -> Other -> Other end. -start_child(Sup, ChildSpec) -> call(Sup, {start_child, ChildSpec}). -delete_child(Sup, Name) -> call(Sup, {delete_child, Name}). -restart_child(Sup, Name) -> call(Sup, {msg, restart_child, [Name]}). -terminate_child(Sup, Name) -> call(Sup, {msg, terminate_child, [Name]}). -which_children(Sup) -> ?SUPERVISOR:which_children(Sup). -check_childspecs(ChildSpecs) -> ?SUPERVISOR:check_childspecs(ChildSpecs). +start_child(Sup, ChildSpec) -> call(Sup, {start_child, ChildSpec}). +delete_child(Sup, Name) -> call(Sup, {delete_child, Name}). +restart_child(Sup, Name) -> call(Sup, {msg, restart_child, [Name]}). +terminate_child(Sup, Name) -> call(Sup, {msg, terminate_child, [Name]}). +which_children(Sup) -> ?SUPERVISOR:which_children(child(Sup, delegate)). +check_childspecs(Specs) -> ?SUPERVISOR:check_childspecs(Specs). behaviour_info(callbacks) -> [{init,1}]; behaviour_info(_Other) -> undefined. @@ -255,7 +259,8 @@ call(Sup, Msg) -> ?GEN_SERVER:call(child(Sup, mirroring), Msg, infinity). child(Sup, Name) -> - [Pid] = [Pid || {Name1, Pid, _, _} <- which_children(Sup), Name1 =:= Name], + [Pid] = [Pid || {Name1, Pid, _, _} <- ?SUPERVISOR:which_children(Sup), + Name1 =:= Name], Pid. %%---------------------------------------------------------------------------- @@ -270,57 +275,80 @@ init({overall, Group, Mod, Args}) -> {ok, {Restart, ChildSpecs}} = Mod:init(Args), Delegate = {delegate, {?SUPERVISOR, start_link, [?MODULE, {delegate, Restart}]}, - transient, 16#ffffffff, supervisor, [?SUPERVISOR]}, + temporary, 16#ffffffff, supervisor, [?SUPERVISOR]}, Mirroring = {mirroring, {?MODULE, start_internal, [Group, ChildSpecs]}, - transient, 16#ffffffff, worker, [?MODULE]}, - {ok, {{one_for_all, 0, 1}, [Delegate, Mirroring]}}; + permanent, 16#ffffffff, worker, [?MODULE]}, + %% Important: Delegate MUST start after Mirroring, see comment in + %% handle_info('DOWN', ...) below + {ok, {{one_for_all, 0, 1}, [Mirroring, Delegate]}}; init({delegate, Restart}) -> {ok, {Restart, []}}; init({mirroring, Group, ChildSpecs}) -> + process_flag(trap_exit, true), ?PG2:create(Group), ok = ?PG2:join(Group, self()), - [begin - gen_server2:cast(Pid, {ensure_monitoring, self()}), - erlang:monitor(process, Pid) - end - || Pid <- ?PG2:get_members(Group) -- [self()]], - {ok, #state{group = Group, initial_childspecs = ChildSpecs}}. + {ok, lists:foldl( + fun(Pid, State0) -> + gen_server2:cast(Pid, {ensure_monitoring, self()}), + monitor(Pid, State0) + end, #state{group = Group, initial_childspecs = ChildSpecs}, + ?PG2:get_members(Group) -- [self()])}. handle_call({finish_startup, Overall}, _From, State = #state{overall = undefined, initial_childspecs = ChildSpecs}) -> - [maybe_start(Overall, S) || S <- ChildSpecs], - {reply, ok, State#state{overall = Overall}}; + Delegate = child(Overall, delegate), + erlang:monitor(process, Delegate), + [maybe_start(Delegate, S) || S <- ChildSpecs], + {reply, ok, State#state{overall = Overall, delegate = Delegate}}; handle_call({start_child, ChildSpec}, _From, - State = #state{overall = Overall}) -> - {reply, maybe_start(Overall, ChildSpec), State}; + State = #state{delegate = Delegate}) -> + {reply, maybe_start(Delegate, ChildSpec), State}; handle_call({delete_child, Id}, _From, - State = #state{overall = Overall}) -> + State = #state{delegate = Delegate}) -> {atomic, ok} = mnesia:transaction(fun() -> delete(Id) end), - {reply, stop(Overall, Id), State}; + {reply, stop(Delegate, Id), State}; -handle_call({msg, F, A}, _From, State = #state{overall = Overall}) -> - {reply, apply(?SUPERVISOR, F, [child(Overall, delegate) | A]), State}; +handle_call({msg, F, A}, _From, State = #state{delegate = Delegate}) -> + {reply, apply(?SUPERVISOR, F, [Delegate | A]), State}; -handle_call(overall_supervisor, _From, State = #state{overall = Overall}) -> - {reply, Overall, State}; +handle_call(delegate_supervisor, _From, State = #state{delegate = Delegate}) -> + {reply, Delegate, State}; handle_call(Msg, _From, State) -> {stop, {unexpected_call, Msg}, State}. handle_cast({ensure_monitoring, Pid}, State) -> - erlang:monitor(process, Pid), - {noreply, State}; + {noreply, monitor(Pid, State)}; + +handle_cast({die, Reason}, State = #state{peer_monitors = Peers}) -> + [erlang:demonitor(Ref) || Ref <- sets:to_list(Peers)], + {stop, Reason, State}; handle_cast(Msg, State) -> {stop, {unexpected_cast, Msg}, State}. +handle_info({'DOWN', _Ref, process, Pid, Reason}, + State = #state{delegate = Pid, group = Group}) -> + %% Since the delegate is temporary, its death won't cause us to + %% die. Since the overall supervisor kills processes in reverse + %% order when shutting down "from above" and we started after the + %% delegate, if we see the delegate die then that means it died + %% because one of its children exceeded its restart limits, not + %% because the whole app was being torn down. + %% + %% Therefore if we get here we know we need to cause the entire + %% mirrored sup to shut down, not just fail over. + %% TODO is this itself racy? + [gen_server2:cast(P, {die, Reason}) || P <- ?PG2:get_members(Group)], + {noreply, State}; + handle_info({'DOWN', _Ref, process, Pid, _Reason}, - State = #state{overall = Overall, group = Group}) -> + State = #state{delegate = Delegate, group = Group}) -> %% TODO load balance this %% We remove the dead pid here because pg2 is slightly racy, %% most of the time it will be gone before we get here but not @@ -329,7 +357,7 @@ handle_info({'DOWN', _Ref, process, Pid, _Reason}, case lists:sort(?PG2:get_members(Group)) -- [Pid] of [Self | _] -> {atomic, ChildSpecs} = mnesia:transaction(fun() -> update_all(Pid) end), - [start(Overall, ChildSpec) || ChildSpec <- ChildSpecs]; + [start(Delegate, ChildSpec) || ChildSpec <- ChildSpecs]; _ -> ok end, {noreply, State}; @@ -345,9 +373,13 @@ code_change(_OldVsn, State, _Extra) -> %%---------------------------------------------------------------------------- -maybe_start(Overall, ChildSpec) -> +monitor(Pid, State = #state{peer_monitors = Peers}) -> + State#state{peer_monitors = sets:add_element( + erlang:monitor(process, Pid), Peers)}. + +maybe_start(Delegate, ChildSpec) -> case mnesia:transaction(fun() -> check_start(ChildSpec) end) of - {atomic, start} -> start(Overall, ChildSpec); + {atomic, start} -> start(Delegate, ChildSpec); {atomic, Pid} -> {ok, Pid} end. @@ -358,16 +390,16 @@ check_start(ChildSpec) -> [S] -> #mirrored_sup_childspec{id = Id, mirroring_pid = Pid} = S, case supervisor(Pid) of - dead -> delete(ChildSpec), - write(ChildSpec), - start; - Sup -> child(child(Sup, delegate), Id) + dead -> delete(ChildSpec), + write(ChildSpec), + start; + Delegate -> child(Delegate, Id) end end. supervisor(Pid) -> try - gen_server:call(Pid, overall_supervisor, infinity) + gen_server:call(Pid, delegate_supervisor, infinity) catch exit:{noproc, _} -> dead end. @@ -380,11 +412,11 @@ write(ChildSpec) -> delete(Id) -> ok = mnesia:delete({?TABLE, Id}). -start(Overall, ChildSpec) -> - apply(?SUPERVISOR, start_child, [child(Overall, delegate), ChildSpec]). +start(Delegate, ChildSpec) -> + apply(?SUPERVISOR, start_child, [Delegate, ChildSpec]). -stop(Overall, Id) -> - apply(?SUPERVISOR, delete_child, [child(Overall, delegate), Id]). +stop(Delegate, Id) -> + apply(?SUPERVISOR, delete_child, [Delegate, Id]). id({Id, _, _, _, _, _}) -> Id. diff --git a/src/mirrored_supervisor_tests.erl b/src/mirrored_supervisor_tests.erl index 00a6d690e1..51d3ea0d24 100644 --- a/src/mirrored_supervisor_tests.erl +++ b/src/mirrored_supervisor_tests.erl @@ -36,6 +36,7 @@ all_tests() -> passed = test_large_group(), passed = test_childspecs_at_init(), passed = test_anonymous_supervisors(), + passed = test_no_migration_on_shutdown(), passed. %% Simplest test @@ -113,6 +114,24 @@ test_anonymous_supervisors() -> false = (Pid1 =:= Pid2) end, [anon, anon]). +%% When a mirrored_supervisor terminates, we should not migrate, but +%% the whole supervisor group should shut down. To test this we set up +%% a situation where the gen_server will only fail if it's running +%% under the supervisor called 'evil'. It should not migrate to +%% 'good' and survive, rather the whole group should go away. +test_no_migration_on_shutdown() -> + with_sups(fun([Evil, _]) -> + mirrored_supervisor:start_child(Evil, childspec(worker)), + try + call(worker, ping), + exit(worker_should_not_have_migrated) + catch exit:{timeout_waiting_for_server, _} -> + ok + end, + timer:sleep(1000) + end, [evil, good]). + + %% --------------------------------------------------------------------------- with_sups(Fun, Sups) -> @@ -135,16 +154,18 @@ start_sup(Name, Group) -> start_sup({Name, []}, Group). start_sup0(anon, Group, ChildSpecs) -> - mirrored_supervisor:start_link(Group, ?MODULE, ChildSpecs); + mirrored_supervisor:start_link(Group, ?MODULE, {sup, ChildSpecs}); start_sup0(Name, Group, ChildSpecs) -> - mirrored_supervisor:start_link({local, Name}, Group, ?MODULE, ChildSpecs). + mirrored_supervisor:start_link({local, Name}, + Group, ?MODULE, {sup, ChildSpecs}). childspec(Id) -> - {Id, {?MODULE, start_gs, [Id]}, transient, 16#ffffffff, worker, [?MODULE]}. + {Id, {?MODULE, start_gs, [Id]}, + transient, 16#ffffffff, worker, [?MODULE]}. start_gs(Id) -> - gen_server:start_link({local, Id}, ?MODULE, [], []). + gen_server:start_link({local, Id}, ?MODULE, server, []). pid_of(Id) -> {received, Pid, ping} = call(Id, ping), @@ -153,7 +174,7 @@ pid_of(Id) -> call(Id, Msg) -> call(Id, Msg, 100, 10). call(Id, Msg, 0, _Decr) -> - exit({timeout_waiting_for_server, Id, Msg}); + exit({timeout_waiting_for_server, {Id, Msg}}); call(Id, Msg, MaxDelay, Decr) -> try @@ -170,13 +191,14 @@ kill(Pid) -> %% Dumb gen_server we can supervise %% --------------------------------------------------------------------------- -%% Cheeky: this is used for both the gen_server and supervisor -%% behaviours. So our gen server has a weird-looking state. But we -%% don't care. -init(ChildSpecs) -> - {ok, {{one_for_one, 3, 10}, ChildSpecs}}. +init({sup, ChildSpecs}) -> + {ok, {{one_for_one, 0, 1}, ChildSpecs}}; + +init(server) -> + {ok, state}. handle_call(Msg, _From, State) -> + die_if_my_supervisor_is_evil(), {reply, {received, self(), Msg}, State}. handle_cast(_Msg, State) -> @@ -190,3 +212,11 @@ terminate(_Reason, _State) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. + +die_if_my_supervisor_is_evil() -> + try lists:keyfind(self(), 2, mirrored_supervisor:which_children(evil)) of + false -> ok; + _ -> exit(doooom) + catch + exit:{noproc, _} -> ok + end. |
