summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue_process.erl51
1 files changed, 45 insertions, 6 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index cf0ef44f5c..64498c3728 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -36,7 +36,7 @@
-behaviour(gen_server2).
-define(UNSENT_MESSAGE_LIMIT, 100).
--define(HIBERNATE_AFTER, 1000).
+-define(HIBERNATE_AFTER_MIN, 1000).
-export([start_link/1]).
@@ -54,7 +54,10 @@
next_msg_id,
message_buffer,
active_consumers,
- blocked_consumers}).
+ blocked_consumers,
+ hibernate_after,
+ hibernated_at
+ }).
-record(consumer, {tag, ack_required}).
@@ -101,7 +104,10 @@ init(Q) ->
next_msg_id = 1,
message_buffer = queue:new(),
active_consumers = queue:new(),
- blocked_consumers = queue:new()}, ?HIBERNATE_AFTER}.
+ blocked_consumers = queue:new(),
+ hibernate_after = ?HIBERNATE_AFTER_MIN,
+ hibernated_at = undefined
+ }, ?HIBERNATE_AFTER_MIN}.
terminate(_Reason, State) ->
%% FIXME: How do we cancel active subscriptions?
@@ -116,9 +122,41 @@ code_change(_OldVsn, State, _Extra) ->
%%----------------------------------------------------------------------------
-reply(Reply, NewState) -> {reply, Reply, NewState, ?HIBERNATE_AFTER}.
+reply(Reply, NewState = #q { hibernated_at = undefined }) ->
+ {reply, Reply, NewState, NewState #q.hibernate_after};
+reply(Reply, NewState) ->
+ NewState1 = adjust_hibernate_after(NewState),
+ {reply, Reply, NewState1, NewState1 #q.hibernate_after}.
+
+noreply(NewState = #q { hibernated_at = undefined }) ->
+ {noreply, NewState, NewState #q.hibernate_after};
+noreply(NewState) ->
+ NewState1 = adjust_hibernate_after(NewState),
+ {noreply, NewState1, NewState1 #q.hibernate_after}.
+
+adjust_hibernate_after(State = #q { hibernated_at = undefined }) ->
+ State;
+adjust_hibernate_after(State = #q { hibernated_at = Then,
+ hibernate_after = Timeout }) ->
+ State1 = State #q { hibernated_at = undefined },
+ NapLengthMicros = timer:now_diff(now(), Then),
+ TimeoutMicros = Timeout * 1000,
+ LowTargetMicros = TimeoutMicros * 4,
+ HighTargetMicros = LowTargetMicros * 4,
+ if
+ NapLengthMicros < LowTargetMicros ->
+ %% nap was too short, don't go to sleep as soon
+ State1 #q { hibernate_after = Timeout * 2 };
+
+ NapLengthMicros > HighTargetMicros ->
+ %% nap was long, try going to sleep sooner
+ Timeout1 = lists:max([?HIBERNATE_AFTER_MIN, round(Timeout / 2)]),
+ State1 #q { hibernate_after = Timeout1 };
-noreply(NewState) -> {noreply, NewState, ?HIBERNATE_AFTER}.
+ true ->
+ %% nap and timeout seem to be in the right relationship. stay here
+ State1
+ end.
lookup_ch(ChPid) ->
case get({ch, ChPid}) of
@@ -816,7 +854,8 @@ handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) ->
handle_info(timeout, State) ->
%% TODO: Once we drop support for R11B-5, we can change this to
%% {noreply, State, hibernate};
- proc_lib:hibernate(gen_server2, enter_loop, [?MODULE, [], State]);
+ State1 = State #q { hibernated_at = now() },
+ proc_lib:hibernate(gen_server2, enter_loop, [?MODULE, [], State1]);
handle_info(Info, State) ->
?LOGDEBUG("Info in queue: ~p~n", [Info]),