diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 51 |
1 files changed, 45 insertions, 6 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index cf0ef44f5c..64498c3728 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -36,7 +36,7 @@ -behaviour(gen_server2). -define(UNSENT_MESSAGE_LIMIT, 100). --define(HIBERNATE_AFTER, 1000). +-define(HIBERNATE_AFTER_MIN, 1000). -export([start_link/1]). @@ -54,7 +54,10 @@ next_msg_id, message_buffer, active_consumers, - blocked_consumers}). + blocked_consumers, + hibernate_after, + hibernated_at + }). -record(consumer, {tag, ack_required}). @@ -101,7 +104,10 @@ init(Q) -> next_msg_id = 1, message_buffer = queue:new(), active_consumers = queue:new(), - blocked_consumers = queue:new()}, ?HIBERNATE_AFTER}. + blocked_consumers = queue:new(), + hibernate_after = ?HIBERNATE_AFTER_MIN, + hibernated_at = undefined + }, ?HIBERNATE_AFTER_MIN}. terminate(_Reason, State) -> %% FIXME: How do we cancel active subscriptions? @@ -116,9 +122,41 @@ code_change(_OldVsn, State, _Extra) -> %%---------------------------------------------------------------------------- -reply(Reply, NewState) -> {reply, Reply, NewState, ?HIBERNATE_AFTER}. +reply(Reply, NewState = #q { hibernated_at = undefined }) -> + {reply, Reply, NewState, NewState #q.hibernate_after}; +reply(Reply, NewState) -> + NewState1 = adjust_hibernate_after(NewState), + {reply, Reply, NewState1, NewState1 #q.hibernate_after}. + +noreply(NewState = #q { hibernated_at = undefined }) -> + {noreply, NewState, NewState #q.hibernate_after}; +noreply(NewState) -> + NewState1 = adjust_hibernate_after(NewState), + {noreply, NewState1, NewState1 #q.hibernate_after}. + +adjust_hibernate_after(State = #q { hibernated_at = undefined }) -> + State; +adjust_hibernate_after(State = #q { hibernated_at = Then, + hibernate_after = Timeout }) -> + State1 = State #q { hibernated_at = undefined }, + NapLengthMicros = timer:now_diff(now(), Then), + TimeoutMicros = Timeout * 1000, + LowTargetMicros = TimeoutMicros * 4, + HighTargetMicros = LowTargetMicros * 4, + if + NapLengthMicros < LowTargetMicros -> + %% nap was too short, don't go to sleep as soon + State1 #q { hibernate_after = Timeout * 2 }; + + NapLengthMicros > HighTargetMicros -> + %% nap was long, try going to sleep sooner + Timeout1 = lists:max([?HIBERNATE_AFTER_MIN, round(Timeout / 2)]), + State1 #q { hibernate_after = Timeout1 }; -noreply(NewState) -> {noreply, NewState, ?HIBERNATE_AFTER}. + true -> + %% nap and timeout seem to be in the right relationship. stay here + State1 + end. lookup_ch(ChPid) -> case get({ch, ChPid}) of @@ -816,7 +854,8 @@ handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) -> handle_info(timeout, State) -> %% TODO: Once we drop support for R11B-5, we can change this to %% {noreply, State, hibernate}; - proc_lib:hibernate(gen_server2, enter_loop, [?MODULE, [], State]); + State1 = State #q { hibernated_at = now() }, + proc_lib:hibernate(gen_server2, enter_loop, [?MODULE, [], State1]); handle_info(Info, State) -> ?LOGDEBUG("Info in queue: ~p~n", [Info]), |
