summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-07-02 14:49:06 +0100
committerMatthew Sackman <matthew@lshift.net>2009-07-02 14:49:06 +0100
commit010fc2a0cc744109eef8d6e8be34297518df5d4d (patch)
treeaa17f2fb051387b19ebefe5cba7b330e71f5a98a
parent0d611f1188edc7b8f9da7935e21865fb49163a14 (diff)
downloadrabbitmq-server-git-010fc2a0cc744109eef8d6e8be34297518df5d4d.tar.gz
Done. In order to keep the code simple, the detection of naptime is done in reply and noreply functions. This means that the now() value there includes computation relating to the last message in. This is maybe not desirable, but the alternative is to wrap all of handle_cast, handle_call and handle_info. Nevertheless, testing shows this works:
in the erlang client: Conn = amqp_connection:start("guest", "guest", "localhost"), Chan = lib_amqp:start_channel(Conn), [begin Q = list_to_binary(integer_to_list(R)), Q = lib_amqp:declare_queue(Chan, Q) end || R <- lists:seq(1,1000)], Props = (amqp_util:basic_properties()). [begin Q = list_to_binary(integer_to_list(R)), ok = lib_amqp:publish(Chan, <<"">>, Q, <<0:(8*1024)>>, Props) end || _ <- lists:seq(1,1500), R <- lists:seq(1,1000)]. Then, after that lot's gone in, in a shell do: watch -n 2 "time ./scripts/rabbitmqctl list_queues | tail" The times for me start off at about 2.3 seconds, then drop rapidly to 1.4 and then 0.2 seconds and stay there.
-rw-r--r--src/rabbit_amqqueue_process.erl51
1 files changed, 45 insertions, 6 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index cf0ef44f5c..64498c3728 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -36,7 +36,7 @@
-behaviour(gen_server2).
-define(UNSENT_MESSAGE_LIMIT, 100).
--define(HIBERNATE_AFTER, 1000).
+-define(HIBERNATE_AFTER_MIN, 1000).
-export([start_link/1]).
@@ -54,7 +54,10 @@
next_msg_id,
message_buffer,
active_consumers,
- blocked_consumers}).
+ blocked_consumers,
+ hibernate_after,
+ hibernated_at
+ }).
-record(consumer, {tag, ack_required}).
@@ -101,7 +104,10 @@ init(Q) ->
next_msg_id = 1,
message_buffer = queue:new(),
active_consumers = queue:new(),
- blocked_consumers = queue:new()}, ?HIBERNATE_AFTER}.
+ blocked_consumers = queue:new(),
+ hibernate_after = ?HIBERNATE_AFTER_MIN,
+ hibernated_at = undefined
+ }, ?HIBERNATE_AFTER_MIN}.
terminate(_Reason, State) ->
%% FIXME: How do we cancel active subscriptions?
@@ -116,9 +122,41 @@ code_change(_OldVsn, State, _Extra) ->
%%----------------------------------------------------------------------------
-reply(Reply, NewState) -> {reply, Reply, NewState, ?HIBERNATE_AFTER}.
+reply(Reply, NewState = #q { hibernated_at = undefined }) ->
+ {reply, Reply, NewState, NewState #q.hibernate_after};
+reply(Reply, NewState) ->
+ NewState1 = adjust_hibernate_after(NewState),
+ {reply, Reply, NewState1, NewState1 #q.hibernate_after}.
+
+noreply(NewState = #q { hibernated_at = undefined }) ->
+ {noreply, NewState, NewState #q.hibernate_after};
+noreply(NewState) ->
+ NewState1 = adjust_hibernate_after(NewState),
+ {noreply, NewState1, NewState1 #q.hibernate_after}.
+
+adjust_hibernate_after(State = #q { hibernated_at = undefined }) ->
+ State;
+adjust_hibernate_after(State = #q { hibernated_at = Then,
+ hibernate_after = Timeout }) ->
+ State1 = State #q { hibernated_at = undefined },
+ NapLengthMicros = timer:now_diff(now(), Then),
+ TimeoutMicros = Timeout * 1000,
+ LowTargetMicros = TimeoutMicros * 4,
+ HighTargetMicros = LowTargetMicros * 4,
+ if
+ NapLengthMicros < LowTargetMicros ->
+ %% nap was too short, don't go to sleep as soon
+ State1 #q { hibernate_after = Timeout * 2 };
+
+ NapLengthMicros > HighTargetMicros ->
+ %% nap was long, try going to sleep sooner
+ Timeout1 = lists:max([?HIBERNATE_AFTER_MIN, round(Timeout / 2)]),
+ State1 #q { hibernate_after = Timeout1 };
-noreply(NewState) -> {noreply, NewState, ?HIBERNATE_AFTER}.
+ true ->
+ %% nap and timeout seem to be in the right relationship. stay here
+ State1
+ end.
lookup_ch(ChPid) ->
case get({ch, ChPid}) of
@@ -816,7 +854,8 @@ handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) ->
handle_info(timeout, State) ->
%% TODO: Once we drop support for R11B-5, we can change this to
%% {noreply, State, hibernate};
- proc_lib:hibernate(gen_server2, enter_loop, [?MODULE, [], State]);
+ State1 = State #q { hibernated_at = now() },
+ proc_lib:hibernate(gen_server2, enter_loop, [?MODULE, [], State1]);
handle_info(Info, State) ->
?LOGDEBUG("Info in queue: ~p~n", [Info]),