summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2014-04-16 21:14:00 +0100
committerMatthias Radestock <matthias@rabbitmq.com>2014-04-16 21:14:00 +0100
commita6439b55f8611dec9c0c9fab4e543f206a6e0360 (patch)
treef05aa6e0ce9419c6b7967be9af6631b35361bff2
parentf4d79ac0137ae0a8c82f8aaf275636eabd33c9d9 (diff)
downloadrabbitmq-server-git-a6439b55f8611dec9c0c9fab4e543f206a6e0360.tar.gz
record workers in a set
...instead of a queue. That way when an idle worker is restarted (and sends an 'idle' message to the pool), it won't end up in the pool twice. Note that we always hand out work to the first worker in the ordset. That is actually more efficient than the round-robin strategy we had with the queue since it keeps a smaller number of workers busy while others can hibernate.
-rw-r--r--src/worker_pool.erl64
1 files changed, 29 insertions, 35 deletions
diff --git a/src/worker_pool.erl b/src/worker_pool.erl
index 0f265e2223..268c703ffa 100644
--- a/src/worker_pool.erl
+++ b/src/worker_pool.erl
@@ -76,52 +76,46 @@ idle(WId) ->
%%----------------------------------------------------------------------------
init([]) ->
- {ok, #state { pending = queue:new(), available = queue:new() }, hibernate,
+ {ok, #state { pending = queue:new(), available = ordsets:new() }, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
-handle_call({next_free, CPid}, From, State = #state { available = Avail,
- pending = Pending }) ->
- case queue:out(Avail) of
- {empty, _Avail} ->
- {noreply,
- State#state{pending = queue:in({next_free, From, CPid}, Pending)},
- hibernate};
- {{value, WId}, Avail1} ->
- WPid = get_worker_pid(WId),
- worker_pool_worker:next_job_from(WPid, CPid),
- {reply, WPid, State #state { available = Avail1 },
- hibernate}
- end;
+handle_call({next_free, CPid}, From, State = #state { available = [],
+ pending = Pending }) ->
+ {noreply, State#state{pending = queue:in({next_free, From, CPid}, Pending)},
+ hibernate};
+handle_call({next_free, CPid}, _From, State = #state { available =
+ [WId | Avail1] }) ->
+ WPid = get_worker_pid(WId),
+ worker_pool_worker:next_job_from(WPid, CPid),
+ {reply, WPid, State #state { available = Avail1 }, hibernate};
handle_call(Msg, _From, State) ->
{stop, {unexpected_call, Msg}, State}.
handle_cast({idle, WId}, State = #state { available = Avail,
- pending = Pending }) ->
- {noreply, case queue:out(Pending) of
- {empty, _Pending} ->
- State #state { available = queue:in(WId, Avail) };
- {{value, {next_free, From, CPid}}, Pending1} ->
- WPid = get_worker_pid(WId),
- worker_pool_worker:next_job_from(WPid, CPid),
- gen_server2:reply(From, WPid),
- State #state { pending = Pending1 };
- {{value, {run_async, Fun}}, Pending1} ->
- worker_pool_worker:submit_async(get_worker_pid(WId), Fun),
- State #state { pending = Pending1 }
- end, hibernate};
-
-handle_cast({run_async, Fun}, State = #state { available = Avail,
- pending = Pending }) ->
+ pending = Pending }) ->
{noreply,
- case queue:out(Avail) of
- {empty, _Avail} ->
- State #state { pending = queue:in({run_async, Fun}, Pending)};
- {{value, WId}, Avail1} ->
+ case queue:out(Pending) of
+ {empty, _Pending} ->
+ State #state { available = ordsets:add_element(WId, Avail) };
+ {{value, {next_free, From, CPid}}, Pending1} ->
+ WPid = get_worker_pid(WId),
+ worker_pool_worker:next_job_from(WPid, CPid),
+ gen_server2:reply(From, WPid),
+ State #state { pending = Pending1 };
+ {{value, {run_async, Fun}}, Pending1} ->
worker_pool_worker:submit_async(get_worker_pid(WId), Fun),
- State #state { available = Avail1 }
+ State #state { pending = Pending1 }
end, hibernate};
+handle_cast({run_async, Fun}, State = #state { available = [],
+ pending = Pending }) ->
+ {noreply, State #state { pending = queue:in({run_async, Fun}, Pending)},
+ hibernate};
+handle_cast({run_async, Fun}, State = #state { available = [WId | Avail1] }) ->
+ worker_pool_worker:submit_async(get_worker_pid(WId), Fun),
+ {noreply, State #state { available = Avail1 }, hibernate};
+
handle_cast(Msg, State) ->
{stop, {unexpected_cast, Msg}, State}.