diff options
| author | Daniil Fedotov <dfedotov@pivotal.io> | 2016-09-27 15:51:38 +0100 |
|---|---|---|
| committer | Daniil Fedotov <dfedotov@pivotal.io> | 2016-12-02 10:03:12 +0000 |
| commit | 28b843664ad2ebbb580a5066c3236877d9e25cf5 (patch) | |
| tree | b7e16b09ef097e70d6821e2d8a0aaf54ba1221eb /src | |
| parent | 438bb3b260f9b45a1115e0bb07547ecd079d52a4 (diff) | |
| download | rabbitmq-server-git-28b843664ad2ebbb580a5066c3236877d9e25cf5.tar.gz | |
Timeouts for worker pool worker
Diffstat (limited to 'src')
| -rw-r--r-- | src/worker_pool_worker.erl | 47 |
1 files changed, 47 insertions, 0 deletions
diff --git a/src/worker_pool_worker.erl b/src/worker_pool_worker.erl index bd07f0d782..a342a5eb61 100644 --- a/src/worker_pool_worker.erl +++ b/src/worker_pool_worker.erl @@ -27,6 +27,7 @@ run/1]). -export([set_maximum_since_use/2]). +-export([set_timeout/2, set_timeout/3, clear_timeout/1]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3, prioritise_cast/3]). @@ -136,6 +137,11 @@ handle_info({'DOWN', MRef, process, CPid, _Reason}, {from, CPid, MRef}) -> handle_info({'DOWN', _MRef, process, _Pid, _Reason}, State) -> {noreply, State, hibernate}; +handle_info({timeout, Key, Fun}, State) -> + clear_timeout(Key), + Fun(), + {notrply, State, hibernate}; + handle_info(Msg, State) -> {stop, {unexpected_info, Msg}, State}. @@ -144,3 +150,44 @@ code_change(_OldVsn, State, _Extra) -> terminate(_Reason, State) -> State. + +-spec set_timeout(integer(), fun(() -> any())) -> ref(). +set_timeout(Time, Fun) -> + Key = make_ref(), + set_timeout(Key, Time, Fun). + +-spec set_timeout(Key, integer(), fun(() -> any())) -> Key when Key :: any(). +set_timeout(Key, Time, Fun) -> + Timeouts = get_timeouts(), + set_timeout(Key, Time, Fun, Timeouts). + +-spec clear_timeout(any()) -> ok. +clear_timeout(Key) -> + NewTimeouts = cancel_timeout(Key, get_timeouts()), + put(timeouts, NewTimeouts), + ok. + +get_timeouts() -> + case get(timeouts) of + undefined -> dict:new(); + Dict -> Dict + end. + +set_timeout(Key, Time, Fun, Timeouts) -> + cancel_timeout(Key, Timeouts), + {ok, TRef} = timer:send_after(Time, {timeout, Key, Fun}), + NewTimeouts = dict:store(Key, TRef, Timeouts), + put(timeouts, NewTimeouts), + {ok, Key}. + +cancel_timeout(Key, Timeouts) -> + case dict:find(Key, Timeouts) of + {ok, TRef} -> + timer:cancel(TRef), + receive {timeout, Key, _} -> ok + after 0 -> ok + end, + dict:erase(Key, Timeouts); + error -> + Timeouts + end. |
