diff options
| author | Matthew Sackman <matthew@lshift.net> | 2010-04-02 02:39:15 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2010-04-02 02:39:15 +0100 |
| commit | 755f382800d70c33e1f96750d00b98ae9bae3526 (patch) | |
| tree | 01479ee160b3c79ad788178ebd6464302ff38668 /src | |
| parent | 24b5e23adafda4ce541d4a7372b076e3a289f042 (diff) | |
| download | rabbitmq-server-git-755f382800d70c33e1f96750d00b98ae9bae3526.tar.gz | |
Support async job submission
Diffstat (limited to 'src')
| -rw-r--r-- | src/worker_pool.erl | 27 | ||||
| -rw-r--r-- | src/worker_pool_worker.erl | 11 |
2 files changed, 32 insertions, 6 deletions
diff --git a/src/worker_pool.erl b/src/worker_pool.erl index b883d4f0cb..fcacdd5914 100644 --- a/src/worker_pool.erl +++ b/src/worker_pool.erl @@ -40,12 +40,10 @@ %% %% 1. Allow priorities (basically, change the pending queue to a %% priority_queue). -%% -%% 2. Allow the submission to the pool_worker to be async. -behaviour(gen_server2). --export([start_link/0, submit/1, idle/1]). +-export([start_link/0, submit/1, submit_async/1, idle/1]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -56,6 +54,7 @@ -spec(start_link/0 :: () -> {'ok', pid()} | 'ignore' | {'error', any()}). -spec(submit/1 :: (fun (() -> A) | {atom(), atom(), [any()]}) -> A). +-spec(submit_async/1 :: (fun (() -> any()) | {atom(), atom(), [any()]}) -> 'ok'). -endif. @@ -80,6 +79,9 @@ submit(Fun) -> worker_pool_worker:submit(Pid, Fun) end. +submit_async(Fun) -> + gen_server2:cast(?SERVER, {run_async, Fun}). + idle(WId) -> gen_server2:cast(?SERVER, {idle, WId}). @@ -93,7 +95,8 @@ handle_call(next_free, From, State = #state { available = Avail, pending = Pending }) -> case queue:out(Avail) of {empty, _Avail} -> - {noreply, State #state { pending = queue:in(From, Pending) }}; + {noreply, + State #state { pending = queue:in({next_free, From}, Pending) }}; {{value, WId}, Avail1} -> {reply, get_worker_pid(WId), State #state { available = Avail1 }} end; @@ -106,11 +109,25 @@ handle_cast({idle, WId}, State = #state { available = Avail, {noreply, case queue:out(Pending) of {empty, _Pending} -> State #state { available = queue:in(WId, Avail) }; - {{value, From}, Pending1} -> + {{value, {next_free, From}}, Pending1} -> gen_server2:reply(From, get_worker_pid(WId)), + State #state { pending = Pending1 }; + {{value, {run_async, Fun}}, Pending1} -> + worker_pool_worker:submit_async(get_worker_pid(WId), Fun), State #state { pending = Pending1 } end}; +handle_cast({run_async, Fun}, State = #state { available = Avail, + pending = Pending }) -> + {noreply, + case queue:out(Avail) of + {empty, _Avail} -> + State #state { pending = queue:in({run_async, Fun}, Pending)}; + {{value, WId}, Avail1} -> + worker_pool_worker:submit_async(get_worker_pid(WId), Fun), + State #state { available = Avail1 } + end}; + handle_cast(Msg, State) -> {stop, {unexpected_cast, Msg}, State}. diff --git a/src/worker_pool_worker.erl b/src/worker_pool_worker.erl index fc3ce3714f..4defc5bafe 100644 --- a/src/worker_pool_worker.erl +++ b/src/worker_pool_worker.erl @@ -33,7 +33,7 @@ -behaviour(gen_server2). --export([start_link/1, submit/2, run/1]). +-export([start_link/1, submit/2, submit_async/2, run/1]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -44,6 +44,7 @@ -spec(start_link/1 :: (any()) -> {'ok', pid()} | 'ignore' | {'error', any()}). -spec(submit/2 :: (pid(), fun (() -> A) | {atom(), atom(), [any()]}) -> A). +-spec(submit_async/2 :: (pid(), fun (() -> any()) | {atom(), atom(), [any()]}) -> 'ok'). -endif. @@ -60,6 +61,9 @@ start_link(WId) -> submit(Pid, Fun) -> gen_server2:call(Pid, {submit, Fun}, infinity). +submit_async(Pid, Fun) -> + gen_server2:cast(Pid, {submit_async, Fun}). + init([WId]) -> ok = worker_pool:idle(WId), put(worker_pool_worker, true), @@ -74,6 +78,11 @@ handle_call({submit, Fun}, From, WId) -> handle_call(Msg, _From, State) -> {stop, {unexpected_call, Msg}, State}. +handle_cast({submit_async, Fun}, WId) -> + run(Fun), + ok = worker_pool:idle(WId), + {noreply, WId}; + handle_cast(Msg, State) -> {stop, {unexpected_cast, Msg}, State}. |
