diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-07-02 14:49:06 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-07-02 14:49:06 +0100 |
| commit | 010fc2a0cc744109eef8d6e8be34297518df5d4d (patch) | |
| tree | aa17f2fb051387b19ebefe5cba7b330e71f5a98a | |
| parent | 0d611f1188edc7b8f9da7935e21865fb49163a14 (diff) | |
| download | rabbitmq-server-git-010fc2a0cc744109eef8d6e8be34297518df5d4d.tar.gz | |
Done. In order to keep the code simple, the detection of naptime is done in reply and noreply functions. This means that the now() value there includes computation relating to the last message in. This is maybe not desirable, but the alternative is to wrap all of handle_cast, handle_call and handle_info. Nevertheless, testing shows this works:
in the erlang client:
Conn = amqp_connection:start("guest", "guest", "localhost"),
Chan = lib_amqp:start_channel(Conn),
[begin Q = list_to_binary(integer_to_list(R)), Q = lib_amqp:declare_queue(Chan, Q) end || R <- lists:seq(1,1000)],
Props = (amqp_util:basic_properties()).
[begin Q = list_to_binary(integer_to_list(R)), ok = lib_amqp:publish(Chan, <<"">>, Q, <<0:(8*1024)>>, Props) end || _ <- lists:seq(1,1500), R <- lists:seq(1,1000)].
Then, after that lot's gone in, in a shell do:
watch -n 2 "time ./scripts/rabbitmqctl list_queues | tail"
The times for me start off at about 2.3 seconds, then drop rapidly to 1.4 and then 0.2 seconds and stay there.
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 51 |
1 files changed, 45 insertions, 6 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index cf0ef44f5c..64498c3728 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -36,7 +36,7 @@ -behaviour(gen_server2). -define(UNSENT_MESSAGE_LIMIT, 100). --define(HIBERNATE_AFTER, 1000). +-define(HIBERNATE_AFTER_MIN, 1000). -export([start_link/1]). @@ -54,7 +54,10 @@ next_msg_id, message_buffer, active_consumers, - blocked_consumers}). + blocked_consumers, + hibernate_after, + hibernated_at + }). -record(consumer, {tag, ack_required}). @@ -101,7 +104,10 @@ init(Q) -> next_msg_id = 1, message_buffer = queue:new(), active_consumers = queue:new(), - blocked_consumers = queue:new()}, ?HIBERNATE_AFTER}. + blocked_consumers = queue:new(), + hibernate_after = ?HIBERNATE_AFTER_MIN, + hibernated_at = undefined + }, ?HIBERNATE_AFTER_MIN}. terminate(_Reason, State) -> %% FIXME: How do we cancel active subscriptions? @@ -116,9 +122,41 @@ code_change(_OldVsn, State, _Extra) -> %%---------------------------------------------------------------------------- -reply(Reply, NewState) -> {reply, Reply, NewState, ?HIBERNATE_AFTER}. +reply(Reply, NewState = #q { hibernated_at = undefined }) -> + {reply, Reply, NewState, NewState #q.hibernate_after}; +reply(Reply, NewState) -> + NewState1 = adjust_hibernate_after(NewState), + {reply, Reply, NewState1, NewState1 #q.hibernate_after}. + +noreply(NewState = #q { hibernated_at = undefined }) -> + {noreply, NewState, NewState #q.hibernate_after}; +noreply(NewState) -> + NewState1 = adjust_hibernate_after(NewState), + {noreply, NewState1, NewState1 #q.hibernate_after}. + +adjust_hibernate_after(State = #q { hibernated_at = undefined }) -> + State; +adjust_hibernate_after(State = #q { hibernated_at = Then, + hibernate_after = Timeout }) -> + State1 = State #q { hibernated_at = undefined }, + NapLengthMicros = timer:now_diff(now(), Then), + TimeoutMicros = Timeout * 1000, + LowTargetMicros = TimeoutMicros * 4, + HighTargetMicros = LowTargetMicros * 4, + if + NapLengthMicros < LowTargetMicros -> + %% nap was too short, don't go to sleep as soon + State1 #q { hibernate_after = Timeout * 2 }; + + NapLengthMicros > HighTargetMicros -> + %% nap was long, try going to sleep sooner + Timeout1 = lists:max([?HIBERNATE_AFTER_MIN, round(Timeout / 2)]), + State1 #q { hibernate_after = Timeout1 }; -noreply(NewState) -> {noreply, NewState, ?HIBERNATE_AFTER}. + true -> + %% nap and timeout seem to be in the right relationship. stay here + State1 + end. lookup_ch(ChPid) -> case get({ch, ChPid}) of @@ -816,7 +854,8 @@ handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) -> handle_info(timeout, State) -> %% TODO: Once we drop support for R11B-5, we can change this to %% {noreply, State, hibernate}; - proc_lib:hibernate(gen_server2, enter_loop, [?MODULE, [], State]); + State1 = State #q { hibernated_at = now() }, + proc_lib:hibernate(gen_server2, enter_loop, [?MODULE, [], State1]); handle_info(Info, State) -> ?LOGDEBUG("Info in queue: ~p~n", [Info]), |
