summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author Simon MacMullen <simon@rabbitmq.com> 2011-07-14 12:35:34 +0100
committer Simon MacMullen <simon@rabbitmq.com> 2011-07-14 12:35:34 +0100
commit e3c9470ef13e0a82bd6853461aa86d8e42c34a6e (patch)
tree 0711ef01de17a37996e561aca98d31d517e7d5ef
parent 1639445b0cb3a53944261b0dd0bf9d71b0ac1106 (diff)
download rabbitmq-server-git-e3c9470ef13e0a82bd6853461aa86d8e42c34a6e.tar.gz
Shut down the whole supervisor group if the delegate shuts down due to its children having surpassed their restart limits.
-rw-r--r-- src/mirrored_supervisor.erl 116
-rw-r--r-- src/mirrored_supervisor_tests.erl 50
2 files changed, 114 insertions, 52 deletions
diff --git a/src/mirrored_supervisor.erl b/src/mirrored_supervisor.erl
index 1e84bf0a12..4af4709f0f 100644
--- a/src/mirrored_supervisor.erl
+++ b/src/mirrored_supervisor.erl
@@ -127,7 +127,11 @@
-record(mirrored_sup_childspec, {id, mirroring_pid, childspec}).
--record(state, {overall, group, initial_childspecs}).
+-record(state, {overall,
+ delegate,
+ group,
+ initial_childspecs,
+ peer_monitors = sets:new()}).
%%----------------------------------------------------------------------------
@@ -241,12 +245,12 @@ start_link0(Prefix, Group, Mod, Args) ->
Other -> Other
end.
-start_child(Sup, ChildSpec) -> call(Sup, {start_child, ChildSpec}).
-delete_child(Sup, Name) -> call(Sup, {delete_child, Name}).
-restart_child(Sup, Name) -> call(Sup, {msg, restart_child, [Name]}).
-terminate_child(Sup, Name) -> call(Sup, {msg, terminate_child, [Name]}).
-which_children(Sup) -> ?SUPERVISOR:which_children(Sup).
-check_childspecs(ChildSpecs) -> ?SUPERVISOR:check_childspecs(ChildSpecs).
+start_child(Sup, ChildSpec) -> call(Sup, {start_child, ChildSpec}).
+delete_child(Sup, Name) -> call(Sup, {delete_child, Name}).
+restart_child(Sup, Name) -> call(Sup, {msg, restart_child, [Name]}).
+terminate_child(Sup, Name) -> call(Sup, {msg, terminate_child, [Name]}).
+which_children(Sup) -> ?SUPERVISOR:which_children(child(Sup, delegate)).
+check_childspecs(Specs) -> ?SUPERVISOR:check_childspecs(Specs).
behaviour_info(callbacks) -> [{init,1}];
behaviour_info(_Other) -> undefined.
@@ -255,7 +259,8 @@ call(Sup, Msg) ->
?GEN_SERVER:call(child(Sup, mirroring), Msg, infinity).
child(Sup, Name) ->
- [Pid] = [Pid || {Name1, Pid, _, _} <- which_children(Sup), Name1 =:= Name],
+ [Pid] = [Pid || {Name1, Pid, _, _} <- ?SUPERVISOR:which_children(Sup),
+ Name1 =:= Name],
Pid.
%%----------------------------------------------------------------------------
@@ -270,57 +275,80 @@ init({overall, Group, Mod, Args}) ->
{ok, {Restart, ChildSpecs}} = Mod:init(Args),
Delegate = {delegate, {?SUPERVISOR, start_link,
[?MODULE, {delegate, Restart}]},
- transient, 16#ffffffff, supervisor, [?SUPERVISOR]},
+ temporary, 16#ffffffff, supervisor, [?SUPERVISOR]},
Mirroring = {mirroring, {?MODULE, start_internal, [Group, ChildSpecs]},
- transient, 16#ffffffff, worker, [?MODULE]},
- {ok, {{one_for_all, 0, 1}, [Delegate, Mirroring]}};
+ permanent, 16#ffffffff, worker, [?MODULE]},
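+ %% Important: Delegate is started after Mirroring here, see comment in
+ %% handle_info('DOWN', ...) below.
+ %% NOTE(review): that comment reasons that "we started after the
+ %% delegate", i.e. the opposite order. A supervisor terminates its
+ %% children in reverse start order, so with [Mirroring, Delegate] the
+ %% delegate is stopped first on a shutdown "from above" and Mirroring
+ %% can observe its 'DOWN'. Confirm whether this should be
+ %% [Delegate, Mirroring].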
+ {ok, {{one_for_all, 0, 1}, [Mirroring, Delegate]}};
init({delegate, Restart}) ->
{ok, {Restart, []}};
init({mirroring, Group, ChildSpecs}) ->
+ process_flag(trap_exit, true),
?PG2:create(Group),
ok = ?PG2:join(Group, self()),
- [begin
- gen_server2:cast(Pid, {ensure_monitoring, self()}),
- erlang:monitor(process, Pid)
- end
- || Pid <- ?PG2:get_members(Group) -- [self()]],
- {ok, #state{group = Group, initial_childspecs = ChildSpecs}}.
+ {ok, lists:foldl(
+ fun(Pid, State0) ->
+ gen_server2:cast(Pid, {ensure_monitoring, self()}),
+ monitor(Pid, State0)
+ end, #state{group = Group, initial_childspecs = ChildSpecs},
+ ?PG2:get_members(Group) -- [self()])}.
handle_call({finish_startup, Overall}, _From,
State = #state{overall = undefined,
initial_childspecs = ChildSpecs}) ->
- [maybe_start(Overall, S) || S <- ChildSpecs],
- {reply, ok, State#state{overall = Overall}};
+ Delegate = child(Overall, delegate),
+ erlang:monitor(process, Delegate),
+ [maybe_start(Delegate, S) || S <- ChildSpecs],
+ {reply, ok, State#state{overall = Overall, delegate = Delegate}};
handle_call({start_child, ChildSpec}, _From,
- State = #state{overall = Overall}) ->
- {reply, maybe_start(Overall, ChildSpec), State};
+ State = #state{delegate = Delegate}) ->
+ {reply, maybe_start(Delegate, ChildSpec), State};
handle_call({delete_child, Id}, _From,
- State = #state{overall = Overall}) ->
+ State = #state{delegate = Delegate}) ->
{atomic, ok} = mnesia:transaction(fun() -> delete(Id) end),
- {reply, stop(Overall, Id), State};
+ {reply, stop(Delegate, Id), State};
-handle_call({msg, F, A}, _From, State = #state{overall = Overall}) ->
- {reply, apply(?SUPERVISOR, F, [child(Overall, delegate) | A]), State};
+handle_call({msg, F, A}, _From, State = #state{delegate = Delegate}) ->
+ {reply, apply(?SUPERVISOR, F, [Delegate | A]), State};
-handle_call(overall_supervisor, _From, State = #state{overall = Overall}) ->
- {reply, Overall, State};
+handle_call(delegate_supervisor, _From, State = #state{delegate = Delegate}) ->
+ {reply, Delegate, State};
handle_call(Msg, _From, State) ->
{stop, {unexpected_call, Msg}, State}.
handle_cast({ensure_monitoring, Pid}, State) ->
- erlang:monitor(process, Pid),
- {noreply, State};
+ {noreply, monitor(Pid, State)};
+
+handle_cast({die, Reason}, State = #state{peer_monitors = Peers}) ->
+ [erlang:demonitor(Ref) || Ref <- sets:to_list(Peers)],
+ {stop, Reason, State};
handle_cast(Msg, State) ->
{stop, {unexpected_cast, Msg}, State}.
+handle_info({'DOWN', _Ref, process, Pid, Reason},
+ State = #state{delegate = Pid, group = Group}) ->
+ %% Since the delegate is temporary, its death won't cause us to
+ %% die. Since the overall supervisor kills processes in reverse
+ %% order when shutting down "from above" and we started after the
+ %% delegate, if we see the delegate die then that means it died
+ %% because one of its children exceeded its restart limits, not
+ %% because the whole app was being torn down.
+ %%
+ %% Therefore if we get here we know we need to cause the entire
+ %% mirrored sup to shut down, not just fail over.
+ %% TODO is this itself racy?
+ [gen_server2:cast(P, {die, Reason}) || P <- ?PG2:get_members(Group)],
+ {noreply, State};
+
handle_info({'DOWN', _Ref, process, Pid, _Reason},
- State = #state{overall = Overall, group = Group}) ->
+ State = #state{delegate = Delegate, group = Group}) ->
%% TODO load balance this
%% We remove the dead pid here because pg2 is slightly racy,
%% most of the time it will be gone before we get here but not
@@ -329,7 +357,7 @@ handle_info({'DOWN', _Ref, process, Pid, _Reason},
case lists:sort(?PG2:get_members(Group)) -- [Pid] of
[Self | _] -> {atomic, ChildSpecs} =
mnesia:transaction(fun() -> update_all(Pid) end),
- [start(Overall, ChildSpec) || ChildSpec <- ChildSpecs];
+ [start(Delegate, ChildSpec) || ChildSpec <- ChildSpecs];
_ -> ok
end,
{noreply, State};
@@ -345,9 +373,13 @@ code_change(_OldVsn, State, _Extra) ->
%%----------------------------------------------------------------------------
-maybe_start(Overall, ChildSpec) ->
+monitor(Pid, State = #state{peer_monitors = Peers}) ->
+ State#state{peer_monitors = sets:add_element(
+ erlang:monitor(process, Pid), Peers)}.
+
+maybe_start(Delegate, ChildSpec) ->
case mnesia:transaction(fun() -> check_start(ChildSpec) end) of
- {atomic, start} -> start(Overall, ChildSpec);
+ {atomic, start} -> start(Delegate, ChildSpec);
{atomic, Pid} -> {ok, Pid}
end.
@@ -358,16 +390,16 @@ check_start(ChildSpec) ->
[S] -> #mirrored_sup_childspec{id = Id,
mirroring_pid = Pid} = S,
case supervisor(Pid) of
- dead -> delete(ChildSpec),
- write(ChildSpec),
- start;
- Sup -> child(child(Sup, delegate), Id)
+ dead -> delete(ChildSpec),
+ write(ChildSpec),
+ start;
+ Delegate -> child(Delegate, Id)
end
end.
supervisor(Pid) ->
try
- gen_server:call(Pid, overall_supervisor, infinity)
+ gen_server:call(Pid, delegate_supervisor, infinity)
catch
exit:{noproc, _} -> dead
end.
@@ -380,11 +412,11 @@ write(ChildSpec) ->
delete(Id) ->
ok = mnesia:delete({?TABLE, Id}).
-start(Overall, ChildSpec) ->
- apply(?SUPERVISOR, start_child, [child(Overall, delegate), ChildSpec]).
+start(Delegate, ChildSpec) ->
+ apply(?SUPERVISOR, start_child, [Delegate, ChildSpec]).
-stop(Overall, Id) ->
- apply(?SUPERVISOR, delete_child, [child(Overall, delegate), Id]).
+stop(Delegate, Id) ->
+ apply(?SUPERVISOR, delete_child, [Delegate, Id]).
id({Id, _, _, _, _, _}) -> Id.
diff --git a/src/mirrored_supervisor_tests.erl b/src/mirrored_supervisor_tests.erl
index 00a6d690e1..51d3ea0d24 100644
--- a/src/mirrored_supervisor_tests.erl
+++ b/src/mirrored_supervisor_tests.erl
@@ -36,6 +36,7 @@ all_tests() ->
passed = test_large_group(),
passed = test_childspecs_at_init(),
passed = test_anonymous_supervisors(),
+ passed = test_no_migration_on_shutdown(),
passed.
%% Simplest test
@@ -113,6 +114,24 @@ test_anonymous_supervisors() ->
false = (Pid1 =:= Pid2)
end, [anon, anon]).
+%% When a mirrored_supervisor terminates because one of its children
+%% exceeded the restart limit, we should not migrate; instead the
+%% whole supervisor group should shut down. To test this we set up
+%% a situation where the gen_server will only fail if it's running
+%% under the supervisor called 'evil'. It should not migrate to
+%% 'good' and survive, rather the whole group should go away.
+test_no_migration_on_shutdown() ->
+ with_sups(fun([Evil, _]) ->
+ mirrored_supervisor:start_child(Evil, childspec(worker)),
+ try
+ call(worker, ping),
+ exit(worker_should_not_have_migrated)
+ catch exit:{timeout_waiting_for_server, _} ->
+ ok
+ end,
+ timer:sleep(1000)
+ end, [evil, good]).
+
+
%% ---------------------------------------------------------------------------
with_sups(Fun, Sups) ->
@@ -135,16 +154,18 @@ start_sup(Name, Group) ->
start_sup({Name, []}, Group).
start_sup0(anon, Group, ChildSpecs) ->
- mirrored_supervisor:start_link(Group, ?MODULE, ChildSpecs);
+ mirrored_supervisor:start_link(Group, ?MODULE, {sup, ChildSpecs});
start_sup0(Name, Group, ChildSpecs) ->
- mirrored_supervisor:start_link({local, Name}, Group, ?MODULE, ChildSpecs).
+ mirrored_supervisor:start_link({local, Name},
+ Group, ?MODULE, {sup, ChildSpecs}).
childspec(Id) ->
- {Id, {?MODULE, start_gs, [Id]}, transient, 16#ffffffff, worker, [?MODULE]}.
+ {Id, {?MODULE, start_gs, [Id]},
+ transient, 16#ffffffff, worker, [?MODULE]}.
start_gs(Id) ->
- gen_server:start_link({local, Id}, ?MODULE, [], []).
+ gen_server:start_link({local, Id}, ?MODULE, server, []).
pid_of(Id) ->
{received, Pid, ping} = call(Id, ping),
@@ -153,7 +174,7 @@ pid_of(Id) ->
call(Id, Msg) -> call(Id, Msg, 100, 10).
call(Id, Msg, 0, _Decr) ->
- exit({timeout_waiting_for_server, Id, Msg});
+ exit({timeout_waiting_for_server, {Id, Msg}});
call(Id, Msg, MaxDelay, Decr) ->
try
@@ -170,13 +191,14 @@ kill(Pid) ->
%% Dumb gen_server we can supervise
%% ---------------------------------------------------------------------------
-%% Cheeky: this is used for both the gen_server and supervisor
-%% behaviours. So our gen server has a weird-looking state. But we
-%% don't care.
-init(ChildSpecs) ->
- {ok, {{one_for_one, 3, 10}, ChildSpecs}}.
+init({sup, ChildSpecs}) ->
+ {ok, {{one_for_one, 0, 1}, ChildSpecs}};
+
+init(server) ->
+ {ok, state}.
handle_call(Msg, _From, State) ->
+ die_if_my_supervisor_is_evil(),
{reply, {received, self(), Msg}, State}.
handle_cast(_Msg, State) ->
@@ -190,3 +212,11 @@ terminate(_Reason, _State) ->
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
+
+die_if_my_supervisor_is_evil() ->
+ try lists:keyfind(self(), 2, mirrored_supervisor:which_children(evil)) of
+ false -> ok;
+ _ -> exit(doooom)
+ catch
+ exit:{noproc, _} -> ok
+ end.