diff options
| author | Jean-Sébastien Pédron <jean-sebastien@rabbitmq.com> | 2017-06-02 16:37:05 +0200 |
|---|---|---|
| committer | Jean-Sébastien Pédron <jean-sebastien@rabbitmq.com> | 2017-06-26 13:30:46 +0200 |
| commit | b634a84851505535d9e8ecb7eb96584dfa371f58 (patch) | |
| tree | c979b0609accbfecdc6a265bcfd92773a4a48423 /src | |
| parent | 7d470b84f004cbcddefa960df0c65bbcb95d9a83 (diff) | |
| download | rabbitmq-server-git-b634a84851505535d9e8ecb7eb96584dfa371f58.tar.gz | |
worker_pool: Move to rabbitmq-common
Thi resolves a dependency of rabbitmq-common on rabbitmq-server.
[#118490793]
Diffstat (limited to 'src')
| -rw-r--r-- | src/worker_pool.erl | 172 | ||||
| -rw-r--r-- | src/worker_pool_sup.erl | 56 | ||||
| -rw-r--r-- | src/worker_pool_worker.erl | 193 |
3 files changed, 0 insertions, 421 deletions
diff --git a/src/worker_pool.erl b/src/worker_pool.erl deleted file mode 100644 index 71ed2a359a..0000000000 --- a/src/worker_pool.erl +++ /dev/null @@ -1,172 +0,0 @@ -%% The contents of this file are subject to the Mozilla Public License -%% Version 1.1 (the "License"); you may not use this file except in -%% compliance with the License. You may obtain a copy of the License -%% at http://www.mozilla.org/MPL/ -%% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See -%% the License for the specific language governing rights and -%% limitations under the License. -%% -%% The Original Code is RabbitMQ. -%% -%% The Initial Developer of the Original Code is GoPivotal, Inc. -%% Copyright (c) 2007-2017 Pivotal Software, Inc. All rights reserved. -%% - --module(worker_pool). - -%% Generic worker pool manager. -%% -%% Submitted jobs are functions. They can be executed synchronously -%% (using worker_pool:submit/1, worker_pool:submit/2) or asynchronously -%% (using worker_pool:submit_async/1). -%% -%% We typically use the worker pool if we want to limit the maximum -%% parallelism of some job. We are not trying to dodge the cost of -%% creating Erlang processes. -%% -%% Supports nested submission of jobs and two execution modes: -%% 'single' and 'reuse'. Jobs executed in 'single' mode are invoked in -%% a one-off process. Those executed in 'reuse' mode are invoked in a -%% worker process out of the pool. Nested jobs are always executed -%% immediately in current worker process. -%% -%% 'single' mode is offered to work around a bug in Mnesia: after -%% network partitions reply messages for prior failed requests can be -%% sent to Mnesia clients - a reused worker pool process can crash on -%% receiving one. -%% -%% Caller submissions are enqueued internally. When the next worker -%% process is available, it communicates it to the pool and is -%% assigned a job to execute. If job execution fails with an error, no -%% response is returned to the caller. -%% -%% Worker processes prioritise certain command-and-control messages -%% from the pool. -%% -%% Future improvement points: job prioritisation. - --behaviour(gen_server2). - --export([start_link/1, - submit/1, submit/2, submit/3, - submit_async/1, submit_async/2, - ready/2, - idle/2, - default_pool/0]). - --export([init/1, handle_call/3, handle_cast/2, handle_info/2, - terminate/2, code_change/3]). - -%%---------------------------------------------------------------------------- - --type mfargs() :: {atom(), atom(), [any()]}. - --spec start_link(atom()) -> {'ok', pid()} | {'error', any()}. --spec submit(fun (() -> A) | mfargs()) -> A. --spec submit(fun (() -> A) | mfargs(), 'reuse' | 'single') -> A. --spec submit(atom(), fun (() -> A) | mfargs(), 'reuse' | 'single') -> A. --spec submit_async(fun (() -> any()) | mfargs()) -> 'ok'. --spec ready(atom(), pid()) -> 'ok'. --spec idle(atom(), pid()) -> 'ok'. --spec default_pool() -> atom(). - -%%---------------------------------------------------------------------------- - --define(DEFAULT_POOL, ?MODULE). --define(HIBERNATE_AFTER_MIN, 1000). --define(DESIRED_HIBERNATE, 10000). - --record(state, { available, pending }). - -%%---------------------------------------------------------------------------- - -start_link(Name) -> gen_server2:start_link({local, Name}, ?MODULE, [], - [{timeout, infinity}]). - -submit(Fun) -> - submit(?DEFAULT_POOL, Fun, reuse). - -%% ProcessModel =:= single is for working around the mnesia_locker bug. -submit(Fun, ProcessModel) -> - submit(?DEFAULT_POOL, Fun, ProcessModel). - -submit(Server, Fun, ProcessModel) -> - case get(worker_pool_worker) of - true -> worker_pool_worker:run(Fun); - _ -> Pid = gen_server2:call(Server, {next_free, self()}, infinity), - worker_pool_worker:submit(Pid, Fun, ProcessModel) - end. - -submit_async(Fun) -> submit_async(?DEFAULT_POOL, Fun). - -submit_async(Server, Fun) -> gen_server2:cast(Server, {run_async, Fun}). - -ready(Server, WPid) -> gen_server2:cast(Server, {ready, WPid}). - -idle(Server, WPid) -> gen_server2:cast(Server, {idle, WPid}). - -default_pool() -> ?DEFAULT_POOL. - -%%---------------------------------------------------------------------------- - -init([]) -> - {ok, #state { pending = queue:new(), available = ordsets:new() }, hibernate, - {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. - -handle_call({next_free, CPid}, From, State = #state { available = [], - pending = Pending }) -> - {noreply, State#state{pending = queue:in({next_free, From, CPid}, Pending)}, - hibernate}; -handle_call({next_free, CPid}, _From, State = #state { available = - [WPid | Avail1] }) -> - worker_pool_worker:next_job_from(WPid, CPid), - {reply, WPid, State #state { available = Avail1 }, hibernate}; - -handle_call(Msg, _From, State) -> - {stop, {unexpected_call, Msg}, State}. - -handle_cast({ready, WPid}, State) -> - erlang:monitor(process, WPid), - handle_cast({idle, WPid}, State); - -handle_cast({idle, WPid}, State = #state { available = Avail, - pending = Pending }) -> - {noreply, - case queue:out(Pending) of - {empty, _Pending} -> - State #state { available = ordsets:add_element(WPid, Avail) }; - {{value, {next_free, From, CPid}}, Pending1} -> - worker_pool_worker:next_job_from(WPid, CPid), - gen_server2:reply(From, WPid), - State #state { pending = Pending1 }; - {{value, {run_async, Fun}}, Pending1} -> - worker_pool_worker:submit_async(WPid, Fun), - State #state { pending = Pending1 } - end, hibernate}; - -handle_cast({run_async, Fun}, State = #state { available = [], - pending = Pending }) -> - {noreply, State #state { pending = queue:in({run_async, Fun}, Pending)}, - hibernate}; -handle_cast({run_async, Fun}, State = #state { available = [WPid | Avail1] }) -> - worker_pool_worker:submit_async(WPid, Fun), - {noreply, State #state { available = Avail1 }, hibernate}; - -handle_cast(Msg, State) -> - {stop, {unexpected_cast, Msg}, State}. - -handle_info({'DOWN', _MRef, process, WPid, _Reason}, - State = #state { available = Avail }) -> - {noreply, State #state { available = ordsets:del_element(WPid, Avail) }, - hibernate}; - -handle_info(Msg, State) -> - {stop, {unexpected_info, Msg}, State}. - -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - -terminate(_Reason, State) -> - State. diff --git a/src/worker_pool_sup.erl b/src/worker_pool_sup.erl deleted file mode 100644 index 2608f5c2e6..0000000000 --- a/src/worker_pool_sup.erl +++ /dev/null @@ -1,56 +0,0 @@ -%% The contents of this file are subject to the Mozilla Public License -%% Version 1.1 (the "License"); you may not use this file except in -%% compliance with the License. You may obtain a copy of the License -%% at http://www.mozilla.org/MPL/ -%% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See -%% the License for the specific language governing rights and -%% limitations under the License. -%% -%% The Original Code is RabbitMQ. -%% -%% The Initial Developer of the Original Code is GoPivotal, Inc. -%% Copyright (c) 2007-2017 Pivotal Software, Inc. All rights reserved. -%% - --module(worker_pool_sup). - --behaviour(supervisor). - --export([start_link/0, start_link/1, start_link/2]). - --export([init/1]). - -%%---------------------------------------------------------------------------- - --spec start_link() -> rabbit_types:ok_pid_or_error(). --spec start_link(non_neg_integer()) -> rabbit_types:ok_pid_or_error(). --spec start_link(non_neg_integer(), atom()) - -> rabbit_types:ok_pid_or_error(). - -%%---------------------------------------------------------------------------- - -start_link() -> - start_link(erlang:system_info(schedulers)). - -start_link(WCount) -> - start_link(WCount, worker_pool:default_pool()). - -start_link(WCount, PoolName) -> - SupName = list_to_atom(atom_to_list(PoolName) ++ "_sup"), - supervisor:start_link({local, SupName}, ?MODULE, [WCount, PoolName]). - -%%---------------------------------------------------------------------------- - -init([WCount, PoolName]) -> - %% we want to survive up to 1K of worker restarts per second, - %% e.g. when a large worker pool used for network connections - %% encounters a network failure. This is the case in the LDAP authentication - %% backend plugin. - {ok, {{one_for_one, 1000, 1}, - [{worker_pool, {worker_pool, start_link, [PoolName]}, transient, - 16#ffffffff, worker, [worker_pool]} | - [{N, {worker_pool_worker, start_link, [PoolName]}, transient, - 16#ffffffff, worker, [worker_pool_worker]} - || N <- lists:seq(1, WCount)]]}}. diff --git a/src/worker_pool_worker.erl b/src/worker_pool_worker.erl deleted file mode 100644 index 23db127def..0000000000 --- a/src/worker_pool_worker.erl +++ /dev/null @@ -1,193 +0,0 @@ -%% The contents of this file are subject to the Mozilla Public License -%% Version 1.1 (the "License"); you may not use this file except in -%% compliance with the License. You may obtain a copy of the License -%% at http://www.mozilla.org/MPL/ -%% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See -%% the License for the specific language governing rights and -%% limitations under the License. -%% -%% The Original Code is RabbitMQ. -%% -%% The Initial Developer of the Original Code is GoPivotal, Inc. -%% Copyright (c) 2007-2017 Pivotal Software, Inc. All rights reserved. -%% - --module(worker_pool_worker). - -%% Executes jobs (functions) submitted to a worker pool with worker_pool:submit/1, -%% worker_pool:submit/2 or worker_pool:submit_async/1. -%% -%% See worker_pool for an overview. - --behaviour(gen_server2). - --export([start_link/1, next_job_from/2, submit/3, submit_async/2, - run/1]). - --export([set_maximum_since_use/2]). --export([set_timeout/2, set_timeout/3, clear_timeout/1]). - --export([init/1, handle_call/3, handle_cast/2, handle_info/2, - terminate/2, code_change/3, prioritise_cast/3]). - -%%---------------------------------------------------------------------------- - --type mfargs() :: {atom(), atom(), [any()]}. - --spec start_link(atom) -> {'ok', pid()} | {'error', any()}. --spec next_job_from(pid(), pid()) -> 'ok'. --spec submit(pid(), fun (() -> A) | mfargs(), 'reuse' | 'single') -> A. --spec submit_async(pid(), fun (() -> any()) | mfargs()) -> 'ok'. --spec run(fun (() -> A)) -> A; (mfargs()) -> any(). --spec set_maximum_since_use(pid(), non_neg_integer()) -> 'ok'. - -%%---------------------------------------------------------------------------- - --define(HIBERNATE_AFTER_MIN, 1000). --define(DESIRED_HIBERNATE, 10000). - -%%---------------------------------------------------------------------------- - -start_link(PoolName) -> - gen_server2:start_link(?MODULE, [PoolName], [{timeout, infinity}]). - -next_job_from(Pid, CPid) -> - gen_server2:cast(Pid, {next_job_from, CPid}). - -submit(Pid, Fun, ProcessModel) -> - gen_server2:call(Pid, {submit, Fun, self(), ProcessModel}, infinity). - -submit_async(Pid, Fun) -> - gen_server2:cast(Pid, {submit_async, Fun}). - -set_maximum_since_use(Pid, Age) -> - gen_server2:cast(Pid, {set_maximum_since_use, Age}). - -run({M, F, A}) -> apply(M, F, A); -run(Fun) -> Fun(). - -run(Fun, reuse) -> - run(Fun); -run(Fun, single) -> - Self = self(), - Ref = make_ref(), - spawn_link(fun () -> - put(worker_pool_worker, true), - Self ! {Ref, run(Fun)}, - unlink(Self) - end), - receive - {Ref, Res} -> Res - end. - -%%---------------------------------------------------------------------------- - -init([PoolName]) -> - ok = file_handle_cache:register_callback(?MODULE, set_maximum_since_use, - [self()]), - ok = worker_pool:ready(PoolName, self()), - put(worker_pool_worker, true), - put(worker_pool_name, PoolName), - {ok, undefined, hibernate, - {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. - -prioritise_cast({set_maximum_since_use, _Age}, _Len, _State) -> 8; -prioritise_cast({next_job_from, _CPid}, _Len, _State) -> 7; -prioritise_cast(_Msg, _Len, _State) -> 0. - -handle_call({submit, Fun, CPid, ProcessModel}, From, undefined) -> - {noreply, {job, CPid, From, Fun, ProcessModel}, hibernate}; - -handle_call({submit, Fun, CPid, ProcessModel}, From, {from, CPid, MRef}) -> - erlang:demonitor(MRef), - gen_server2:reply(From, run(Fun, ProcessModel)), - ok = worker_pool:idle(get(worker_pool_name), self()), - {noreply, undefined, hibernate}; - -handle_call(Msg, _From, State) -> - {stop, {unexpected_call, Msg}, State}. - -handle_cast({next_job_from, CPid}, undefined) -> - MRef = erlang:monitor(process, CPid), - {noreply, {from, CPid, MRef}, hibernate}; - -handle_cast({next_job_from, CPid}, {job, CPid, From, Fun, ProcessModel}) -> - gen_server2:reply(From, run(Fun, ProcessModel)), - ok = worker_pool:idle(get(worker_pool_name), self()), - {noreply, undefined, hibernate}; - -handle_cast({submit_async, Fun}, undefined) -> - run(Fun), - ok = worker_pool:idle(get(worker_pool_name), self()), - {noreply, undefined, hibernate}; - -handle_cast({set_maximum_since_use, Age}, State) -> - ok = file_handle_cache:set_maximum_since_use(Age), - {noreply, State, hibernate}; - -handle_cast(Msg, State) -> - {stop, {unexpected_cast, Msg}, State}. - -handle_info({'DOWN', MRef, process, CPid, _Reason}, {from, CPid, MRef}) -> - ok = worker_pool:idle(get(worker_pool_name), self()), - {noreply, undefined, hibernate}; - -handle_info({'DOWN', _MRef, process, _Pid, _Reason}, State) -> - {noreply, State, hibernate}; - -handle_info({timeout, Key, Fun}, State) -> - clear_timeout(Key), - Fun(), - {noreply, State, hibernate}; - -handle_info(Msg, State) -> - {stop, {unexpected_info, Msg}, State}. - -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - -terminate(_Reason, State) -> - State. - --spec set_timeout(integer(), fun(() -> any())) -> reference(). -set_timeout(Time, Fun) -> - Key = make_ref(), - set_timeout(Key, Time, Fun). - --spec set_timeout(Key, integer(), fun(() -> any())) -> Key when Key :: any(). -set_timeout(Key, Time, Fun) -> - Timeouts = get_timeouts(), - set_timeout(Key, Time, Fun, Timeouts). - --spec clear_timeout(any()) -> ok. -clear_timeout(Key) -> - NewTimeouts = cancel_timeout(Key, get_timeouts()), - put(timeouts, NewTimeouts), - ok. - -get_timeouts() -> - case get(timeouts) of - undefined -> dict:new(); - Dict -> Dict - end. - -set_timeout(Key, Time, Fun, Timeouts) -> - cancel_timeout(Key, Timeouts), - {ok, TRef} = timer:send_after(Time, {timeout, Key, Fun}), - NewTimeouts = dict:store(Key, TRef, Timeouts), - put(timeouts, NewTimeouts), - {ok, Key}. - -cancel_timeout(Key, Timeouts) -> - case dict:find(Key, Timeouts) of - {ok, TRef} -> - timer:cancel(TRef), - receive {timeout, Key, _} -> ok - after 0 -> ok - end, - dict:erase(Key, Timeouts); - error -> - Timeouts - end. |
