summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2014-04-16 22:43:15 +0100
committerMatthias Radestock <matthias@rabbitmq.com>2014-04-16 22:43:15 +0100
commit8028ed34ef6f27f84ed66c2f69a2945bcea6787f (patch)
tree27b0ceaa0aae9f757c08ca44ef845ae689fea5fa
parent66b47b1c017ac6e813aee6310b1dfeee57335bd9 (diff)
downloadrabbitmq-server-git-8028ed34ef6f27f84ed66c2f69a2945bcea6787f.tar.gz
monitor workers
In a way this is a damage limitation exercise... When a worker dies while working it is not in the pool anyway, so noticing its death is a no-op. When a worker dies while idle, it will not return to the pool when we next submit work to it. But obviously the submitted work won't be carried out, and the submitter will get an error (unless they submitted the work asynchronously). All the monitoring does is reduce the likelihood of the latter happening. It cannot eliminate it though since the worker may die just as work was being submitted to it.
-rw-r--r--src/worker_pool.erl14
-rw-r--r--src/worker_pool_worker.erl2
2 files changed, 14 insertions, 2 deletions
diff --git a/src/worker_pool.erl b/src/worker_pool.erl
index db8c4e9656..b1dba5a233 100644
--- a/src/worker_pool.erl
+++ b/src/worker_pool.erl
@@ -28,7 +28,7 @@
-behaviour(gen_server2).
--export([start_link/0, submit/1, submit_async/1, idle/1]).
+-export([start_link/0, submit/1, submit_async/1, ready/1, idle/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
@@ -42,6 +42,7 @@
-spec(start_link/0 :: () -> {'ok', pid()} | {'error', any()}).
-spec(submit/1 :: (fun (() -> A) | mfargs()) -> A).
-spec(submit_async/1 :: (fun (() -> any()) | mfargs()) -> 'ok').
+-spec(ready/1 :: (pid()) -> 'ok').
-spec(idle/1 :: (pid()) -> 'ok').
-endif.
@@ -68,6 +69,8 @@ submit(Fun) ->
submit_async(Fun) -> gen_server2:cast(?SERVER, {run_async, Fun}).
+ready(WPid) -> gen_server2:cast(?SERVER, {ready, WPid}).
+
idle(WPid) -> gen_server2:cast(?SERVER, {idle, WPid}).
%%----------------------------------------------------------------------------
@@ -88,6 +91,10 @@ handle_call({next_free, CPid}, _From, State = #state { available =
handle_call(Msg, _From, State) ->
{stop, {unexpected_call, Msg}, State}.
+handle_cast({ready, WPid}, State) ->
+ erlang:monitor(process, WPid),
+ handle_cast({idle, WPid}, State);
+
handle_cast({idle, WPid}, State = #state { available = Avail,
pending = Pending }) ->
{noreply,
@@ -114,6 +121,11 @@ handle_cast({run_async, Fun}, State = #state { available = [WPid | Avail1] }) ->
handle_cast(Msg, State) ->
{stop, {unexpected_cast, Msg}, State}.
+handle_info({'DOWN', _MRef, process, WPid, _Reason},
+ State = #state { available = Avail }) ->
+ {noreply, State #state { available = ordsets:del_element(WPid, Avail) },
+ hibernate};
+
handle_info(Msg, State) ->
{stop, {unexpected_info, Msg}, State}.
diff --git a/src/worker_pool_worker.erl b/src/worker_pool_worker.erl
index ef6f115af4..beb95bc631 100644
--- a/src/worker_pool_worker.erl
+++ b/src/worker_pool_worker.erl
@@ -72,7 +72,7 @@ run(Fun) ->
init([]) ->
ok = file_handle_cache:register_callback(?MODULE, set_maximum_since_use,
[self()]),
- ok = worker_pool:idle(self()),
+ ok = worker_pool:ready(self()),
put(worker_pool_worker, true),
{ok, undefined, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.