summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-08-03 17:42:02 +0100
committerMatthew Sackman <matthew@lshift.net>2009-08-03 17:42:02 +0100
commit519e73a0ad74c25ca8169322a1cfe5f42b2f7ab9 (patch)
tree5db18b7cc7769453fcd30a51bc6cce5cbf7ef042
parentfd792ab2ad5126425dbc23774218db547542fecb (diff)
parent091b70039aee2701a42b7e19ce1e48889638bb79 (diff)
downloadrabbitmq-server-git-519e73a0ad74c25ca8169322a1cfe5f42b2f7ab9.tar.gz
merge in from 21087
-rw-r--r--src/gen_server2.erl156
-rw-r--r--src/rabbit_amqqueue_process.erl8
-rw-r--r--src/rabbit_disk_queue.erl5
-rw-r--r--src/rabbit_queue_prefetcher.erl12
4 files changed, 102 insertions, 79 deletions
diff --git a/src/gen_server2.erl b/src/gen_server2.erl
index be2c5730d6..529ed0295e 100644
--- a/src/gen_server2.erl
+++ b/src/gen_server2.erl
@@ -345,7 +345,8 @@ enter_loop(Mod, Options, State) ->
enter_loop(Mod, Options, State, self(), infinity, undefined).
enter_loop(Mod, Options, State, Backoff = {backoff, _, _ , _}) ->
- enter_loop(Mod, Options, State, self(), infinity, Backoff);
+ Backoff1 = extend_backoff(Mod, Backoff),
+ enter_loop(Mod, Options, State, self(), infinity, Backoff1);
enter_loop(Mod, Options, State, ServerName = {_, _}) ->
enter_loop(Mod, Options, State, ServerName, infinity, undefined);
@@ -354,7 +355,8 @@ enter_loop(Mod, Options, State, Timeout) ->
enter_loop(Mod, Options, State, self(), Timeout, undefined).
enter_loop(Mod, Options, State, ServerName, Backoff = {backoff, _, _, _}) ->
- enter_loop(Mod, Options, State, ServerName, infinity, Backoff);
+ Backoff1 = extend_backoff(Mod, Backoff),
+ enter_loop(Mod, Options, State, ServerName, infinity, Backoff1);
enter_loop(Mod, Options, State, ServerName, Timeout) ->
enter_loop(Mod, Options, State, ServerName, Timeout, undefined).
@@ -392,7 +394,8 @@ init_it(Starter, Parent, Name0, Mod, Args, Options) ->
loop(Parent, Name, State, Mod, Timeout, undefined, Queue, Debug);
{ok, State, Timeout, Backoff = {backoff, _, _, _}} ->
proc_lib:init_ack(Starter, {ok, self()}),
- loop(Parent, Name, State, Mod, Timeout, Backoff, Queue, Debug);
+ Backoff1 = extend_backoff(Mod, Backoff),
+ loop(Parent, Name, State, Mod, Timeout, Backoff1, Queue, Debug);
{stop, Reason} ->
%% For consistency, we must make sure that the
%% registered name (if any) is unregistered before
@@ -430,6 +433,12 @@ unregister_name({global,Name}) ->
unregister_name(Pid) when is_pid(Pid) ->
Pid.
+extend_backoff(Mod, {backoff, CurrentTO, MinimumTimeout, DesiredHibPeriod}) ->
+ Pre = erlang:function_exported(Mod, handle_pre_hibernate, 1),
+ Post = erlang:function_exported(Mod, handle_post_hibernate, 1),
+ random:seed(now()), %% call before we get into the loop
+ {backoff, CurrentTO, MinimumTimeout, DesiredHibPeriod, Pre, Post}.
+
%%%========================================================================
%%% Internal functions
%%%========================================================================
@@ -440,25 +449,36 @@ loop(Parent, Name, State, Mod, hibernate, undefined, Queue, Debug) ->
proc_lib:hibernate(?MODULE,wake_hib,
[Parent, Name, State, Mod, undefined, Queue, Debug]);
loop(Parent, Name, State, Mod, Time, TimeoutState, Queue, Debug) ->
+ process_next_msg(Parent, Name, State, Mod, Time, TimeoutState,
+ drain(Queue), Debug).
+
+drain(Queue) ->
receive
- Input -> loop(Parent, Name, State, Mod,
- Time, TimeoutState, in(Input, Queue), Debug)
- after 0 ->
- process_next_msg(Parent, Name, State, Mod, Time, TimeoutState,
- Queue, Debug, false)
+ Input -> drain(in(Input, Queue))
+ after 0 -> Queue
end.
-process_next_msg(Parent, Name, State, Mod, Time, TimeoutState, Queue,
- Debug, Hib) ->
+process_next_msg(Parent, Name, State, Mod, Time, TimeoutState, Queue, Debug) ->
case priority_queue:out(Queue) of
{{value, Msg}, Queue1} ->
process_msg(Parent, Name, State, Mod,
- Time, TimeoutState, Queue1, Debug, Hib, Msg);
+ Time, TimeoutState, Queue1, Debug, Msg);
{empty, Queue1} ->
{Time1, HibOnTimeout}
= case {Time, TimeoutState} of
- {hibernate, {backoff, Current, _Min, _Desired}} ->
+ {hibernate,
+ {backoff, Current, _Min, _Desired, _Pre, _Post}} ->
{Current, true};
+ {hibernate, _} ->
+ %% wake_hib/7 will set Time to hibernate. If
+ %% we were woken and didn't receive a msg
+ %% then we will get here and need a sensible
+ %% value for Time1, otherwise we crash.
+ %% On the grounds that it's better to get
+ %% control back to the user module sooner
+ %% rather than later, 0 is more sensible
+ %% than infinity here.
+ {0, false};
_ -> {Time, false}
end,
receive
@@ -474,72 +494,82 @@ process_next_msg(Parent, Name, State, Mod, Time, TimeoutState, Queue,
false ->
process_msg(
Parent, Name, State, Mod, Time, TimeoutState,
- Queue1, Debug, Hib, timeout)
+ Queue1, Debug, timeout)
end
end
end.
wake_hib(Parent, Name, State, Mod, TimeoutState, Queue, Debug) ->
- Msg = receive
- Input ->
- Input
- end,
process_next_msg(Parent, Name, State, Mod, hibernate, TimeoutState,
- in(Msg, Queue), Debug, true).
+ drain(Queue), Debug).
wake_hib(Parent, Name, State, Mod, SleptAt, TimeoutState, Queue, Debug) ->
- AwokeAt = now(),
- Msg = receive
- Input ->
- Input
- end,
- backoff_post_hibernate(Parent, Name, State, Mod, SleptAt, AwokeAt,
- TimeoutState, in(Msg, Queue), Debug).
-
-backoff_pre_hibernate(Parent, Name, State, Mod, TimeoutState, Queue, Debug) ->
- case catch Mod:handle_pre_hibernate(State) of
- {hibernate, NState} ->
- proc_lib:hibernate(?MODULE, wake_hib, [Parent, Name, NState, Mod,
- now(), TimeoutState,
- Queue, Debug]);
- {stop, Reason, NState} ->
- terminate(Reason, Name, pre_hibernate, Mod, NState, []);
- {'EXIT', What} ->
- terminate(What, Name, pre_hibernate, Mod, State, []);
- Reply ->
- terminate({bad_return_value, Reply}, Name, pre_hibernate, Mod,
- State, [])
+ backoff_post_hibernate(Parent, Name, State, Mod, SleptAt, now(),
+ TimeoutState, drain(Queue), Debug).
+
+backoff_pre_hibernate(Parent, Name, State, Mod, TimeoutState =
+ {backoff, _Current, _Minimum, _Desired, Pre, _Post},
+ Queue, Debug) ->
+ case Pre of
+ true ->
+ case catch Mod:handle_pre_hibernate(State) of
+ {hibernate, NState} ->
+ proc_lib:hibernate(?MODULE, wake_hib, [Parent, Name, NState,
+ Mod, now(),
+ TimeoutState, Queue,
+ Debug]);
+ {stop, Reason, NState} ->
+ terminate(Reason, Name, pre_hibernate, Mod, NState, []);
+ {'EXIT', What} ->
+ terminate(What, Name, pre_hibernate, Mod, State, []);
+ Reply ->
+ terminate({bad_return_value, Reply}, Name, pre_hibernate, Mod,
+ State, [])
+ end;
+ false ->
+ proc_lib:hibernate(?MODULE, wake_hib, [Parent, Name, State, Mod,
+ now(), TimeoutState, Queue,
+ Debug])
end.
backoff_post_hibernate(Parent, Name, State, Mod, SleptAt, AwokeAt,
- {backoff, CurrentTO, MinimumTO, DesiredHibPeriod},
+ {backoff, CurrentTO, MinimumTO, DesiredHibPeriod,
+ Pre, Post},
Queue, Debug) ->
NapLengthMicros = timer:now_diff(AwokeAt, SleptAt),
CurrentMicros = CurrentTO * 1000,
MinimumMicros = MinimumTO * 1000,
DesiredHibMicros = DesiredHibPeriod * 1000,
- CurrentTO1 = case (NapLengthMicros + CurrentMicros) >
- (MinimumMicros + DesiredHibMicros) of
- true ->
- lists:max([MinimumTO, round(CurrentTO/2)]);
- false ->
- CurrentTO + MinimumTO
- end,
- TimeoutState = {backoff, CurrentTO1, MinimumTO, DesiredHibPeriod},
- case catch Mod:handle_post_hibernate(State) of
- {noreply, NState} ->
- process_next_msg(Parent, Name, NState, Mod, infinity, TimeoutState,
- Queue, Debug, true);
- {noreply, NState, Time} ->
- process_next_msg(Parent, Name, NState, Mod, Time, TimeoutState,
- Queue, Debug, true);
- {stop, Reason, NState} ->
- terminate(Reason, Name, post_hibernate, Mod, NState, []);
- {'EXIT', What} ->
- terminate(What, Name, post_hibernate, Mod, State, []);
- Reply ->
- terminate({bad_return_value, Reply}, Name, post_hibernate, Mod,
- State, [])
+ GapBetweenMessagesMicros = NapLengthMicros + CurrentMicros,
+ Base =
+ %% If enough time has passed between the last two messages then we
+ %% should consider sleeping sooner. Otherwise stay awake longer.
+ case GapBetweenMessagesMicros > (MinimumMicros + DesiredHibMicros) of
+ true -> lists:max([MinimumTO, CurrentTO div 2]);
+ false -> CurrentTO
+ end,
+ CurrentTO1 = Base + random:uniform(Base),
+ TimeoutState =
+ {backoff, CurrentTO1, MinimumTO, DesiredHibPeriod, Pre, Post},
+ case Post of
+ true ->
+ case catch Mod:handle_post_hibernate(State) of
+ {noreply, NState} ->
+ loop(Parent, Name, NState, Mod, infinity, TimeoutState,
+ Queue, Debug);
+ {noreply, NState, Time} ->
+ loop(Parent, Name, NState, Mod, Time, TimeoutState, Queue,
+ Debug);
+ {stop, Reason, NState} ->
+ terminate(Reason, Name, post_hibernate, Mod, NState, []);
+ {'EXIT', What} ->
+ terminate(What, Name, post_hibernate, Mod, State, []);
+ Reply ->
+ terminate({bad_return_value, Reply}, Name, post_hibernate,
+ Mod, State, [])
+ end;
+ false -> loop(Parent, Name, State, Mod, infinity, TimeoutState, Queue,
+ Debug)
end.
in({'$gen_pcast', {Priority, Msg}}, Queue) ->
@@ -550,7 +580,7 @@ in(Input, Queue) ->
priority_queue:in(Input, Queue).
process_msg(Parent, Name, State, Mod, Time, TimeoutState, Queue,
- Debug, _Hib, Msg) ->
+ Debug, Msg) ->
case Msg of
{system, From, Req} ->
sys:handle_system_msg
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index a0e6b1c69d..dba7ec2406 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -43,8 +43,7 @@
-export([start_link/1]).
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
- handle_info/2]).
--export([handle_pre_hibernate/1, handle_post_hibernate/1]).
+ handle_info/2, handle_pre_hibernate/1]).
-import(queue).
-import(erlang).
@@ -842,7 +841,7 @@ handle_cast({set_mode, Mode}, State = #q { mixed_state = MS }) ->
handle_cast(report_memory, State) ->
%% deliberately don't call noreply/2 as we don't want to restart the timer
%% by unsetting the timer, we force a report on the next normal message
- {noreply, State #q { memory_report_timer = undefined }, binary}.
+ {noreply, State #q { memory_report_timer = undefined }, hibernate}.
handle_info({'DOWN', MonitorRef, process, DownPid, _Reason},
State = #q{owner = {DownPid, MonitorRef}}) ->
@@ -870,6 +869,3 @@ handle_pre_hibernate(State = #q { mixed_state = MS }) ->
stop_memory_timer(report_memory(true, State #q { mixed_state = MS1 })),
%% don't call noreply/1 as that'll restart the memory_report_timer
{hibernate, State1}.
-
-handle_post_hibernate(State) ->
- {noreply, State, hibernate}.
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index 154e8a9054..fe8c433c7b 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -37,7 +37,7 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
--export([handle_pre_hibernate/1, handle_post_hibernate/1]).
+-export([handle_pre_hibernate/1]).
-export([publish/3, deliver/1, phantom_deliver/1, ack/2,
tx_publish/1, tx_commit/3, tx_cancel/1,
@@ -569,9 +569,6 @@ handle_pre_hibernate(State) ->
ok = report_memory(true, State),
{hibernate, stop_memory_timer(State)}.
-handle_post_hibernate(State) ->
- noreply(State).
-
terminate(_Reason, State) ->
shutdown(State).
diff --git a/src/rabbit_queue_prefetcher.erl b/src/rabbit_queue_prefetcher.erl
index dfd444b259..0265ba2bd2 100644
--- a/src/rabbit_queue_prefetcher.erl
+++ b/src/rabbit_queue_prefetcher.erl
@@ -43,6 +43,7 @@
-include("rabbit.hrl").
-define(HIBERNATE_AFTER_MIN, 1000).
+-define(DESIRED_HIBERNATE, 10000).
-record(pstate,
{ msg_buf,
@@ -209,7 +210,8 @@ init([Q, Count, QPid]) ->
queue_mref = MRef
},
ok = rabbit_disk_queue:prefetch(Q),
- {ok, State, {binary, ?HIBERNATE_AFTER_MIN}}.
+ {ok, State, hibernate, {backoff, ?HIBERNATE_AFTER_MIN,
+ ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
handle_call(drain, _From, State = #pstate { buf_length = 0 }) ->
{stop, normal, empty, State};
@@ -221,7 +223,7 @@ handle_call(drain, _From, State = #pstate { fetched_count = Count,
handle_call(drain, _From, State = #pstate { msg_buf = MsgBuf,
buf_length = Length }) ->
{reply, {MsgBuf, Length, continuing},
- State #pstate { msg_buf = queue:new(), buf_length = 0 }};
+ State #pstate { msg_buf = queue:new(), buf_length = 0 }, hibernate};
handle_call(drain_and_stop, _From, State = #pstate { buf_length = 0 }) ->
{stop, normal, empty, State};
handle_call(drain_and_stop, _From, State = #pstate { msg_buf = MsgBuf,
@@ -231,7 +233,7 @@ handle_call(drain_and_stop, _From, State = #pstate { msg_buf = MsgBuf,
handle_cast(publish_empty, State) ->
%% Very odd. This could happen if the queue is deleted or purged
%% and the mixed queue fails to shut us down.
- {noreply, State};
+ {noreply, State, hibernate};
handle_cast({publish, { Msg = #basic_message {},
_Size, IsDelivered, AckTag, _Remaining }},
State = #pstate { fetched_count = Fetched, target_count = Target,
@@ -245,10 +247,8 @@ handle_cast({publish, { Msg = #basic_message {},
MsgBuf1 = queue:in({Msg, IsDelivered, AckTag}, MsgBuf),
{noreply, State #pstate { fetched_count = Fetched + 1,
buf_length = Length + 1,
- msg_buf = MsgBuf1 }}.
+ msg_buf = MsgBuf1 }, hibernate}.
-handle_info(timeout, State) ->
- {noreply, State, hibernate};
handle_info({'DOWN', MRef, process, _Pid, _Reason},
State = #pstate { queue_mref = MRef }) ->
%% this is the amqqueue_process going down, so we should go down