summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorJean-Sébastien Pédron <jean-sebastien@rabbitmq.com>2017-06-02 16:37:05 +0200
committerJean-Sébastien Pédron <jean-sebastien@rabbitmq.com>2017-06-26 13:30:46 +0200
commitb634a84851505535d9e8ecb7eb96584dfa371f58 (patch)
treec979b0609accbfecdc6a265bcfd92773a4a48423 /src
parent7d470b84f004cbcddefa960df0c65bbcb95d9a83 (diff)
downloadrabbitmq-server-git-b634a84851505535d9e8ecb7eb96584dfa371f58.tar.gz
worker_pool: Move to rabbitmq-common
Thi resolves a dependency of rabbitmq-common on rabbitmq-server. [#118490793]
Diffstat (limited to 'src')
-rw-r--r--src/worker_pool.erl172
-rw-r--r--src/worker_pool_sup.erl56
-rw-r--r--src/worker_pool_worker.erl193
3 files changed, 0 insertions, 421 deletions
diff --git a/src/worker_pool.erl b/src/worker_pool.erl
deleted file mode 100644
index 71ed2a359a..0000000000
--- a/src/worker_pool.erl
+++ /dev/null
@@ -1,172 +0,0 @@
-%% The contents of this file are subject to the Mozilla Public License
-%% Version 1.1 (the "License"); you may not use this file except in
-%% compliance with the License. You may obtain a copy of the License
-%% at http://www.mozilla.org/MPL/
-%%
-%% Software distributed under the License is distributed on an "AS IS"
-%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
-%% the License for the specific language governing rights and
-%% limitations under the License.
-%%
-%% The Original Code is RabbitMQ.
-%%
-%% The Initial Developer of the Original Code is GoPivotal, Inc.
-%% Copyright (c) 2007-2017 Pivotal Software, Inc. All rights reserved.
-%%
-
--module(worker_pool).
-
-%% Generic worker pool manager.
-%%
-%% Submitted jobs are functions. They can be executed synchronously
-%% (using worker_pool:submit/1, worker_pool:submit/2) or asynchronously
-%% (using worker_pool:submit_async/1).
-%%
-%% We typically use the worker pool if we want to limit the maximum
-%% parallelism of some job. We are not trying to dodge the cost of
-%% creating Erlang processes.
-%%
-%% Supports nested submission of jobs and two execution modes:
-%% 'single' and 'reuse'. Jobs executed in 'single' mode are invoked in
-%% a one-off process. Those executed in 'reuse' mode are invoked in a
-%% worker process out of the pool. Nested jobs are always executed
-%% immediately in current worker process.
-%%
-%% 'single' mode is offered to work around a bug in Mnesia: after
-%% network partitions reply messages for prior failed requests can be
-%% sent to Mnesia clients - a reused worker pool process can crash on
-%% receiving one.
-%%
-%% Caller submissions are enqueued internally. When the next worker
-%% process is available, it communicates it to the pool and is
-%% assigned a job to execute. If job execution fails with an error, no
-%% response is returned to the caller.
-%%
-%% Worker processes prioritise certain command-and-control messages
-%% from the pool.
-%%
-%% Future improvement points: job prioritisation.
-
--behaviour(gen_server2).
-
--export([start_link/1,
- submit/1, submit/2, submit/3,
- submit_async/1, submit_async/2,
- ready/2,
- idle/2,
- default_pool/0]).
-
--export([init/1, handle_call/3, handle_cast/2, handle_info/2,
- terminate/2, code_change/3]).
-
-%%----------------------------------------------------------------------------
-
--type mfargs() :: {atom(), atom(), [any()]}.
-
--spec start_link(atom()) -> {'ok', pid()} | {'error', any()}.
--spec submit(fun (() -> A) | mfargs()) -> A.
--spec submit(fun (() -> A) | mfargs(), 'reuse' | 'single') -> A.
--spec submit(atom(), fun (() -> A) | mfargs(), 'reuse' | 'single') -> A.
--spec submit_async(fun (() -> any()) | mfargs()) -> 'ok'.
--spec ready(atom(), pid()) -> 'ok'.
--spec idle(atom(), pid()) -> 'ok'.
--spec default_pool() -> atom().
-
-%%----------------------------------------------------------------------------
-
--define(DEFAULT_POOL, ?MODULE).
--define(HIBERNATE_AFTER_MIN, 1000).
--define(DESIRED_HIBERNATE, 10000).
-
--record(state, { available, pending }).
-
-%%----------------------------------------------------------------------------
-
-start_link(Name) -> gen_server2:start_link({local, Name}, ?MODULE, [],
- [{timeout, infinity}]).
-
-submit(Fun) ->
- submit(?DEFAULT_POOL, Fun, reuse).
-
-%% ProcessModel =:= single is for working around the mnesia_locker bug.
-submit(Fun, ProcessModel) ->
- submit(?DEFAULT_POOL, Fun, ProcessModel).
-
-submit(Server, Fun, ProcessModel) ->
- case get(worker_pool_worker) of
- true -> worker_pool_worker:run(Fun);
- _ -> Pid = gen_server2:call(Server, {next_free, self()}, infinity),
- worker_pool_worker:submit(Pid, Fun, ProcessModel)
- end.
-
-submit_async(Fun) -> submit_async(?DEFAULT_POOL, Fun).
-
-submit_async(Server, Fun) -> gen_server2:cast(Server, {run_async, Fun}).
-
-ready(Server, WPid) -> gen_server2:cast(Server, {ready, WPid}).
-
-idle(Server, WPid) -> gen_server2:cast(Server, {idle, WPid}).
-
-default_pool() -> ?DEFAULT_POOL.
-
-%%----------------------------------------------------------------------------
-
-init([]) ->
- {ok, #state { pending = queue:new(), available = ordsets:new() }, hibernate,
- {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
-
-handle_call({next_free, CPid}, From, State = #state { available = [],
- pending = Pending }) ->
- {noreply, State#state{pending = queue:in({next_free, From, CPid}, Pending)},
- hibernate};
-handle_call({next_free, CPid}, _From, State = #state { available =
- [WPid | Avail1] }) ->
- worker_pool_worker:next_job_from(WPid, CPid),
- {reply, WPid, State #state { available = Avail1 }, hibernate};
-
-handle_call(Msg, _From, State) ->
- {stop, {unexpected_call, Msg}, State}.
-
-handle_cast({ready, WPid}, State) ->
- erlang:monitor(process, WPid),
- handle_cast({idle, WPid}, State);
-
-handle_cast({idle, WPid}, State = #state { available = Avail,
- pending = Pending }) ->
- {noreply,
- case queue:out(Pending) of
- {empty, _Pending} ->
- State #state { available = ordsets:add_element(WPid, Avail) };
- {{value, {next_free, From, CPid}}, Pending1} ->
- worker_pool_worker:next_job_from(WPid, CPid),
- gen_server2:reply(From, WPid),
- State #state { pending = Pending1 };
- {{value, {run_async, Fun}}, Pending1} ->
- worker_pool_worker:submit_async(WPid, Fun),
- State #state { pending = Pending1 }
- end, hibernate};
-
-handle_cast({run_async, Fun}, State = #state { available = [],
- pending = Pending }) ->
- {noreply, State #state { pending = queue:in({run_async, Fun}, Pending)},
- hibernate};
-handle_cast({run_async, Fun}, State = #state { available = [WPid | Avail1] }) ->
- worker_pool_worker:submit_async(WPid, Fun),
- {noreply, State #state { available = Avail1 }, hibernate};
-
-handle_cast(Msg, State) ->
- {stop, {unexpected_cast, Msg}, State}.
-
-handle_info({'DOWN', _MRef, process, WPid, _Reason},
- State = #state { available = Avail }) ->
- {noreply, State #state { available = ordsets:del_element(WPid, Avail) },
- hibernate};
-
-handle_info(Msg, State) ->
- {stop, {unexpected_info, Msg}, State}.
-
-code_change(_OldVsn, State, _Extra) ->
- {ok, State}.
-
-terminate(_Reason, State) ->
- State.
diff --git a/src/worker_pool_sup.erl b/src/worker_pool_sup.erl
deleted file mode 100644
index 2608f5c2e6..0000000000
--- a/src/worker_pool_sup.erl
+++ /dev/null
@@ -1,56 +0,0 @@
-%% The contents of this file are subject to the Mozilla Public License
-%% Version 1.1 (the "License"); you may not use this file except in
-%% compliance with the License. You may obtain a copy of the License
-%% at http://www.mozilla.org/MPL/
-%%
-%% Software distributed under the License is distributed on an "AS IS"
-%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
-%% the License for the specific language governing rights and
-%% limitations under the License.
-%%
-%% The Original Code is RabbitMQ.
-%%
-%% The Initial Developer of the Original Code is GoPivotal, Inc.
-%% Copyright (c) 2007-2017 Pivotal Software, Inc. All rights reserved.
-%%
-
--module(worker_pool_sup).
-
--behaviour(supervisor).
-
--export([start_link/0, start_link/1, start_link/2]).
-
--export([init/1]).
-
-%%----------------------------------------------------------------------------
-
--spec start_link() -> rabbit_types:ok_pid_or_error().
--spec start_link(non_neg_integer()) -> rabbit_types:ok_pid_or_error().
--spec start_link(non_neg_integer(), atom())
- -> rabbit_types:ok_pid_or_error().
-
-%%----------------------------------------------------------------------------
-
-start_link() ->
- start_link(erlang:system_info(schedulers)).
-
-start_link(WCount) ->
- start_link(WCount, worker_pool:default_pool()).
-
-start_link(WCount, PoolName) ->
- SupName = list_to_atom(atom_to_list(PoolName) ++ "_sup"),
- supervisor:start_link({local, SupName}, ?MODULE, [WCount, PoolName]).
-
-%%----------------------------------------------------------------------------
-
-init([WCount, PoolName]) ->
- %% we want to survive up to 1K of worker restarts per second,
- %% e.g. when a large worker pool used for network connections
- %% encounters a network failure. This is the case in the LDAP authentication
- %% backend plugin.
- {ok, {{one_for_one, 1000, 1},
- [{worker_pool, {worker_pool, start_link, [PoolName]}, transient,
- 16#ffffffff, worker, [worker_pool]} |
- [{N, {worker_pool_worker, start_link, [PoolName]}, transient,
- 16#ffffffff, worker, [worker_pool_worker]}
- || N <- lists:seq(1, WCount)]]}}.
diff --git a/src/worker_pool_worker.erl b/src/worker_pool_worker.erl
deleted file mode 100644
index 23db127def..0000000000
--- a/src/worker_pool_worker.erl
+++ /dev/null
@@ -1,193 +0,0 @@
-%% The contents of this file are subject to the Mozilla Public License
-%% Version 1.1 (the "License"); you may not use this file except in
-%% compliance with the License. You may obtain a copy of the License
-%% at http://www.mozilla.org/MPL/
-%%
-%% Software distributed under the License is distributed on an "AS IS"
-%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
-%% the License for the specific language governing rights and
-%% limitations under the License.
-%%
-%% The Original Code is RabbitMQ.
-%%
-%% The Initial Developer of the Original Code is GoPivotal, Inc.
-%% Copyright (c) 2007-2017 Pivotal Software, Inc. All rights reserved.
-%%
-
--module(worker_pool_worker).
-
-%% Executes jobs (functions) submitted to a worker pool with worker_pool:submit/1,
-%% worker_pool:submit/2 or worker_pool:submit_async/1.
-%%
-%% See worker_pool for an overview.
-
--behaviour(gen_server2).
-
--export([start_link/1, next_job_from/2, submit/3, submit_async/2,
- run/1]).
-
--export([set_maximum_since_use/2]).
--export([set_timeout/2, set_timeout/3, clear_timeout/1]).
-
--export([init/1, handle_call/3, handle_cast/2, handle_info/2,
- terminate/2, code_change/3, prioritise_cast/3]).
-
-%%----------------------------------------------------------------------------
-
--type mfargs() :: {atom(), atom(), [any()]}.
-
--spec start_link(atom) -> {'ok', pid()} | {'error', any()}.
--spec next_job_from(pid(), pid()) -> 'ok'.
--spec submit(pid(), fun (() -> A) | mfargs(), 'reuse' | 'single') -> A.
--spec submit_async(pid(), fun (() -> any()) | mfargs()) -> 'ok'.
--spec run(fun (() -> A)) -> A; (mfargs()) -> any().
--spec set_maximum_since_use(pid(), non_neg_integer()) -> 'ok'.
-
-%%----------------------------------------------------------------------------
-
--define(HIBERNATE_AFTER_MIN, 1000).
--define(DESIRED_HIBERNATE, 10000).
-
-%%----------------------------------------------------------------------------
-
-start_link(PoolName) ->
- gen_server2:start_link(?MODULE, [PoolName], [{timeout, infinity}]).
-
-next_job_from(Pid, CPid) ->
- gen_server2:cast(Pid, {next_job_from, CPid}).
-
-submit(Pid, Fun, ProcessModel) ->
- gen_server2:call(Pid, {submit, Fun, self(), ProcessModel}, infinity).
-
-submit_async(Pid, Fun) ->
- gen_server2:cast(Pid, {submit_async, Fun}).
-
-set_maximum_since_use(Pid, Age) ->
- gen_server2:cast(Pid, {set_maximum_since_use, Age}).
-
-run({M, F, A}) -> apply(M, F, A);
-run(Fun) -> Fun().
-
-run(Fun, reuse) ->
- run(Fun);
-run(Fun, single) ->
- Self = self(),
- Ref = make_ref(),
- spawn_link(fun () ->
- put(worker_pool_worker, true),
- Self ! {Ref, run(Fun)},
- unlink(Self)
- end),
- receive
- {Ref, Res} -> Res
- end.
-
-%%----------------------------------------------------------------------------
-
-init([PoolName]) ->
- ok = file_handle_cache:register_callback(?MODULE, set_maximum_since_use,
- [self()]),
- ok = worker_pool:ready(PoolName, self()),
- put(worker_pool_worker, true),
- put(worker_pool_name, PoolName),
- {ok, undefined, hibernate,
- {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
-
-prioritise_cast({set_maximum_since_use, _Age}, _Len, _State) -> 8;
-prioritise_cast({next_job_from, _CPid}, _Len, _State) -> 7;
-prioritise_cast(_Msg, _Len, _State) -> 0.
-
-handle_call({submit, Fun, CPid, ProcessModel}, From, undefined) ->
- {noreply, {job, CPid, From, Fun, ProcessModel}, hibernate};
-
-handle_call({submit, Fun, CPid, ProcessModel}, From, {from, CPid, MRef}) ->
- erlang:demonitor(MRef),
- gen_server2:reply(From, run(Fun, ProcessModel)),
- ok = worker_pool:idle(get(worker_pool_name), self()),
- {noreply, undefined, hibernate};
-
-handle_call(Msg, _From, State) ->
- {stop, {unexpected_call, Msg}, State}.
-
-handle_cast({next_job_from, CPid}, undefined) ->
- MRef = erlang:monitor(process, CPid),
- {noreply, {from, CPid, MRef}, hibernate};
-
-handle_cast({next_job_from, CPid}, {job, CPid, From, Fun, ProcessModel}) ->
- gen_server2:reply(From, run(Fun, ProcessModel)),
- ok = worker_pool:idle(get(worker_pool_name), self()),
- {noreply, undefined, hibernate};
-
-handle_cast({submit_async, Fun}, undefined) ->
- run(Fun),
- ok = worker_pool:idle(get(worker_pool_name), self()),
- {noreply, undefined, hibernate};
-
-handle_cast({set_maximum_since_use, Age}, State) ->
- ok = file_handle_cache:set_maximum_since_use(Age),
- {noreply, State, hibernate};
-
-handle_cast(Msg, State) ->
- {stop, {unexpected_cast, Msg}, State}.
-
-handle_info({'DOWN', MRef, process, CPid, _Reason}, {from, CPid, MRef}) ->
- ok = worker_pool:idle(get(worker_pool_name), self()),
- {noreply, undefined, hibernate};
-
-handle_info({'DOWN', _MRef, process, _Pid, _Reason}, State) ->
- {noreply, State, hibernate};
-
-handle_info({timeout, Key, Fun}, State) ->
- clear_timeout(Key),
- Fun(),
- {noreply, State, hibernate};
-
-handle_info(Msg, State) ->
- {stop, {unexpected_info, Msg}, State}.
-
-code_change(_OldVsn, State, _Extra) ->
- {ok, State}.
-
-terminate(_Reason, State) ->
- State.
-
--spec set_timeout(integer(), fun(() -> any())) -> reference().
-set_timeout(Time, Fun) ->
- Key = make_ref(),
- set_timeout(Key, Time, Fun).
-
--spec set_timeout(Key, integer(), fun(() -> any())) -> Key when Key :: any().
-set_timeout(Key, Time, Fun) ->
- Timeouts = get_timeouts(),
- set_timeout(Key, Time, Fun, Timeouts).
-
--spec clear_timeout(any()) -> ok.
-clear_timeout(Key) ->
- NewTimeouts = cancel_timeout(Key, get_timeouts()),
- put(timeouts, NewTimeouts),
- ok.
-
-get_timeouts() ->
- case get(timeouts) of
- undefined -> dict:new();
- Dict -> Dict
- end.
-
-set_timeout(Key, Time, Fun, Timeouts) ->
- cancel_timeout(Key, Timeouts),
- {ok, TRef} = timer:send_after(Time, {timeout, Key, Fun}),
- NewTimeouts = dict:store(Key, TRef, Timeouts),
- put(timeouts, NewTimeouts),
- {ok, Key}.
-
-cancel_timeout(Key, Timeouts) ->
- case dict:find(Key, Timeouts) of
- {ok, TRef} ->
- timer:cancel(TRef),
- receive {timeout, Key, _} -> ok
- after 0 -> ok
- end,
- dict:erase(Key, Timeouts);
- error ->
- Timeouts
- end.