summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTim Watson <watson.timothy@gmail.com>2012-10-26 14:23:33 +0100
committerTim Watson <watson.timothy@gmail.com>2012-10-26 14:23:33 +0100
commit89713129dc241d41ab80bf34ce55133ed424cbc7 (patch)
treef8fedf149674715b86e479fee14354658ebb4fe0
parent4f074a9e5d57d5f3af696983665a812aecc8aaca (diff)
downloadrabbitmq-server-git-89713129dc241d41ab80bf34ce55133ed424cbc7.tar.gz
Work in progress. Lots of (reverse) costmetic changes, pull over
count_children/1 and move things around a bit. The OTP style of dynamic child handling hasn't been merged yet, so we're not even compiling yet.
-rw-r--r--src/supervisor2.erl294
1 files changed, 181 insertions, 113 deletions
diff --git a/src/supervisor2.erl b/src/supervisor2.erl
index 2252c4b59b..eae2f29886 100644
--- a/src/supervisor2.erl
+++ b/src/supervisor2.erl
@@ -55,7 +55,7 @@
%%
%% %CopyrightBegin%
%%
-%% Copyright Ericsson AB 1996-2009. All Rights Reserved.
+%% Copyright Ericsson AB 1996-2012. All Rights Reserved.
%%
%% The contents of this file are subject to the Erlang Public License,
%% Version 1.1, (the "License"); you may not use this file except in
@@ -75,15 +75,15 @@
-behaviour(gen_server).
%% External exports
--export([start_link/2,start_link/3,
+-export([start_link/2, start_link/3,
start_child/2, restart_child/2,
delete_child/2, terminate_child/2,
- which_children/1, find_child/2,
- check_childspecs/1]).
+ which_children/1, count_children/1,
+ find_child/2, check_childspecs/1]).
%% Internal exports
--export([init/1, handle_call/3, handle_info/2, terminate/2, code_change/3]).
--export([handle_cast/2]).
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3]).
%%--------------------------------------------------------------------------
@@ -96,9 +96,7 @@
-type mfargs() :: {M :: module(), F :: atom(), A :: [term()] | undefined}.
-type modules() :: [module()] | 'dynamic'.
-type delay() :: non_neg_integer().
--type restart() :: 'permanent' | 'transient' | 'temporary' | 'intrinsic'
- | {'permanent', delay()} | {'transient', delay()}
- | {'intrinsic', delay()}.
+-type restart() :: 'permanent' | 'transient' | 'temporary' | 'intrinsic' | {'permanent', delay()} | {'transient', delay()} | {'intrinsic', delay()}.
-type shutdown() :: 'brutal_kill' | timeout().
-type worker() :: 'worker' | 'supervisor'.
-type sup_name() :: {'local', Name :: atom()} | {'global', Name :: atom()}.
@@ -114,69 +112,69 @@
Modules :: modules()}.
-type strategy() :: 'one_for_all' | 'one_for_one'
- | 'rest_for_one' | 'simple_one_for_one'
- | 'simple_one_for_one_terminate'.
+ | 'rest_for_one' | 'simple_one_for_one' | 'simple_one_for_one_terminate'.
%%--------------------------------------------------------------------------
-record(child, {% pid is undefined when child is not running
- pid = undefined,
- name,
- mfa,
- restart_type,
- shutdown,
- child_type,
- modules = []}).
--type child_rec() :: #child{pid :: child() | {restarting,pid()} | [pid()],
- name :: child_id(),
- mfa :: mfargs(),
- restart_type :: restart(),
- shutdown :: shutdown(),
- child_type :: worker(),
- modules :: modules()}.
+ pid = undefined :: child() | {restarting,pid()} | [pid()],
+ name :: child_id(),
+ mfa :: mfargs(),
+ restart_type :: restart(),
+ shutdown :: shutdown(),
+ child_type :: worker(),
+ modules = [] :: modules()}).
+-type child_rec() :: #child{}.
-define(DICT, dict).
+-define(SETS, sets).
+-define(SET, set).
-record(state, {name,
- strategy,
- children = [],
- dynamics = ?DICT:new(),
- intensity,
- period,
+ strategy :: strategy(),
+ children = [] :: [child_rec()],
+ dynamics = ?DICT:new() :: ?DICT(),
+ intensity :: non_neg_integer(),
+ period :: pos_integer(),
restarts = [],
module,
args}).
--type state() :: #state{strategy :: strategy(),
- children :: [child_rec()],
- dynamics :: ?DICT(),
- intensity :: non_neg_integer(),
- period :: pos_integer()}.
+-type state() :: #state{}.
--define(is_simple(State), State#state.strategy =:= simple_one_for_one orelse
- State#state.strategy =:= simple_one_for_one_terminate).
-
--define(is_terminate_simple(State),
- State#state.strategy =:= simple_one_for_one_terminate).
+-define(is_simple(State), State#state.strategy =:= simple_one_for_one orelse State#state.strategy =:= simple_one_for_one_terminate).
+-define(is_terminate_simple(State), State#state.strategy =:= simple_one_for_one_terminate).
-callback init(Args :: term()) ->
{ok, {{RestartStrategy :: strategy(),
- MaxR :: non_neg_integer(),
- MaxT :: non_neg_integer()},
+ MaxR :: non_neg_integer(),
+ MaxT :: non_neg_integer()},
[ChildSpec :: child_spec()]}}
| ignore.
+-define(restarting(_Pid_), {restarting,_Pid_}).
+
%%% ---------------------------------------------------
%%% This is a general process supervisor built upon gen_server.erl.
%%% Servers/processes should/could also be built using gen_server.erl.
%%% SupName = {local, atom()} | {global, atom()}.
%%% ---------------------------------------------------
-start_link(Mod, Args) ->
- gen_server:start_link(?MODULE, {self, Mod, Args}, []).
+-type startlink_err() :: {'already_started', pid()} | 'shutdown' | term().
+-type startlink_ret() :: {'ok', pid()} | 'ignore' | {'error', startlink_err()}.
+-spec start_link(Module, Args) -> startlink_ret() when
+ Module :: module(),
+ Args :: term().
+start_link(Mod, Args) ->
+ gen_server:start_link(supervisor, {self, Mod, Args}, []).
+
+-spec start_link(SupName, Module, Args) -> startlink_ret() when
+ SupName :: sup_name(),
+ Module :: module(),
+ Args :: term().
start_link(SupName, Mod, Args) ->
gen_server:start_link(SupName, ?MODULE, {SupName, Mod, Args}, []).
-
+
%%% ---------------------------------------------------
%%% Interface functions.
%%% ---------------------------------------------------
@@ -220,7 +218,6 @@ delete_child(Supervisor, Name) ->
-spec terminate_child(SupRef, Id) -> Result when
SupRef :: sup_ref(),
-
Id :: pid() | child_id(),
Result :: 'ok' | {'error', Error},
Error :: 'not_found' | 'simple_one_for_one'.
@@ -229,20 +226,29 @@ terminate_child(Supervisor, Name) ->
-spec which_children(SupRef) -> [{Id,Child,Type,Modules}] when
SupRef :: sup_ref(),
- Id :: child_id() | 'undefined',
+ Id :: child_id() | undefined,
Child :: child(),
Type :: worker(),
Modules :: modules().
which_children(Supervisor) ->
call(Supervisor, which_children).
+-spec count_children(SupRef) -> PropListOfCounts when
+ SupRef :: sup_ref(),
+ PropListOfCounts :: [Count],
+ Count :: {specs, ChildSpecCount :: non_neg_integer()}
+ | {active, ActiveProcessCount :: non_neg_integer()}
+ | {supervisors, ChildSupervisorCount :: non_neg_integer()}
+ |{workers, ChildWorkerCount :: non_neg_integer()}.
+count_children(Supervisor) ->
+ call(Supervisor, count_children).
+
call(Supervisor, Req) ->
gen_server:call(Supervisor, Req, infinity).
-spec check_childspecs(ChildSpecs) -> Result when
ChildSpecs :: [child_spec()],
Result :: 'ok' | {'error', Error :: term()}.
-
check_childspecs(ChildSpecs) when is_list(ChildSpecs) ->
case check_startspec(ChildSpecs) of
{ok, _} -> ok;
@@ -250,21 +256,22 @@ check_childspecs(ChildSpecs) when is_list(ChildSpecs) ->
end;
check_childspecs(X) -> {error, {badarg, X}}.
+%%%-----------------------------------------------------------------
+%%% Called by timer:apply_after from restart/2
+%-spec try_again_restart(SupRef, Child) -> ok when
+% SupRef :: sup_ref(),
+% Child :: child_id() | pid().
+%try_again_restart(Supervisor, Child) ->
+% cast(Supervisor, {try_again_restart, Child}).
+
find_child(Supervisor, Name) ->
[Pid || {Name1, Pid, _Type, _Modules} <- which_children(Supervisor),
Name1 =:= Name].
-%-export([behaviour_info/1]).
-
-%behaviour_info(callbacks) ->
-% [{init,1}];
-%behaviour_info(_Other) ->
-% undefined.
-
%%% ---------------------------------------------------
-%%%
+%%%
%%% Initialize the supervisor.
-%%%
+%%%
%%% ---------------------------------------------------
-type init_sup_name() :: sup_name() | 'self'.
@@ -321,12 +328,12 @@ init_dynamic(_State, StartSpec) ->
%%-----------------------------------------------------------------
%% Func: start_children/2
-%% Args: Children = [#child] in start order
-%% SupName = {local, atom()} | {global, atom()} | {pid(),Mod}
-%% Purpose: Start all children. The new list contains #child's
+%% Args: Children = [child_rec()] in start order
+%% SupName = {local, atom()} | {global, atom()} | {pid(), Mod}
+%% Purpose: Start all children. The new list contains #child's
%% with pids.
%% Returns: {ok, NChildren} | {error, NChildren}
-%% NChildren = [#child] in termination order (reversed
+%% NChildren = [child_rec()] in termination order (reversed
%% start order)
%%-----------------------------------------------------------------
start_children(Children, SupName) -> start_children(Children, [], SupName).
@@ -375,36 +382,47 @@ do_start_child_i(M, F, A) ->
{error, What}
end.
-
%%% ---------------------------------------------------
-%%%
+%%%
%%% Callback functions.
-%%%
+%%%
%%% ---------------------------------------------------
--type call() :: 'which_children'.
+-type call() :: 'which_children' | 'count_children' | {_, _}. % XXX: refine
-spec handle_call(call(), term(), state()) -> {'reply', term(), state()}.
handle_call({start_child, EArgs}, _From, State) when ?is_simple(State) ->
- #child{mfa = {M, F, A}} = hd(State#state.children),
+ Child = hd(State#state.children),
+ #child{mfa = {M, F, A}} = Child,
Args = A ++ EArgs,
case do_start_child_i(M, F, Args) of
{ok, undefined} ->
{reply, {ok, undefined}, State};
{ok, Pid} ->
- NState = State#state{dynamics =
- ?DICT:store(Pid, Args, State#state.dynamics)},
+ NState = State#state{dynamics = ?DICT:store(Pid, Args, State#state.dynamics)},
{reply, {ok, Pid}, NState};
{ok, Pid, Extra} ->
- NState = State#state{dynamics =
- ?DICT:store(Pid, Args, State#state.dynamics)},
+ NState = State#state{dynamics = ?DICT:store(Pid, Args, State#state.dynamics)},
{reply, {ok, Pid, Extra}, NState};
What ->
{reply, What, State}
end;
-%%% The requests terminate_child, delete_child and restart_child are
-%%% invalid for simple_one_for_one and simple_one_for_one_terminate
-%%% supervisors.
+%% terminate_child for simple_one_for_one can only be done with pid
+handle_call({terminate_child, Name}, _From, State) when not is_pid(Name),
+ ?is_simple(State) ->
+ {reply, {error, simple_one_for_one}, State};
+
+handle_call({terminate_child, Name}, _From, State) ->
+ case get_child(Name, State) of
+ {value, Child} ->
+ NChild = do_terminate(Child, State#state.name),
+ {reply, ok, replace_child(NChild, State)};
+ _ ->
+ {reply, {error, not_found}, State}
+ end;
+
+%%% The requests delete_child and restart_child are invalid for
+%%% simple_one_for_one and simple_one_for_one_terminate supervisors.
handle_call({_Req, _Data}, _From, State) when ?is_simple(State) ->
{reply, {error, State#state.strategy}, State};
@@ -447,15 +465,6 @@ handle_call({delete_child, Name}, _From, State) ->
{reply, {error, not_found}, State}
end;
-handle_call({terminate_child, Name}, _From, State) ->
- case get_child(Name, State) of
- {value, Child} ->
- NChild = do_terminate(Child, State#state.name),
- {reply, ok, replace_child(NChild, State)};
- _ ->
- {reply, {error, not_found}, State}
- end;
-
handle_call(which_children, _From, State) when ?is_simple(State) ->
[#child{child_type = CT, modules = Mods}] = State#state.children,
Reply = lists:map(fun ({Pid, _}) -> {undefined, Pid, CT, Mods} end,
@@ -469,7 +478,58 @@ handle_call(which_children, _From, State) ->
{Name, Pid, ChildType, Mods}
end,
State#state.children),
- {reply, Resp, State}.
+ {reply, Resp, State};
+
+handle_call(count_children, _From, #state{children = [#child{restart_type = temporary,
+ child_type = CT}]} = State)
+ when ?is_simple(State) ->
+ {Active, Count} =
+ ?SETS:fold(fun(Pid, {Alive, Tot}) ->
+ case is_pid(Pid) andalso is_process_alive(Pid) of
+ true ->{Alive+1, Tot +1};
+ false ->
+ {Alive, Tot + 1}
+ end
+ end, {0, 0}, dynamics_db(temporary, State#state.dynamics)),
+ Reply = case CT of
+ supervisor -> [{specs, 1}, {active, Active},
+ {supervisors, Count}, {workers, 0}];
+ worker -> [{specs, 1}, {active, Active},
+ {supervisors, 0}, {workers, Count}]
+ end,
+ {reply, Reply, State};
+
+handle_call(count_children, _From, #state{children = [#child{restart_type = RType,
+ child_type = CT}]} = State)
+ when ?is_simple(State) ->
+ {Active, Count} =
+ ?DICT:fold(fun(Pid, _Val, {Alive, Tot}) ->
+ case is_pid(Pid) andalso is_process_alive(Pid) of
+ true ->
+ {Alive+1, Tot +1};
+ false ->
+ {Alive, Tot + 1}
+ end
+ end, {0, 0}, dynamics_db(RType, State#state.dynamics)),
+ Reply = case CT of
+ supervisor -> [{specs, 1}, {active, Active},
+ {supervisors, Count}, {workers, 0}];
+ worker -> [{specs, 1}, {active, Active},
+ {supervisors, 0}, {workers, Count}]
+ end,
+ {reply, Reply, State};
+
+handle_call(count_children, _From, State) ->
+ %% Specs and children are together on the children list...
+ {Specs, Active, Supers, Workers} =
+ lists:foldl(fun(Child, Counts) ->
+ count_child(Child, Counts)
+ end, {0,0,0,0}, State#state.children),
+
+ %% Reformat counts to a property list.
+ Reply = [{specs, Specs}, {active, Active},
+ {supervisors, Supers}, {workers, Workers}],
+ {reply, Reply, State}.
-spec handle_cast('null', state()) -> {'noreply', state()}.
%%% Hopefully cause a function-clause as there is no API function
@@ -477,12 +537,35 @@ handle_call(which_children, _From, State) ->
handle_cast(null, State) ->
error_logger:error_msg("ERROR: Supervisor received cast-message 'null'~n",
[]),
-
{noreply, State}.
+count_child(#child{pid = Pid, child_type = worker},
+ {Specs, Active, Supers, Workers}) ->
+ case is_pid(Pid) andalso is_process_alive(Pid) of
+ true -> {Specs+1, Active+1, Supers, Workers+1};
+ false -> {Specs+1, Active, Supers, Workers+1}
+ end;
+count_child(#child{pid = Pid, child_type = supervisor},
+ {Specs, Active, Supers, Workers}) ->
+ case is_pid(Pid) andalso is_process_alive(Pid) of
+ true -> {Specs+1, Active+1, Supers+1, Workers};
+ false -> {Specs+1, Active, Supers+1, Workers}
+ end.
+
+%%
+%% Take care of terminated children.
+%%
-spec handle_info(term(), state()) ->
{'noreply', state()} | {'stop', 'shutdown', state()}.
+handle_info({'EXIT', Pid, Reason}, State) ->
+ case restart_child(Pid, Reason, State) of
+ {ok, State1} ->
+ {noreply, State1};
+ {shutdown, State1} ->
+ {stop, shutdown, State1}
+ end;
+
handle_info({delayed_restart, {RestartType, Reason, Child}}, State)
when ?is_simple(State) ->
{ok, NState} = do_restart(RestartType, Reason, Child, State),
@@ -496,21 +579,11 @@ handle_info({delayed_restart, {RestartType, Reason, Child}}, State) ->
{noreply, State}
end;
-%%
-%% Take care of terminated children.
-%%
-handle_info({'EXIT', Pid, Reason}, State) ->
- case restart_child(Pid, Reason, State) of
- {ok, State1} ->
- {noreply, State1};
- {shutdown, State1} ->
- {stop, shutdown, State1}
- end;
-
handle_info(Msg, State) ->
error_logger:error_msg("Supervisor received unexpected message: ~p~n",
[Msg]),
{noreply, State}.
+
%%
%% Terminate this server.
%%
@@ -563,14 +636,13 @@ check_flags({Strategy, MaxIntensity, Period}) ->
check_flags(What) ->
{bad_flags, What}.
-update_childspec(State, StartSpec) when ?is_simple(State) ->
+update_childspec(State, StartSpec) when ?is_simple(State) ->
case check_startspec(StartSpec) of
{ok, [Child]} ->
{ok, State#state{children = [Child]}};
Error ->
{error, Error}
end;
-
update_childspec(State, StartSpec) ->
case check_startspec(StartSpec) of
{ok, Children} ->
@@ -589,7 +661,7 @@ update_childspec1([Child|OldC], Children, KeepOld) ->
update_childspec1(OldC, Children, [Child|KeepOld])
end;
update_childspec1([], Children, KeepOld) ->
- % Return them in (keeped) reverse start order.
+ %% Return them in (kept) reverse start order.
lists:reverse(Children ++ KeepOld).
update_chsp(OldCh, Children) ->
@@ -615,14 +687,10 @@ handle_start_child(Child, State) ->
case do_start_child(State#state.name, Child) of
{ok, Pid} ->
Children = State#state.children,
- {{ok, Pid},
- State#state{children =
- [Child#child{pid = Pid}|Children]}};
+ {{ok, Pid}, State#state{children = [Child#child{pid = Pid}|Children]}};
{ok, Pid, Extra} ->
Children = State#state.children,
- {{ok, Pid, Extra},
- State#state{children =
- [Child#child{pid = Pid}|Children]}};
+ {{ok, Pid, Extra}, State#state{children = [Child#child{pid = Pid}|Children]}};
{error, What} ->
{{error, {What, Child}}, State}
end;
@@ -634,7 +702,7 @@ handle_start_child(Child, State) ->
%%% ---------------------------------------------------
%%% Restart. A process has terminated.
-%%% Returns: {ok, #state} | {shutdown, #state}
+%%% Returns: {ok, state()} | {shutdown, state()}
%%% ---------------------------------------------------
restart_child(Pid, Reason, State) when ?is_simple(State) ->
@@ -1002,12 +1070,12 @@ remove_child(Child, State) ->
%% Type = {Strategy, MaxIntensity, Period}
%% Strategy = one_for_one | one_for_all | simple_one_for_one |
%% rest_for_one
-%% MaxIntensity = integer()
-%% Period = integer()
+%% MaxIntensity = integer() >= 0
+%% Period = integer() > 0
%% Mod :== atom()
-%% Arsg :== term()
+%% Args :== term()
%% Purpose: Check that Type is of correct type (!)
-%% Returns: {ok, #state} | Error
+%% Returns: {ok, state()} | Error
%%-----------------------------------------------------------------
init_state(SupName, Type, Mod, Args) ->
case catch init_state1(SupName, Type, Mod, Args) of
@@ -1053,15 +1121,15 @@ supname(N, _) -> N.
%%% Shall be a six (6) tuple
%%% {Name, Func, RestartType, Shutdown, ChildType, Modules}
%%% where Name is an atom
-%%% Func is {Mod, Fun, Args} == {atom, atom, list}
+%%% Func is {Mod, Fun, Args} == {atom(), atom(), list()}
%%% RestartType is permanent | temporary | transient |
%%% intrinsic | {permanent, Delay} |
%%% {transient, Delay} | {intrinsic, Delay}
%% where Delay >= 0
-%%% Shutdown = integer() | infinity | brutal_kill
+%%% Shutdown = integer() > 0 | infinity | brutal_kill
%%% ChildType = supervisor | worker
%%% Modules = [atom()] | dynamic
-%%% Returns: {ok, [#child]} | Error
+%%% Returns: {ok, [child_rec()]} | Error
%%% ------------------------------------------------------
check_startspec(Children) -> check_startspec(Children, []).
@@ -1117,7 +1185,7 @@ validDelay(Delay) when is_number(Delay),
Delay >= 0 -> true;
validDelay(What) -> throw({invalid_delay, What}).
-validShutdown(Shutdown, _)
+validShutdown(Shutdown, _)
when is_integer(Shutdown), Shutdown > 0 -> true;
validShutdown(infinity, supervisor) -> true;
validShutdown(brutal_kill, _) -> true;