diff options
| author | Tim Watson <watson.timothy@gmail.com> | 2012-10-26 14:23:33 +0100 |
|---|---|---|
| committer | Tim Watson <watson.timothy@gmail.com> | 2012-10-26 14:23:33 +0100 |
| commit | 89713129dc241d41ab80bf34ce55133ed424cbc7 (patch) | |
| tree | f8fedf149674715b86e479fee14354658ebb4fe0 | |
| parent | 4f074a9e5d57d5f3af696983665a812aecc8aaca (diff) | |
| download | rabbitmq-server-git-89713129dc241d41ab80bf34ce55133ed424cbc7.tar.gz | |
Work in progress. Lots of (reverse) cosmetic changes, pull over
count_children/1 and move things around a bit. The OTP style of dynamic
child handling hasn't been merged yet, so we're not even compiling yet.
| -rw-r--r-- | src/supervisor2.erl | 294 |
1 files changed, 181 insertions, 113 deletions
diff --git a/src/supervisor2.erl b/src/supervisor2.erl index 2252c4b59b..eae2f29886 100644 --- a/src/supervisor2.erl +++ b/src/supervisor2.erl @@ -55,7 +55,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 1996-2009. All Rights Reserved. +%% Copyright Ericsson AB 1996-2012. All Rights Reserved. %% %% The contents of this file are subject to the Erlang Public License, %% Version 1.1, (the "License"); you may not use this file except in @@ -75,15 +75,15 @@ -behaviour(gen_server). %% External exports --export([start_link/2,start_link/3, +-export([start_link/2, start_link/3, start_child/2, restart_child/2, delete_child/2, terminate_child/2, - which_children/1, find_child/2, - check_childspecs/1]). + which_children/1, count_children/1, + find_child/2, check_childspecs/1]). %% Internal exports --export([init/1, handle_call/3, handle_info/2, terminate/2, code_change/3]). --export([handle_cast/2]). +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). %%-------------------------------------------------------------------------- @@ -96,9 +96,7 @@ -type mfargs() :: {M :: module(), F :: atom(), A :: [term()] | undefined}. -type modules() :: [module()] | 'dynamic'. -type delay() :: non_neg_integer(). --type restart() :: 'permanent' | 'transient' | 'temporary' | 'intrinsic' - | {'permanent', delay()} | {'transient', delay()} - | {'intrinsic', delay()}. +-type restart() :: 'permanent' | 'transient' | 'temporary' | 'intrinsic' | {'permanent', delay()} | {'transient', delay()} | {'intrinsic', delay()}. -type shutdown() :: 'brutal_kill' | timeout(). -type worker() :: 'worker' | 'supervisor'. -type sup_name() :: {'local', Name :: atom()} | {'global', Name :: atom()}. @@ -114,69 +112,69 @@ Modules :: modules()}. -type strategy() :: 'one_for_all' | 'one_for_one' - | 'rest_for_one' | 'simple_one_for_one' - | 'simple_one_for_one_terminate'. + | 'rest_for_one' | 'simple_one_for_one' | 'simple_one_for_one_terminate'. 
%%-------------------------------------------------------------------------- -record(child, {% pid is undefined when child is not running - pid = undefined, - name, - mfa, - restart_type, - shutdown, - child_type, - modules = []}). --type child_rec() :: #child{pid :: child() | {restarting,pid()} | [pid()], - name :: child_id(), - mfa :: mfargs(), - restart_type :: restart(), - shutdown :: shutdown(), - child_type :: worker(), - modules :: modules()}. + pid = undefined :: child() | {restarting,pid()} | [pid()], + name :: child_id(), + mfa :: mfargs(), + restart_type :: restart(), + shutdown :: shutdown(), + child_type :: worker(), + modules = [] :: modules()}). +-type child_rec() :: #child{}. -define(DICT, dict). +-define(SETS, sets). +-define(SET, set). -record(state, {name, - strategy, - children = [], - dynamics = ?DICT:new(), - intensity, - period, + strategy :: strategy(), + children = [] :: [child_rec()], + dynamics = ?DICT:new() :: ?DICT(), + intensity :: non_neg_integer(), + period :: pos_integer(), restarts = [], module, args}). --type state() :: #state{strategy :: strategy(), - children :: [child_rec()], - dynamics :: ?DICT(), - intensity :: non_neg_integer(), - period :: pos_integer()}. +-type state() :: #state{}. --define(is_simple(State), State#state.strategy =:= simple_one_for_one orelse - State#state.strategy =:= simple_one_for_one_terminate). - --define(is_terminate_simple(State), - State#state.strategy =:= simple_one_for_one_terminate). +-define(is_simple(State), State#state.strategy =:= simple_one_for_one orelse State#state.strategy =:= simple_one_for_one_terminate). +-define(is_terminate_simple(State), State#state.strategy =:= simple_one_for_one_terminate). -callback init(Args :: term()) -> {ok, {{RestartStrategy :: strategy(), - MaxR :: non_neg_integer(), - MaxT :: non_neg_integer()}, + MaxR :: non_neg_integer(), + MaxT :: non_neg_integer()}, [ChildSpec :: child_spec()]}} | ignore. +-define(restarting(_Pid_), {restarting,_Pid_}). 
+ %%% --------------------------------------------------- %%% This is a general process supervisor built upon gen_server.erl. %%% Servers/processes should/could also be built using gen_server.erl. %%% SupName = {local, atom()} | {global, atom()}. %%% --------------------------------------------------- -start_link(Mod, Args) -> - gen_server:start_link(?MODULE, {self, Mod, Args}, []). +-type startlink_err() :: {'already_started', pid()} | 'shutdown' | term(). +-type startlink_ret() :: {'ok', pid()} | 'ignore' | {'error', startlink_err()}. +-spec start_link(Module, Args) -> startlink_ret() when + Module :: module(), + Args :: term(). +start_link(Mod, Args) -> + gen_server:start_link(supervisor, {self, Mod, Args}, []). + +-spec start_link(SupName, Module, Args) -> startlink_ret() when + SupName :: sup_name(), + Module :: module(), + Args :: term(). start_link(SupName, Mod, Args) -> gen_server:start_link(SupName, ?MODULE, {SupName, Mod, Args}, []). - + %%% --------------------------------------------------- %%% Interface functions. %%% --------------------------------------------------- @@ -220,7 +218,6 @@ delete_child(Supervisor, Name) -> -spec terminate_child(SupRef, Id) -> Result when SupRef :: sup_ref(), - Id :: pid() | child_id(), Result :: 'ok' | {'error', Error}, Error :: 'not_found' | 'simple_one_for_one'. @@ -229,20 +226,29 @@ terminate_child(Supervisor, Name) -> -spec which_children(SupRef) -> [{Id,Child,Type,Modules}] when SupRef :: sup_ref(), - Id :: child_id() | 'undefined', + Id :: child_id() | undefined, Child :: child(), Type :: worker(), Modules :: modules(). which_children(Supervisor) -> call(Supervisor, which_children). +-spec count_children(SupRef) -> PropListOfCounts when + SupRef :: sup_ref(), + PropListOfCounts :: [Count], + Count :: {specs, ChildSpecCount :: non_neg_integer()} + | {active, ActiveProcessCount :: non_neg_integer()} + | {supervisors, ChildSupervisorCount :: non_neg_integer()} + |{workers, ChildWorkerCount :: non_neg_integer()}. 
+count_children(Supervisor) -> + call(Supervisor, count_children). + call(Supervisor, Req) -> gen_server:call(Supervisor, Req, infinity). -spec check_childspecs(ChildSpecs) -> Result when ChildSpecs :: [child_spec()], Result :: 'ok' | {'error', Error :: term()}. - check_childspecs(ChildSpecs) when is_list(ChildSpecs) -> case check_startspec(ChildSpecs) of {ok, _} -> ok; @@ -250,21 +256,22 @@ check_childspecs(ChildSpecs) when is_list(ChildSpecs) -> end; check_childspecs(X) -> {error, {badarg, X}}. +%%%----------------------------------------------------------------- +%%% Called by timer:apply_after from restart/2 +%-spec try_again_restart(SupRef, Child) -> ok when +% SupRef :: sup_ref(), +% Child :: child_id() | pid(). +%try_again_restart(Supervisor, Child) -> +% cast(Supervisor, {try_again_restart, Child}). + find_child(Supervisor, Name) -> [Pid || {Name1, Pid, _Type, _Modules} <- which_children(Supervisor), Name1 =:= Name]. -%-export([behaviour_info/1]). - -%behaviour_info(callbacks) -> -% [{init,1}]; -%behaviour_info(_Other) -> -% undefined. - %%% --------------------------------------------------- -%%% +%%% %%% Initialize the supervisor. -%%% +%%% %%% --------------------------------------------------- -type init_sup_name() :: sup_name() | 'self'. @@ -321,12 +328,12 @@ init_dynamic(_State, StartSpec) -> %%----------------------------------------------------------------- %% Func: start_children/2 -%% Args: Children = [#child] in start order -%% SupName = {local, atom()} | {global, atom()} | {pid(),Mod} -%% Purpose: Start all children. The new list contains #child's +%% Args: Children = [child_rec()] in start order +%% SupName = {local, atom()} | {global, atom()} | {pid(), Mod} +%% Purpose: Start all children. The new list contains #child's %% with pids. 
%% Returns: {ok, NChildren} | {error, NChildren} -%% NChildren = [#child] in termination order (reversed +%% NChildren = [child_rec()] in termination order (reversed %% start order) %%----------------------------------------------------------------- start_children(Children, SupName) -> start_children(Children, [], SupName). @@ -375,36 +382,47 @@ do_start_child_i(M, F, A) -> {error, What} end. - %%% --------------------------------------------------- -%%% +%%% %%% Callback functions. -%%% +%%% %%% --------------------------------------------------- --type call() :: 'which_children'. +-type call() :: 'which_children' | 'count_children' | {_, _}. % XXX: refine -spec handle_call(call(), term(), state()) -> {'reply', term(), state()}. handle_call({start_child, EArgs}, _From, State) when ?is_simple(State) -> - #child{mfa = {M, F, A}} = hd(State#state.children), + Child = hd(State#state.children), + #child{mfa = {M, F, A}} = Child, Args = A ++ EArgs, case do_start_child_i(M, F, Args) of {ok, undefined} -> {reply, {ok, undefined}, State}; {ok, Pid} -> - NState = State#state{dynamics = - ?DICT:store(Pid, Args, State#state.dynamics)}, + NState = State#state{dynamics = ?DICT:store(Pid, Args, State#state.dynamics)}, {reply, {ok, Pid}, NState}; {ok, Pid, Extra} -> - NState = State#state{dynamics = - ?DICT:store(Pid, Args, State#state.dynamics)}, + NState = State#state{dynamics = ?DICT:store(Pid, Args, State#state.dynamics)}, {reply, {ok, Pid, Extra}, NState}; What -> {reply, What, State} end; -%%% The requests terminate_child, delete_child and restart_child are -%%% invalid for simple_one_for_one and simple_one_for_one_terminate -%%% supervisors. 
+%% terminate_child for simple_one_for_one can only be done with pid +handle_call({terminate_child, Name}, _From, State) when not is_pid(Name), + ?is_simple(State) -> + {reply, {error, simple_one_for_one}, State}; + +handle_call({terminate_child, Name}, _From, State) -> + case get_child(Name, State) of + {value, Child} -> + NChild = do_terminate(Child, State#state.name), + {reply, ok, replace_child(NChild, State)}; + _ -> + {reply, {error, not_found}, State} + end; + +%%% The requests delete_child and restart_child are invalid for +%%% simple_one_for_one and simple_one_for_one_terminate supervisors. handle_call({_Req, _Data}, _From, State) when ?is_simple(State) -> {reply, {error, State#state.strategy}, State}; @@ -447,15 +465,6 @@ handle_call({delete_child, Name}, _From, State) -> {reply, {error, not_found}, State} end; -handle_call({terminate_child, Name}, _From, State) -> - case get_child(Name, State) of - {value, Child} -> - NChild = do_terminate(Child, State#state.name), - {reply, ok, replace_child(NChild, State)}; - _ -> - {reply, {error, not_found}, State} - end; - handle_call(which_children, _From, State) when ?is_simple(State) -> [#child{child_type = CT, modules = Mods}] = State#state.children, Reply = lists:map(fun ({Pid, _}) -> {undefined, Pid, CT, Mods} end, @@ -469,7 +478,58 @@ handle_call(which_children, _From, State) -> {Name, Pid, ChildType, Mods} end, State#state.children), - {reply, Resp, State}. 
+ {reply, Resp, State}; + +handle_call(count_children, _From, #state{children = [#child{restart_type = temporary, + child_type = CT}]} = State) + when ?is_simple(State) -> + {Active, Count} = + ?SETS:fold(fun(Pid, {Alive, Tot}) -> + case is_pid(Pid) andalso is_process_alive(Pid) of + true ->{Alive+1, Tot +1}; + false -> + {Alive, Tot + 1} + end + end, {0, 0}, dynamics_db(temporary, State#state.dynamics)), + Reply = case CT of + supervisor -> [{specs, 1}, {active, Active}, + {supervisors, Count}, {workers, 0}]; + worker -> [{specs, 1}, {active, Active}, + {supervisors, 0}, {workers, Count}] + end, + {reply, Reply, State}; + +handle_call(count_children, _From, #state{children = [#child{restart_type = RType, + child_type = CT}]} = State) + when ?is_simple(State) -> + {Active, Count} = + ?DICT:fold(fun(Pid, _Val, {Alive, Tot}) -> + case is_pid(Pid) andalso is_process_alive(Pid) of + true -> + {Alive+1, Tot +1}; + false -> + {Alive, Tot + 1} + end + end, {0, 0}, dynamics_db(RType, State#state.dynamics)), + Reply = case CT of + supervisor -> [{specs, 1}, {active, Active}, + {supervisors, Count}, {workers, 0}]; + worker -> [{specs, 1}, {active, Active}, + {supervisors, 0}, {workers, Count}] + end, + {reply, Reply, State}; + +handle_call(count_children, _From, State) -> + %% Specs and children are together on the children list... + {Specs, Active, Supers, Workers} = + lists:foldl(fun(Child, Counts) -> + count_child(Child, Counts) + end, {0,0,0,0}, State#state.children), + + %% Reformat counts to a property list. + Reply = [{specs, Specs}, {active, Active}, + {supervisors, Supers}, {workers, Workers}], + {reply, Reply, State}. -spec handle_cast('null', state()) -> {'noreply', state()}. %%% Hopefully cause a function-clause as there is no API function @@ -477,12 +537,35 @@ handle_call(which_children, _From, State) -> handle_cast(null, State) -> error_logger:error_msg("ERROR: Supervisor received cast-message 'null'~n", []), - {noreply, State}. 
+count_child(#child{pid = Pid, child_type = worker}, + {Specs, Active, Supers, Workers}) -> + case is_pid(Pid) andalso is_process_alive(Pid) of + true -> {Specs+1, Active+1, Supers, Workers+1}; + false -> {Specs+1, Active, Supers, Workers+1} + end; +count_child(#child{pid = Pid, child_type = supervisor}, + {Specs, Active, Supers, Workers}) -> + case is_pid(Pid) andalso is_process_alive(Pid) of + true -> {Specs+1, Active+1, Supers+1, Workers}; + false -> {Specs+1, Active, Supers+1, Workers} + end. + +%% +%% Take care of terminated children. +%% -spec handle_info(term(), state()) -> {'noreply', state()} | {'stop', 'shutdown', state()}. +handle_info({'EXIT', Pid, Reason}, State) -> + case restart_child(Pid, Reason, State) of + {ok, State1} -> + {noreply, State1}; + {shutdown, State1} -> + {stop, shutdown, State1} + end; + handle_info({delayed_restart, {RestartType, Reason, Child}}, State) when ?is_simple(State) -> {ok, NState} = do_restart(RestartType, Reason, Child, State), @@ -496,21 +579,11 @@ handle_info({delayed_restart, {RestartType, Reason, Child}}, State) -> {noreply, State} end; -%% -%% Take care of terminated children. -%% -handle_info({'EXIT', Pid, Reason}, State) -> - case restart_child(Pid, Reason, State) of - {ok, State1} -> - {noreply, State1}; - {shutdown, State1} -> - {stop, shutdown, State1} - end; - handle_info(Msg, State) -> error_logger:error_msg("Supervisor received unexpected message: ~p~n", [Msg]), {noreply, State}. + %% %% Terminate this server. %% @@ -563,14 +636,13 @@ check_flags({Strategy, MaxIntensity, Period}) -> check_flags(What) -> {bad_flags, What}. 
-update_childspec(State, StartSpec) when ?is_simple(State) -> +update_childspec(State, StartSpec) when ?is_simple(State) -> case check_startspec(StartSpec) of {ok, [Child]} -> {ok, State#state{children = [Child]}}; Error -> {error, Error} end; - update_childspec(State, StartSpec) -> case check_startspec(StartSpec) of {ok, Children} -> @@ -589,7 +661,7 @@ update_childspec1([Child|OldC], Children, KeepOld) -> update_childspec1(OldC, Children, [Child|KeepOld]) end; update_childspec1([], Children, KeepOld) -> - % Return them in (keeped) reverse start order. + %% Return them in (kept) reverse start order. lists:reverse(Children ++ KeepOld). update_chsp(OldCh, Children) -> @@ -615,14 +687,10 @@ handle_start_child(Child, State) -> case do_start_child(State#state.name, Child) of {ok, Pid} -> Children = State#state.children, - {{ok, Pid}, - State#state{children = - [Child#child{pid = Pid}|Children]}}; + {{ok, Pid}, State#state{children = [Child#child{pid = Pid}|Children]}}; {ok, Pid, Extra} -> Children = State#state.children, - {{ok, Pid, Extra}, - State#state{children = - [Child#child{pid = Pid}|Children]}}; + {{ok, Pid, Extra}, State#state{children = [Child#child{pid = Pid}|Children]}}; {error, What} -> {{error, {What, Child}}, State} end; @@ -634,7 +702,7 @@ handle_start_child(Child, State) -> %%% --------------------------------------------------- %%% Restart. A process has terminated. 
-%%% Returns: {ok, #state} | {shutdown, #state} +%%% Returns: {ok, state()} | {shutdown, state()} %%% --------------------------------------------------- restart_child(Pid, Reason, State) when ?is_simple(State) -> @@ -1002,12 +1070,12 @@ remove_child(Child, State) -> %% Type = {Strategy, MaxIntensity, Period} %% Strategy = one_for_one | one_for_all | simple_one_for_one | %% rest_for_one -%% MaxIntensity = integer() -%% Period = integer() +%% MaxIntensity = integer() >= 0 +%% Period = integer() > 0 %% Mod :== atom() -%% Arsg :== term() +%% Args :== term() %% Purpose: Check that Type is of correct type (!) -%% Returns: {ok, #state} | Error +%% Returns: {ok, state()} | Error %%----------------------------------------------------------------- init_state(SupName, Type, Mod, Args) -> case catch init_state1(SupName, Type, Mod, Args) of @@ -1053,15 +1121,15 @@ supname(N, _) -> N. %%% Shall be a six (6) tuple %%% {Name, Func, RestartType, Shutdown, ChildType, Modules} %%% where Name is an atom -%%% Func is {Mod, Fun, Args} == {atom, atom, list} +%%% Func is {Mod, Fun, Args} == {atom(), atom(), list()} %%% RestartType is permanent | temporary | transient | %%% intrinsic | {permanent, Delay} | %%% {transient, Delay} | {intrinsic, Delay} %% where Delay >= 0 -%%% Shutdown = integer() | infinity | brutal_kill +%%% Shutdown = integer() > 0 | infinity | brutal_kill %%% ChildType = supervisor | worker %%% Modules = [atom()] | dynamic -%%% Returns: {ok, [#child]} | Error +%%% Returns: {ok, [child_rec()]} | Error %%% ------------------------------------------------------ check_startspec(Children) -> check_startspec(Children, []). @@ -1117,7 +1185,7 @@ validDelay(Delay) when is_number(Delay), Delay >= 0 -> true; validDelay(What) -> throw({invalid_delay, What}). -validShutdown(Shutdown, _) +validShutdown(Shutdown, _) when is_integer(Shutdown), Shutdown > 0 -> true; validShutdown(infinity, supervisor) -> true; validShutdown(brutal_kill, _) -> true; |
