summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorash-lshift <ash@lshift.net>2015-03-24 17:45:04 +0000
committerash-lshift <ash@lshift.net>2015-03-24 17:45:04 +0000
commitf6dcdf11dccff7e249a4ff5b502c3ee42a96e8b3 (patch)
tree009b24e74538a46645b3bd070d4c3e829436a0cd /src
parentd538f7d95646a7a9527c9f05ee274768a758a266 (diff)
downloadrabbitmq-server-git-f6dcdf11dccff7e249a4ff5b502c3ee42a96e8b3.tar.gz
allow multiple instances of worker_pool
Diffstat (limited to 'src')
-rw-r--r--src/worker_pool.erl35
-rw-r--r--src/worker_pool_sup.erl23
-rw-r--r--src/worker_pool_worker.erl22
3 files changed, 47 insertions, 33 deletions
diff --git a/src/worker_pool.erl b/src/worker_pool.erl
index 71359aa5ed..29cc18dfed 100644
--- a/src/worker_pool.erl
+++ b/src/worker_pool.erl
@@ -49,8 +49,11 @@
-behaviour(gen_server2).
--export([start_link/0, submit/1, submit/2, submit_async/1, ready/1,
- idle/1]).
+-export([start_link/1,
+ submit/1, submit/2, submit/3,
+ submit_async/1, submit_async/2,
+ ready/2,
+ idle/2]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
@@ -61,18 +64,19 @@
-type(mfargs() :: {atom(), atom(), [any()]}).
--spec(start_link/0 :: () -> {'ok', pid()} | {'error', any()}).
+-spec(start_link/1 :: (atom) -> {'ok', pid()} | {'error', any()}).
-spec(submit/1 :: (fun (() -> A) | mfargs()) -> A).
-spec(submit/2 :: (fun (() -> A) | mfargs(), 'reuse' | 'single') -> A).
+-spec(submit/3 :: (atom(), fun (() -> A) | mfargs(), 'reuse' | 'single') -> A).
-spec(submit_async/1 :: (fun (() -> any()) | mfargs()) -> 'ok').
--spec(ready/1 :: (pid()) -> 'ok').
--spec(idle/1 :: (pid()) -> 'ok').
+-spec(ready/2 :: (atom(), pid()) -> 'ok').
+-spec(idle/2 :: (atom(), pid()) -> 'ok').
-endif.
%%----------------------------------------------------------------------------
--define(SERVER, ?MODULE).
+-define(DEFAULT_SERVER, ?MODULE).
-define(HIBERNATE_AFTER_MIN, 1000).
-define(DESIRED_HIBERNATE, 10000).
@@ -80,25 +84,30 @@
%%----------------------------------------------------------------------------
-start_link() -> gen_server2:start_link({local, ?SERVER}, ?MODULE, [],
- [{timeout, infinity}]).
+start_link(Name) -> gen_server2:start_link({local, Name}, ?MODULE, [],
+ [{timeout, infinity}]).
submit(Fun) ->
- submit(Fun, reuse).
+ submit(?DEFAULT_SERVER, Fun, reuse).
%% ProcessModel =:= single is for working around the mnesia_locker bug.
submit(Fun, ProcessModel) ->
+ submit(?DEFAULT_SERVER, Fun, ProcessModel).
+
+submit(Server, Fun, ProcessModel) ->
case get(worker_pool_worker) of
true -> worker_pool_worker:run(Fun);
- _ -> Pid = gen_server2:call(?SERVER, {next_free, self()}, infinity),
+ _ -> Pid = gen_server2:call(Server, {next_free, self()}, infinity),
worker_pool_worker:submit(Pid, Fun, ProcessModel)
end.
-submit_async(Fun) -> gen_server2:cast(?SERVER, {run_async, Fun}).
+submit_async(Fun) -> submit_async(?DEFAULT_SERVER, Fun).
+
+submit_async(Server, Fun) -> gen_server2:cast(Server, {run_async, Fun}).
-ready(WPid) -> gen_server2:cast(?SERVER, {ready, WPid}).
+ready(Server, WPid) -> gen_server2:cast(Server, {ready, WPid}).
-idle(WPid) -> gen_server2:cast(?SERVER, {idle, WPid}).
+idle(Server, WPid) -> gen_server2:cast(Server, {idle, WPid}).
%%----------------------------------------------------------------------------
diff --git a/src/worker_pool_sup.erl b/src/worker_pool_sup.erl
index 89d2ed46da..dc05a67b0b 100644
--- a/src/worker_pool_sup.erl
+++ b/src/worker_pool_sup.erl
@@ -18,7 +18,7 @@
-behaviour(supervisor).
--export([start_link/0, start_link/1]).
+-export([start_link/0, start_link/1, start_link/2]).
-export([init/1]).
@@ -28,26 +28,29 @@
-spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()).
-spec(start_link/1 :: (non_neg_integer()) -> rabbit_types:ok_pid_or_error()).
+-spec(start_link/2 :: (non_neg_integer(), atom())
+ -> rabbit_types:ok_pid_or_error()).
-endif.
%%----------------------------------------------------------------------------
--define(SERVER, ?MODULE).
-
-%%----------------------------------------------------------------------------
-
start_link() ->
start_link(erlang:system_info(schedulers)).
start_link(WCount) ->
- supervisor:start_link({local, ?SERVER}, ?MODULE, [WCount]).
+ start_link(WCount, worker_pool).
+
+start_link(WCount, PoolName) ->
+ SupName = list_to_atom(atom_to_list(PoolName) ++ "_sup"),
+ supervisor:start_link({local, SupName}, ?MODULE, [WCount, PoolName]).
%%----------------------------------------------------------------------------
-init([WCount]) ->
+init([WCount, PoolName]) ->
{ok, {{one_for_one, 10, 10},
- [{worker_pool, {worker_pool, start_link, []}, transient,
+ [{worker_pool, {worker_pool, start_link, [PoolName]}, transient,
16#ffffffff, worker, [worker_pool]} |
- [{N, {worker_pool_worker, start_link, []}, transient, 16#ffffffff,
- worker, [worker_pool_worker]} || N <- lists:seq(1, WCount)]]}}.
+ [{N, {worker_pool_worker, start_link, [PoolName]}, transient,
+ 16#ffffffff, worker, [worker_pool_worker]}
+ || N <- lists:seq(1, WCount)]]}}.
diff --git a/src/worker_pool_worker.erl b/src/worker_pool_worker.erl
index c2d058923d..ad2aee7b40 100644
--- a/src/worker_pool_worker.erl
+++ b/src/worker_pool_worker.erl
@@ -23,7 +23,8 @@
-behaviour(gen_server2).
--export([start_link/0, next_job_from/2, submit/3, submit_async/2, run/1]).
+-export([start_link/1, next_job_from/2, submit/3, submit_async/2,
+ run/1]).
-export([set_maximum_since_use/2]).
@@ -36,7 +37,7 @@
-type(mfargs() :: {atom(), atom(), [any()]}).
--spec(start_link/0 :: () -> {'ok', pid()} | {'error', any()}).
+-spec(start_link/1 :: (atom) -> {'ok', pid()} | {'error', any()}).
-spec(next_job_from/2 :: (pid(), pid()) -> 'ok').
-spec(submit/3 :: (pid(), fun (() -> A) | mfargs(), 'reuse' | 'single') -> A).
-spec(submit_async/2 :: (pid(), fun (() -> any()) | mfargs()) -> 'ok').
@@ -52,8 +53,8 @@
%%----------------------------------------------------------------------------
-start_link() ->
- gen_server2:start_link(?MODULE, [], [{timeout, infinity}]).
+start_link(PoolName) ->
+ gen_server2:start_link(?MODULE, [PoolName], [{timeout, infinity}]).
next_job_from(Pid, CPid) ->
gen_server2:cast(Pid, {next_job_from, CPid}).
@@ -86,11 +87,12 @@ run(Fun, single) ->
%%----------------------------------------------------------------------------
-init([]) ->
+init([PoolName]) ->
ok = file_handle_cache:register_callback(?MODULE, set_maximum_since_use,
[self()]),
- ok = worker_pool:ready(self()),
+ ok = worker_pool:ready(PoolName, self()),
put(worker_pool_worker, true),
+ put(worker_pool_name, PoolName),
{ok, undefined, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
@@ -104,7 +106,7 @@ handle_call({submit, Fun, CPid, ProcessModel}, From, undefined) ->
handle_call({submit, Fun, CPid, ProcessModel}, From, {from, CPid, MRef}) ->
erlang:demonitor(MRef),
gen_server2:reply(From, run(Fun, ProcessModel)),
- ok = worker_pool:idle(self()),
+ ok = worker_pool:idle(get(worker_pool_name), self()),
{noreply, undefined, hibernate};
handle_call(Msg, _From, State) ->
@@ -116,12 +118,12 @@ handle_cast({next_job_from, CPid}, undefined) ->
handle_cast({next_job_from, CPid}, {job, CPid, From, Fun, ProcessModel}) ->
gen_server2:reply(From, run(Fun, ProcessModel)),
- ok = worker_pool:idle(self()),
+ ok = worker_pool:idle(get(worker_pool_name), self()),
{noreply, undefined, hibernate};
handle_cast({submit_async, Fun}, undefined) ->
run(Fun),
- ok = worker_pool:idle(self()),
+ ok = worker_pool:idle(get(worker_pool_name), self()),
{noreply, undefined, hibernate};
handle_cast({set_maximum_since_use, Age}, State) ->
@@ -132,7 +134,7 @@ handle_cast(Msg, State) ->
{stop, {unexpected_cast, Msg}, State}.
handle_info({'DOWN', MRef, process, CPid, _Reason}, {from, CPid, MRef}) ->
- ok = worker_pool:idle(self()),
+ ok = worker_pool:idle(get(worker_pool_name), self()),
{noreply, undefined, hibernate};
handle_info({'DOWN', _MRef, process, _Pid, _Reason}, State) ->