summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-08-03 14:04:55 +0100
committerMatthew Sackman <matthew@lshift.net>2009-08-03 14:04:55 +0100
commitcf714a38b3f36fb9be07ba1d909c5b8db6ab4c50 (patch)
tree8eadfe4bdaca6ae49ab5e3b8e004aebac69864ef /src
parent2a571d8b582fade0bdc34084cc08cd11c920b881 (diff)
downloadrabbitmq-server-git-cf714a38b3f36fb9be07ba1d909c5b8db6ab4c50.tar.gz
All done.
Introduced drain explicitly because to do otherwise would have made life even harder. Everything addressed as per bug and IM. Test once for functions being exported and cache
Diffstat (limited to 'src')
-rw-r--r--src/gen_server2.erl145
-rw-r--r--src/rabbit_amqqueue_process.erl7
2 files changed, 82 insertions, 70 deletions
diff --git a/src/gen_server2.erl b/src/gen_server2.erl
index be2c5730d6..63b1d908e4 100644
--- a/src/gen_server2.erl
+++ b/src/gen_server2.erl
@@ -345,7 +345,8 @@ enter_loop(Mod, Options, State) ->
enter_loop(Mod, Options, State, self(), infinity, undefined).
enter_loop(Mod, Options, State, Backoff = {backoff, _, _ , _}) ->
- enter_loop(Mod, Options, State, self(), infinity, Backoff);
+ Backoff1 = extend_backoff(Mod, Backoff),
+ enter_loop(Mod, Options, State, self(), infinity, Backoff1);
enter_loop(Mod, Options, State, ServerName = {_, _}) ->
enter_loop(Mod, Options, State, ServerName, infinity, undefined);
@@ -354,7 +355,8 @@ enter_loop(Mod, Options, State, Timeout) ->
enter_loop(Mod, Options, State, self(), Timeout, undefined).
enter_loop(Mod, Options, State, ServerName, Backoff = {backoff, _, _, _}) ->
- enter_loop(Mod, Options, State, ServerName, infinity, Backoff);
+ Backoff1 = extend_backoff(Mod, Backoff),
+ enter_loop(Mod, Options, State, ServerName, infinity, Backoff1);
enter_loop(Mod, Options, State, ServerName, Timeout) ->
enter_loop(Mod, Options, State, ServerName, Timeout, undefined).
@@ -392,7 +394,8 @@ init_it(Starter, Parent, Name0, Mod, Args, Options) ->
loop(Parent, Name, State, Mod, Timeout, undefined, Queue, Debug);
{ok, State, Timeout, Backoff = {backoff, _, _, _}} ->
proc_lib:init_ack(Starter, {ok, self()}),
- loop(Parent, Name, State, Mod, Timeout, Backoff, Queue, Debug);
+ Backoff1 = extend_backoff(Mod, Backoff),
+ loop(Parent, Name, State, Mod, Timeout, Backoff1, Queue, Debug);
{stop, Reason} ->
%% For consistency, we must make sure that the
%% registered name (if any) is unregistered before
@@ -430,6 +433,11 @@ unregister_name({global,Name}) ->
unregister_name(Pid) when is_pid(Pid) ->
Pid.
+extend_backoff(Mod, {backoff, CurrentTO, MinimumTimeout, DesiredHibPeriod}) ->
+ Pre = erlang:function_exported(Mod, handle_pre_hibernate, 1),
+ Post = erlang:function_exported(Mod, handle_post_hibernate, 1),
+ {backoff, CurrentTO, MinimumTimeout, DesiredHibPeriod, Pre, Post}.
+
%%%========================================================================
%%% Internal functions
%%%========================================================================
@@ -440,24 +448,25 @@ loop(Parent, Name, State, Mod, hibernate, undefined, Queue, Debug) ->
proc_lib:hibernate(?MODULE,wake_hib,
[Parent, Name, State, Mod, undefined, Queue, Debug]);
loop(Parent, Name, State, Mod, Time, TimeoutState, Queue, Debug) ->
+ process_next_msg(Parent, Name, State, Mod, Time, TimeoutState,
+ drain(Queue), Debug).
+
+drain(Queue) ->
receive
- Input -> loop(Parent, Name, State, Mod,
- Time, TimeoutState, in(Input, Queue), Debug)
- after 0 ->
- process_next_msg(Parent, Name, State, Mod, Time, TimeoutState,
- Queue, Debug, false)
+ Input -> drain(in(Input,Queue))
+ after 0 -> Queue
end.
-process_next_msg(Parent, Name, State, Mod, Time, TimeoutState, Queue,
- Debug, Hib) ->
+process_next_msg(Parent, Name, State, Mod, Time, TimeoutState, Queue, Debug) ->
case priority_queue:out(Queue) of
{{value, Msg}, Queue1} ->
process_msg(Parent, Name, State, Mod,
- Time, TimeoutState, Queue1, Debug, Hib, Msg);
+ Time, TimeoutState, Queue1, Debug, Msg);
{empty, Queue1} ->
{Time1, HibOnTimeout}
= case {Time, TimeoutState} of
- {hibernate, {backoff, Current, _Min, _Desired}} ->
+ {hibernate,
+ {backoff, Current, _Min, _Desired, _Pre, _Post}} ->
{Current, true};
_ -> {Time, false}
end,
@@ -474,72 +483,82 @@ process_next_msg(Parent, Name, State, Mod, Time, TimeoutState, Queue,
false ->
process_msg(
Parent, Name, State, Mod, Time, TimeoutState,
- Queue1, Debug, Hib, timeout)
+ Queue1, Debug, timeout)
end
end
end.
wake_hib(Parent, Name, State, Mod, TimeoutState, Queue, Debug) ->
- Msg = receive
- Input ->
- Input
- end,
process_next_msg(Parent, Name, State, Mod, hibernate, TimeoutState,
- in(Msg, Queue), Debug, true).
+ drain(Queue), Debug).
wake_hib(Parent, Name, State, Mod, SleptAt, TimeoutState, Queue, Debug) ->
- AwokeAt = now(),
- Msg = receive
- Input ->
- Input
- end,
- backoff_post_hibernate(Parent, Name, State, Mod, SleptAt, AwokeAt,
- TimeoutState, in(Msg, Queue), Debug).
-
-backoff_pre_hibernate(Parent, Name, State, Mod, TimeoutState, Queue, Debug) ->
- case catch Mod:handle_pre_hibernate(State) of
- {hibernate, NState} ->
- proc_lib:hibernate(?MODULE, wake_hib, [Parent, Name, NState, Mod,
- now(), TimeoutState,
- Queue, Debug]);
- {stop, Reason, NState} ->
- terminate(Reason, Name, pre_hibernate, Mod, NState, []);
- {'EXIT', What} ->
- terminate(What, Name, pre_hibernate, Mod, State, []);
- Reply ->
- terminate({bad_return_value, Reply}, Name, pre_hibernate, Mod,
- State, [])
+ backoff_post_hibernate(Parent, Name, State, Mod, SleptAt, now(),
+ TimeoutState, drain(Queue), Debug).
+
+backoff_pre_hibernate(Parent, Name, State, Mod, TimeoutState =
+ {backoff, _Current, _Minimum, _Desired, Pre, _Post},
+ Queue, Debug) ->
+ case Pre of
+ true ->
+ case catch Mod:handle_pre_hibernate(State) of
+ {hibernate, NState} ->
+ proc_lib:hibernate(?MODULE, wake_hib, [Parent, Name, NState,
+ Mod, now(),
+ TimeoutState, Queue,
+ Debug]);
+ {stop, Reason, NState} ->
+ terminate(Reason, Name, pre_hibernate, Mod, NState, []);
+ {'EXIT', What} ->
+ terminate(What, Name, pre_hibernate, Mod, State, []);
+ Reply ->
+ terminate({bad_return_value, Reply}, Name, pre_hibernate, Mod,
+ State, [])
+ end;
+ false ->
+ proc_lib:hibernate(?MODULE, wake_hib, [Parent, Name, State, Mod,
+ now(), TimeoutState, Queue,
+ Debug])
end.
backoff_post_hibernate(Parent, Name, State, Mod, SleptAt, AwokeAt,
- {backoff, CurrentTO, MinimumTO, DesiredHibPeriod},
+ {backoff, CurrentTO, MinimumTO, DesiredHibPeriod,
+ Pre, Post},
Queue, Debug) ->
NapLengthMicros = timer:now_diff(AwokeAt, SleptAt),
CurrentMicros = CurrentTO * 1000,
MinimumMicros = MinimumTO * 1000,
DesiredHibMicros = DesiredHibPeriod * 1000,
- CurrentTO1 = case (NapLengthMicros + CurrentMicros) >
- (MinimumMicros + DesiredHibMicros) of
- true ->
- lists:max([MinimumTO, round(CurrentTO/2)]);
- false ->
- CurrentTO + MinimumTO
- end,
- TimeoutState = {backoff, CurrentTO1, MinimumTO, DesiredHibPeriod},
- case catch Mod:handle_post_hibernate(State) of
- {noreply, NState} ->
- process_next_msg(Parent, Name, NState, Mod, infinity, TimeoutState,
- Queue, Debug, true);
- {noreply, NState, Time} ->
- process_next_msg(Parent, Name, NState, Mod, Time, TimeoutState,
- Queue, Debug, true);
- {stop, Reason, NState} ->
- terminate(Reason, Name, post_hibernate, Mod, NState, []);
- {'EXIT', What} ->
- terminate(What, Name, post_hibernate, Mod, State, []);
- Reply ->
- terminate({bad_return_value, Reply}, Name, post_hibernate, Mod,
- State, [])
+ GapBetweenMessagesMicros = NapLengthMicros + CurrentMicros,
+ Base =
+ %% If enough time has passed between the last two messages then we
+ %% should consider sleeping sooner. Otherwise stay awake longer.
+ case GapBetweenMessagesMicros > (MinimumMicros + DesiredHibMicros) of
+ true -> lists:max([MinimumTO, CurrentTO div 2]);
+ false -> CurrentTO
+ end,
+ CurrentTO1 = Base + random:uniform(Base),
+ TimeoutState =
+ {backoff, CurrentTO1, MinimumTO, DesiredHibPeriod, Pre, Post},
+ case Post of
+ true ->
+ case catch Mod:handle_post_hibernate(State) of
+ {noreply, NState} ->
+ loop(Parent, Name, NState, Mod, infinity, TimeoutState,
+ Queue, Debug);
+ {noreply, NState, Time} ->
+ loop(Parent, Name, NState, Mod, Time, TimeoutState, Queue,
+ Debug);
+ {stop, Reason, NState} ->
+ terminate(Reason, Name, post_hibernate, Mod, NState, []);
+ {'EXIT', What} ->
+ terminate(What, Name, post_hibernate, Mod, State, []);
+ Reply ->
+ terminate({bad_return_value, Reply}, Name, post_hibernate,
+ Mod, State, [])
+ end;
+ false -> loop(Parent, Name, State, Mod, infinity, TimeoutState, Queue,
+ Debug)
end.
in({'$gen_pcast', {Priority, Msg}}, Queue) ->
@@ -550,7 +569,7 @@ in(Input, Queue) ->
priority_queue:in(Input, Queue).
process_msg(Parent, Name, State, Mod, Time, TimeoutState, Queue,
- Debug, _Hib, Msg) ->
+ Debug, Msg) ->
case Msg of
{system, From, Req} ->
sys:handle_system_msg
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 6a30503e0d..fe2e8509f7 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -42,7 +42,6 @@
-export([start_link/1]).
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2]).
--export([handle_pre_hibernate/1, handle_post_hibernate/1]).
-import(queue).
-import(erlang).
@@ -819,9 +818,3 @@ handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) ->
handle_info(Info, State) ->
?LOGDEBUG("Info in queue: ~p~n", [Info]),
{stop, {unhandled_info, Info}, State}.
-
-handle_pre_hibernate(State) ->
- {hibernate, State}.
-
-handle_post_hibernate(State) ->
- {noreply, State, hibernate}.