diff options
| -rw-r--r-- | src/worker_pool_worker.erl | 47 | ||||
| -rw-r--r-- | test/worker_pool_SUITE.erl | 193 |
2 files changed, 240 insertions, 0 deletions
diff --git a/src/worker_pool_worker.erl b/src/worker_pool_worker.erl index bd07f0d782..c515abe07f 100644 --- a/src/worker_pool_worker.erl +++ b/src/worker_pool_worker.erl @@ -27,6 +27,7 @@ run/1]). -export([set_maximum_since_use/2]). +-export([set_timeout/2, set_timeout/3, clear_timeout/1]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3, prioritise_cast/3]). @@ -136,6 +137,11 @@ handle_info({'DOWN', MRef, process, CPid, _Reason}, {from, CPid, MRef}) -> handle_info({'DOWN', _MRef, process, _Pid, _Reason}, State) -> {noreply, State, hibernate}; +handle_info({timeout, Key, Fun}, State) -> + clear_timeout(Key), + Fun(), + {noreply, State, hibernate}; + handle_info(Msg, State) -> {stop, {unexpected_info, Msg}, State}. @@ -144,3 +150,44 @@ code_change(_OldVsn, State, _Extra) -> terminate(_Reason, State) -> State. + +-spec set_timeout(integer(), fun(() -> any())) -> reference(). +set_timeout(Time, Fun) -> + Key = make_ref(), + set_timeout(Key, Time, Fun). + +-spec set_timeout(Key, integer(), fun(() -> any())) -> Key when Key :: any(). +set_timeout(Key, Time, Fun) -> + Timeouts = get_timeouts(), + set_timeout(Key, Time, Fun, Timeouts). + +-spec clear_timeout(any()) -> ok. +clear_timeout(Key) -> + NewTimeouts = cancel_timeout(Key, get_timeouts()), + put(timeouts, NewTimeouts), + ok. + +get_timeouts() -> + case get(timeouts) of + undefined -> dict:new(); + Dict -> Dict + end. + +set_timeout(Key, Time, Fun, Timeouts) -> + cancel_timeout(Key, Timeouts), + {ok, TRef} = timer:send_after(Time, {timeout, Key, Fun}), + NewTimeouts = dict:store(Key, TRef, Timeouts), + put(timeouts, NewTimeouts), + {ok, Key}. + +cancel_timeout(Key, Timeouts) -> + case dict:find(Key, Timeouts) of + {ok, TRef} -> + timer:cancel(TRef), + receive {timeout, Key, _} -> ok + after 0 -> ok + end, + dict:erase(Key, Timeouts); + error -> + Timeouts + end. diff --git a/test/worker_pool_SUITE.erl b/test/worker_pool_SUITE.erl new file mode 100644 index 0000000000..7eb4d6fd04 --- /dev/null +++ b/test/worker_pool_SUITE.erl @@ -0,0 +1,193 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved. +%% + +-module(worker_pool_SUITE). + +-compile(export_all). +-include_lib("common_test/include/ct.hrl"). + + +-define(POOL_SIZE, 1). +-define(POOL_NAME, test_pool). + +all() -> + [ + run_code_synchronously, + run_code_asynchronously, + set_timeout, + cancel_timeout, + cancel_timeout_by_setting + ]. + +init_per_testcase(_, Config) -> + {ok, Pool} = worker_pool_sup:start_link(?POOL_SIZE, ?POOL_NAME), + rabbit_ct_helpers:set_config(Config, [{pool_sup, Pool}]). + +end_per_testcase(_, Config) -> + Pool = ?config(pool_sup, Config), + unlink(Pool), + exit(Pool, kill). + +run_code_synchronously(Config) -> + Self = self(), + Test = make_ref(), + Sleep = 200, + {Time, Result} = timer:tc(fun() -> + worker_pool:submit(?POOL_NAME, + fun() -> + timer:sleep(Sleep), + Self ! {hi, Test}, + self() + end, + reuse) + end), + % Worker run synchronously + true = Time > Sleep, + % Worker have sent message + receive {hi, Test} -> ok + after 0 -> error(no_message_from_worker) + end, + % Worker is a separate process + true = (Self /= Result). + +run_code_asynchronously(Config) -> + Self = self(), + Test = make_ref(), + Sleep = 200, + {Time, Result} = timer:tc(fun() -> + worker_pool:submit_async(?POOL_NAME, + fun() -> + timer:sleep(Sleep), + Self ! {hi, Test}, + self() + end) + end), + % Worker run synchronously + true = Time < Sleep, + % Worker have sent message + receive {hi, Test} -> ok + after Sleep + 100 -> error(no_message_from_worker) + end, + % Worker is a separate process + true = (Self /= Result). + +set_timeout(Config) -> + Self = self(), + Test = make_ref(), + Worker = worker_pool:submit(?POOL_NAME, + fun() -> + Worker = self(), + timer:sleep(100), + worker_pool_worker:set_timeout( + my_timeout, 1000, + fun() -> + Self ! {hello, self(), Test} + end), + Worker + end, + reuse), + + % Timeout will occur after 1000 ms only + receive {hello, Worker, Test} -> exit(timeout_should_wait) + after 0 -> ok + end, + + timer:sleep(1000), + + receive {hello, Worker, Test} -> ok + after 1000 -> exit(timeout_is_late) + end. + + +cancel_timeout(Config) -> + Self = self(), + Test = make_ref(), + Worker = worker_pool:submit(?POOL_NAME, + fun() -> + Worker = self(), + timer:sleep(100), + worker_pool_worker:set_timeout( + my_timeout, 1000, + fun() -> + Self ! {hello, self(), Test} + end), + Worker + end, + reuse), + + % Timeout will occur after 1000 ms only + receive {hello, Worker, Test} -> exit(timeout_should_wait) + after 0 -> ok + end, + + worker_pool_worker:next_job_from(Worker, Self), + Worker = worker_pool_worker:submit(Worker, + fun() -> + worker_pool_worker:clear_timeout(my_timeout), + Worker + end, + reuse), + + timer:sleep(1000), + receive {hello, Worker, Test} -> exit(timeout_is_not_canceleld) + after 0 -> ok + end. + +cancel_timeout_by_setting(Config) -> + Self = self(), + Test = make_ref(), + Worker = worker_pool:submit(?POOL_NAME, + fun() -> + Worker = self(), + timer:sleep(100), + worker_pool_worker:set_timeout( + my_timeout, 1000, + fun() -> + Self ! {hello, self(), Test} + end), + Worker + end, + reuse), + + % Timeout will occur after 1000 ms only + receive {hello, Worker, Test} -> exit(timeout_should_wait) + after 0 -> ok + end, + + worker_pool_worker:next_job_from(Worker, Self), + Worker = worker_pool_worker:submit(Worker, + fun() -> + worker_pool_worker:set_timeout(my_timeout, 1000, + fun() -> + Self ! {hello_reset, self(), Test} + end), + Worker + end, + reuse), + + timer:sleep(1000), + receive {hello, Worker, Test} -> exit(timeout_is_not_canceleld) + after 0 -> ok + end, + + receive {hello_reset, Worker, Test} -> ok + after 1000 -> exit(timeout_is_late) + end. + + + + + |
