diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/worker_pool.erl | 39 | ||||
| -rw-r--r-- | src/worker_pool_sup.erl | 23 | ||||
| -rw-r--r-- | src/worker_pool_worker.erl | 22 |
3 files changed, 51 insertions, 33 deletions
diff --git a/src/worker_pool.erl b/src/worker_pool.erl index 71359aa5ed..6a2070dd9a 100644 --- a/src/worker_pool.erl +++ b/src/worker_pool.erl @@ -49,8 +49,12 @@ -behaviour(gen_server2). --export([start_link/0, submit/1, submit/2, submit_async/1, ready/1, - idle/1]). +-export([start_link/1, + submit/1, submit/2, submit/3, + submit_async/1, submit_async/2, + ready/2, + idle/2, + default_pool/0]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -61,18 +65,20 @@ -type(mfargs() :: {atom(), atom(), [any()]}). --spec(start_link/0 :: () -> {'ok', pid()} | {'error', any()}). +-spec(start_link/1 :: (atom()) -> {'ok', pid()} | {'error', any()}). -spec(submit/1 :: (fun (() -> A) | mfargs()) -> A). -spec(submit/2 :: (fun (() -> A) | mfargs(), 'reuse' | 'single') -> A). +-spec(submit/3 :: (atom(), fun (() -> A) | mfargs(), 'reuse' | 'single') -> A). -spec(submit_async/1 :: (fun (() -> any()) | mfargs()) -> 'ok'). --spec(ready/1 :: (pid()) -> 'ok'). --spec(idle/1 :: (pid()) -> 'ok'). +-spec(ready/2 :: (atom(), pid()) -> 'ok'). +-spec(idle/2 :: (atom(), pid()) -> 'ok'). +-spec(default_pool/0 :: () -> atom()). -endif. %%---------------------------------------------------------------------------- --define(SERVER, ?MODULE). +-define(DEFAULT_POOL, ?MODULE). -define(HIBERNATE_AFTER_MIN, 1000). -define(DESIRED_HIBERNATE, 10000). @@ -80,25 +86,32 @@ %%---------------------------------------------------------------------------- -start_link() -> gen_server2:start_link({local, ?SERVER}, ?MODULE, [], - [{timeout, infinity}]). +start_link(Name) -> gen_server2:start_link({local, Name}, ?MODULE, [], + [{timeout, infinity}]). submit(Fun) -> - submit(Fun, reuse). + submit(?DEFAULT_POOL, Fun, reuse). %% ProcessModel =:= single is for working around the mnesia_locker bug. submit(Fun, ProcessModel) -> + submit(?DEFAULT_POOL, Fun, ProcessModel). + +submit(Server, Fun, ProcessModel) -> case get(worker_pool_worker) of true -> worker_pool_worker:run(Fun); - _ -> Pid = gen_server2:call(?SERVER, {next_free, self()}, infinity), + _ -> Pid = gen_server2:call(Server, {next_free, self()}, infinity), worker_pool_worker:submit(Pid, Fun, ProcessModel) end. -submit_async(Fun) -> gen_server2:cast(?SERVER, {run_async, Fun}). +submit_async(Fun) -> submit_async(?DEFAULT_POOL, Fun). + +submit_async(Server, Fun) -> gen_server2:cast(Server, {run_async, Fun}). + +ready(Server, WPid) -> gen_server2:cast(Server, {ready, WPid}). -ready(WPid) -> gen_server2:cast(?SERVER, {ready, WPid}). +idle(Server, WPid) -> gen_server2:cast(Server, {idle, WPid}). -idle(WPid) -> gen_server2:cast(?SERVER, {idle, WPid}). +default_pool() -> ?DEFAULT_POOL. %%---------------------------------------------------------------------------- diff --git a/src/worker_pool_sup.erl b/src/worker_pool_sup.erl index 89d2ed46da..c429a4db6c 100644 --- a/src/worker_pool_sup.erl +++ b/src/worker_pool_sup.erl @@ -18,7 +18,7 @@ -behaviour(supervisor). --export([start_link/0, start_link/1]). +-export([start_link/0, start_link/1, start_link/2]). -export([init/1]). @@ -28,26 +28,29 @@ -spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()). -spec(start_link/1 :: (non_neg_integer()) -> rabbit_types:ok_pid_or_error()). +-spec(start_link/2 :: (non_neg_integer(), atom()) + -> rabbit_types:ok_pid_or_error()). -endif. %%---------------------------------------------------------------------------- --define(SERVER, ?MODULE). - -%%---------------------------------------------------------------------------- - start_link() -> start_link(erlang:system_info(schedulers)). start_link(WCount) -> - supervisor:start_link({local, ?SERVER}, ?MODULE, [WCount]). + start_link(WCount, worker_pool:default_pool()). + +start_link(WCount, PoolName) -> + SupName = list_to_atom(atom_to_list(PoolName) ++ "_sup"), + supervisor:start_link({local, SupName}, ?MODULE, [WCount, PoolName]). %%---------------------------------------------------------------------------- -init([WCount]) -> +init([WCount, PoolName]) -> {ok, {{one_for_one, 10, 10}, - [{worker_pool, {worker_pool, start_link, []}, transient, + [{worker_pool, {worker_pool, start_link, [PoolName]}, transient, 16#ffffffff, worker, [worker_pool]} | - [{N, {worker_pool_worker, start_link, []}, transient, 16#ffffffff, - worker, [worker_pool_worker]} || N <- lists:seq(1, WCount)]]}}. + [{N, {worker_pool_worker, start_link, [PoolName]}, transient, + 16#ffffffff, worker, [worker_pool_worker]} + || N <- lists:seq(1, WCount)]]}}. diff --git a/src/worker_pool_worker.erl b/src/worker_pool_worker.erl index c2d058923d..ad2aee7b40 100644 --- a/src/worker_pool_worker.erl +++ b/src/worker_pool_worker.erl @@ -23,7 +23,8 @@ -behaviour(gen_server2). --export([start_link/0, next_job_from/2, submit/3, submit_async/2, run/1]). +-export([start_link/1, next_job_from/2, submit/3, submit_async/2, + run/1]). -export([set_maximum_since_use/2]). @@ -36,7 +37,7 @@ -type(mfargs() :: {atom(), atom(), [any()]}). --spec(start_link/0 :: () -> {'ok', pid()} | {'error', any()}). +-spec(start_link/1 :: (atom) -> {'ok', pid()} | {'error', any()}). -spec(next_job_from/2 :: (pid(), pid()) -> 'ok'). -spec(submit/3 :: (pid(), fun (() -> A) | mfargs(), 'reuse' | 'single') -> A). -spec(submit_async/2 :: (pid(), fun (() -> any()) | mfargs()) -> 'ok'). @@ -52,8 +53,8 @@ %%---------------------------------------------------------------------------- -start_link() -> - gen_server2:start_link(?MODULE, [], [{timeout, infinity}]). +start_link(PoolName) -> + gen_server2:start_link(?MODULE, [PoolName], [{timeout, infinity}]). next_job_from(Pid, CPid) -> gen_server2:cast(Pid, {next_job_from, CPid}). @@ -86,11 +87,12 @@ run(Fun, single) -> %%---------------------------------------------------------------------------- -init([]) -> +init([PoolName]) -> ok = file_handle_cache:register_callback(?MODULE, set_maximum_since_use, [self()]), - ok = worker_pool:ready(self()), + ok = worker_pool:ready(PoolName, self()), put(worker_pool_worker, true), + put(worker_pool_name, PoolName), {ok, undefined, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. @@ -104,7 +106,7 @@ handle_call({submit, Fun, CPid, ProcessModel}, From, undefined) -> handle_call({submit, Fun, CPid, ProcessModel}, From, {from, CPid, MRef}) -> erlang:demonitor(MRef), gen_server2:reply(From, run(Fun, ProcessModel)), - ok = worker_pool:idle(self()), + ok = worker_pool:idle(get(worker_pool_name), self()), {noreply, undefined, hibernate}; handle_call(Msg, _From, State) -> @@ -116,12 +118,12 @@ handle_cast({next_job_from, CPid}, undefined) -> handle_cast({next_job_from, CPid}, {job, CPid, From, Fun, ProcessModel}) -> gen_server2:reply(From, run(Fun, ProcessModel)), - ok = worker_pool:idle(self()), + ok = worker_pool:idle(get(worker_pool_name), self()), {noreply, undefined, hibernate}; handle_cast({submit_async, Fun}, undefined) -> run(Fun), - ok = worker_pool:idle(self()), + ok = worker_pool:idle(get(worker_pool_name), self()), {noreply, undefined, hibernate}; handle_cast({set_maximum_since_use, Age}, State) -> @@ -132,7 +134,7 @@ handle_cast(Msg, State) -> {stop, {unexpected_cast, Msg}, State}. handle_info({'DOWN', MRef, process, CPid, _Reason}, {from, CPid, MRef}) -> - ok = worker_pool:idle(self()), + ok = worker_pool:idle(get(worker_pool_name), self()), {noreply, undefined, hibernate}; handle_info({'DOWN', _MRef, process, _Pid, _Reason}, State) -> |
