summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorkjnilsson <knilsson@pivotal.io>2016-12-15 13:22:11 +0000
committerkjnilsson <knilsson@pivotal.io>2016-12-15 13:22:11 +0000
commit93c876aba1184e83811dd39a32b2d17913bc3a20 (patch)
tree39e096366316b8871e7e35a8eb784c8604d02f50
parent3bb8bade8022db0e860f4ab084a02f5b571d0067 (diff)
parent6a18f556dca9b3c28268958568d153e111e4de9d (diff)
downloadrabbitmq-server-git-93c876aba1184e83811dd39a32b2d17913bc3a20.tar.gz
Merge branch 'stable' of github.com:rabbitmq/rabbitmq-server
-rw-r--r--src/worker_pool_worker.erl47
-rw-r--r--test/worker_pool_SUITE.erl193
2 files changed, 240 insertions, 0 deletions
diff --git a/src/worker_pool_worker.erl b/src/worker_pool_worker.erl
index bd07f0d782..c515abe07f 100644
--- a/src/worker_pool_worker.erl
+++ b/src/worker_pool_worker.erl
@@ -27,6 +27,7 @@
run/1]).
-export([set_maximum_since_use/2]).
+-export([set_timeout/2, set_timeout/3, clear_timeout/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3, prioritise_cast/3]).
@@ -136,6 +137,11 @@ handle_info({'DOWN', MRef, process, CPid, _Reason}, {from, CPid, MRef}) ->
handle_info({'DOWN', _MRef, process, _Pid, _Reason}, State) ->
{noreply, State, hibernate};
+handle_info({timeout, Key, Fun}, State) ->
+ clear_timeout(Key),
+ Fun(),
+ {noreply, State, hibernate};
+
handle_info(Msg, State) ->
{stop, {unexpected_info, Msg}, State}.
@@ -144,3 +150,44 @@ code_change(_OldVsn, State, _Extra) ->
terminate(_Reason, State) ->
State.
+
+-spec set_timeout(integer(), fun(() -> any())) -> reference().
+set_timeout(Time, Fun) ->
+ Key = make_ref(),
+ set_timeout(Key, Time, Fun).
+
+-spec set_timeout(Key, integer(), fun(() -> any())) -> Key when Key :: any().
+set_timeout(Key, Time, Fun) ->
+ Timeouts = get_timeouts(),
+ set_timeout(Key, Time, Fun, Timeouts).
+
+-spec clear_timeout(any()) -> ok.
+clear_timeout(Key) ->
+ NewTimeouts = cancel_timeout(Key, get_timeouts()),
+ put(timeouts, NewTimeouts),
+ ok.
+
+get_timeouts() ->
+ case get(timeouts) of
+ undefined -> dict:new();
+ Dict -> Dict
+ end.
+
+set_timeout(Key, Time, Fun, Timeouts) ->
+ cancel_timeout(Key, Timeouts),
+ {ok, TRef} = timer:send_after(Time, {timeout, Key, Fun}),
+ NewTimeouts = dict:store(Key, TRef, Timeouts),
+ put(timeouts, NewTimeouts),
+ {ok, Key}.
+
+cancel_timeout(Key, Timeouts) ->
+ case dict:find(Key, Timeouts) of
+ {ok, TRef} ->
+ timer:cancel(TRef),
+ receive {timeout, Key, _} -> ok
+ after 0 -> ok
+ end,
+ dict:erase(Key, Timeouts);
+ error ->
+ Timeouts
+ end.
diff --git a/test/worker_pool_SUITE.erl b/test/worker_pool_SUITE.erl
new file mode 100644
index 0000000000..7eb4d6fd04
--- /dev/null
+++ b/test/worker_pool_SUITE.erl
@@ -0,0 +1,193 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved.
+%%
+
+-module(worker_pool_SUITE).
+
+-compile(export_all).
+-include_lib("common_test/include/ct.hrl").
+
+
+-define(POOL_SIZE, 1).
+-define(POOL_NAME, test_pool).
+
+all() ->
+ [
+ run_code_synchronously,
+ run_code_asynchronously,
+ set_timeout,
+ cancel_timeout,
+ cancel_timeout_by_setting
+ ].
+
+init_per_testcase(_, Config) ->
+ {ok, Pool} = worker_pool_sup:start_link(?POOL_SIZE, ?POOL_NAME),
+ rabbit_ct_helpers:set_config(Config, [{pool_sup, Pool}]).
+
+end_per_testcase(_, Config) ->
+ Pool = ?config(pool_sup, Config),
+ unlink(Pool),
+ exit(Pool, kill).
+
+run_code_synchronously(Config) ->
+ Self = self(),
+ Test = make_ref(),
+ Sleep = 200,
+ {Time, Result} = timer:tc(fun() ->
+ worker_pool:submit(?POOL_NAME,
+ fun() ->
+ timer:sleep(Sleep),
+ Self ! {hi, Test},
+ self()
+ end,
+ reuse)
+ end),
+ % Worker run synchronously
+ true = Time > Sleep,
+ % Worker have sent message
+ receive {hi, Test} -> ok
+ after 0 -> error(no_message_from_worker)
+ end,
+ % Worker is a separate process
+ true = (Self /= Result).
+
+run_code_asynchronously(Config) ->
+ Self = self(),
+ Test = make_ref(),
+ Sleep = 200,
+ {Time, Result} = timer:tc(fun() ->
+ worker_pool:submit_async(?POOL_NAME,
+ fun() ->
+ timer:sleep(Sleep),
+ Self ! {hi, Test},
+ self()
+ end)
+ end),
+ % Worker run synchronously
+ true = Time < Sleep,
+ % Worker have sent message
+ receive {hi, Test} -> ok
+ after Sleep + 100 -> error(no_message_from_worker)
+ end,
+ % Worker is a separate process
+ true = (Self /= Result).
+
+set_timeout(Config) ->
+ Self = self(),
+ Test = make_ref(),
+ Worker = worker_pool:submit(?POOL_NAME,
+ fun() ->
+ Worker = self(),
+ timer:sleep(100),
+ worker_pool_worker:set_timeout(
+ my_timeout, 1000,
+ fun() ->
+ Self ! {hello, self(), Test}
+ end),
+ Worker
+ end,
+ reuse),
+
+ % Timeout will occur after 1000 ms only
+ receive {hello, Worker, Test} -> exit(timeout_should_wait)
+ after 0 -> ok
+ end,
+
+ timer:sleep(1000),
+
+ receive {hello, Worker, Test} -> ok
+ after 1000 -> exit(timeout_is_late)
+ end.
+
+
+cancel_timeout(Config) ->
+ Self = self(),
+ Test = make_ref(),
+ Worker = worker_pool:submit(?POOL_NAME,
+ fun() ->
+ Worker = self(),
+ timer:sleep(100),
+ worker_pool_worker:set_timeout(
+ my_timeout, 1000,
+ fun() ->
+ Self ! {hello, self(), Test}
+ end),
+ Worker
+ end,
+ reuse),
+
+ % Timeout will occur after 1000 ms only
+ receive {hello, Worker, Test} -> exit(timeout_should_wait)
+ after 0 -> ok
+ end,
+
+ worker_pool_worker:next_job_from(Worker, Self),
+ Worker = worker_pool_worker:submit(Worker,
+ fun() ->
+ worker_pool_worker:clear_timeout(my_timeout),
+ Worker
+ end,
+ reuse),
+
+ timer:sleep(1000),
+ receive {hello, Worker, Test} -> exit(timeout_is_not_canceleld)
+ after 0 -> ok
+ end.
+
+cancel_timeout_by_setting(Config) ->
+ Self = self(),
+ Test = make_ref(),
+ Worker = worker_pool:submit(?POOL_NAME,
+ fun() ->
+ Worker = self(),
+ timer:sleep(100),
+ worker_pool_worker:set_timeout(
+ my_timeout, 1000,
+ fun() ->
+ Self ! {hello, self(), Test}
+ end),
+ Worker
+ end,
+ reuse),
+
+ % Timeout will occur after 1000 ms only
+ receive {hello, Worker, Test} -> exit(timeout_should_wait)
+ after 0 -> ok
+ end,
+
+ worker_pool_worker:next_job_from(Worker, Self),
+ Worker = worker_pool_worker:submit(Worker,
+ fun() ->
+ worker_pool_worker:set_timeout(my_timeout, 1000,
+ fun() ->
+ Self ! {hello_reset, self(), Test}
+ end),
+ Worker
+ end,
+ reuse),
+
+ timer:sleep(1000),
+ receive {hello, Worker, Test} -> exit(timeout_is_not_canceleld)
+ after 0 -> ok
+ end,
+
+ receive {hello_reset, Worker, Test} -> ok
+ after 1000 -> exit(timeout_is_late)
+ end.
+
+
+
+
+