summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-07-29 14:55:16 +0100
committerMatthew Sackman <matthew@lshift.net>2009-07-29 14:55:16 +0100
commita297f995c54f48f2be51265ffb257377a1b8a81e (patch)
tree03b1283c27a6f9245881173449296cac17e0d131
parent734b8e60fdf05fc541c61fed025cd35a3c07311f (diff)
parent2a571d8b582fade0bdc34084cc08cd11c920b881 (diff)
downloadrabbitmq-server-git-a297f995c54f48f2be51265ffb257377a1b8a81e.tar.gz
merging in from 21087. All tests pass.
-rw-r--r--src/gen_server2.erl207
-rw-r--r--src/rabbit_amqqueue_process.erl26
-rw-r--r--src/rabbit_disk_queue.erl26
3 files changed, 163 insertions, 96 deletions
diff --git a/src/gen_server2.erl b/src/gen_server2.erl
index 6d8d2ff6e1..be2c5730d6 100644
--- a/src/gen_server2.erl
+++ b/src/gen_server2.erl
@@ -21,22 +21,34 @@
%% higher priorities are processed before requests with lower
%% priorities. The default priority is 0.
%%
-%% 5) On return from init/1, the timeout value {binary, Min} creates a
-%% binary exponential timeout, where Min is the minimum number of
-%% milliseconds permitted, and is also used as the current timeout
-%% value. Returning from handle_* with the timeout value set to
-%% 'binary' will use the current binary timeout value. handle_info/2
-%% with the Info of 'timeout' will function normally, and supports the
-%% return value of {noreply, State, hibernate} which will hibernate
-%% the process. The current timeout value is:
+%% 5) init can return a 4th arg, {backoff, InitialTimeout,
+%% MinimumTimeout, DesiredHibernatePeriod} (all in
+%% milliseconds). Then, on all callbacks which can return a timeout
+%% (including init), timeout can be 'hibernate'. When this is the
+%% case, the current timeout value will be used (initially, the
+%% InitialTimeout supplied from init). After this timeout has
+%% occurred, handle_pre_hibernate/1 will be called. If that returns
+%% {hibernate, State} then the process will be hibernated. Upon
+%% awaking, a new current timeout value will be calculated, and then
+%% handle_post_hibernate/1 will be called. The purpose is that the
+%% gen_server2 takes care of adjusting the current timeout value such
+%% that the process will increase the timeout value repeatedly if it
+%% is unable to sleep for the DesiredHibernatePeriod. If it is able to
+%% sleep for the DesiredHibernatePeriod it will decrease the current
+%% timeout down to the MinimumTimeout, so that the process is put to
+%% sleep sooner (and hopefully for longer). In short, should a process
+%% using this receive a burst of messages, it should not hibernate
+%% between those messages, but as the messages become less frequent,
+%% the process will not only hibernate, it will do so sooner after
+%% each message.
%%
-%% a) doubled if the time spent in hibernation is < 4 * the current value;
-%% b) halved if the time spent in hibernation is > 16 * the current value;
-%% c) maintained in all other cases
-%%
-%% Explicit timeouts (i.e. not 'binary') from the handle_* functions
-%% are still supported, and do not have any effect on the current
-%% timeout value.
+%% Normal timeout values (i.e. not 'hibernate') can still be used, and
+%% if they are used then the handle_info(timeout, State) will be
+%% called as normal. In this case, returning 'hibernate' from
+%% handle_info(timeout, State) will not hibernate the process
+%% immediately, as it would if backoff wasn't being used. Instead
+%% it'll wait for the current timeout as described above, before
+%% calling handle_pre_hibernate(State).
%% All modifications are (C) 2009 LShift Ltd.
@@ -72,6 +84,7 @@
%%% init(Args)
%%% ==> {ok, State}
%%% {ok, State, Timeout}
+%%% {ok, State, Timeout, Backoff}
%%% ignore
%%% {stop, Reason}
%%%
@@ -103,6 +116,17 @@
%%%
%%% ==> ok
%%%
+%%% handle_pre_hibernate(State)
+%%%
+%%% ==> {hibernate, State}
+%%% {stop, Reason, State}
+%%% Reason = normal | shutdown | Term, terminate(State) is called
+%%%
+%%% handle_post_hibernate(State)
+%%%
+%%% ==> {noreply, State}
+%%% {stop, Reason, State}
+%%% Reason = normal | shutdown | Term, terminate(State) is called
%%%
%%% The work flow (of the server) can be described as follows:
%%%
@@ -133,7 +157,7 @@
cast/2, pcast/3, reply/2,
abcast/2, abcast/3,
multi_call/2, multi_call/3, multi_call/4,
- enter_loop/3, enter_loop/4, enter_loop/5, wake_hib/7]).
+ enter_loop/3, enter_loop/4, enter_loop/5, wake_hib/7, wake_hib/8]).
-export([behaviour_info/1]).
@@ -152,10 +176,6 @@
%%% API
%%%=========================================================================
--ifdef(use_specs).
--spec behaviour_info(atom()) -> 'undefined' | [{atom(), any()}].
--endif.
-
behaviour_info(callbacks) ->
[{init,1},{handle_call,3},{handle_cast,2},{handle_info,2},
{terminate,2},{code_change,3}];
@@ -311,7 +331,7 @@ multi_call(Nodes, Name, Req, Timeout)
%%-----------------------------------------------------------------
-%% enter_loop(Mod, Options, State, <ServerName>, <TimeOut>) ->_
+%% enter_loop(Mod, Options, State, <ServerName>, <TimeOut>, <Backoff>) ->_
%%
%% Description: Makes an existing process into a gen_server.
%% The calling process will enter the gen_server receive
@@ -322,21 +342,29 @@ multi_call(Nodes, Name, Req, Timeout)
%% process, including registering a name for it.
%%-----------------------------------------------------------------
enter_loop(Mod, Options, State) ->
- enter_loop(Mod, Options, State, self(), infinity).
+ enter_loop(Mod, Options, State, self(), infinity, undefined).
+
+enter_loop(Mod, Options, State, Backoff = {backoff, _, _ , _}) ->
+ enter_loop(Mod, Options, State, self(), infinity, Backoff);
enter_loop(Mod, Options, State, ServerName = {_, _}) ->
- enter_loop(Mod, Options, State, ServerName, infinity);
+ enter_loop(Mod, Options, State, ServerName, infinity, undefined);
enter_loop(Mod, Options, State, Timeout) ->
- enter_loop(Mod, Options, State, self(), Timeout).
+ enter_loop(Mod, Options, State, self(), Timeout, undefined).
+
+enter_loop(Mod, Options, State, ServerName, Backoff = {backoff, _, _, _}) ->
+ enter_loop(Mod, Options, State, ServerName, infinity, Backoff);
enter_loop(Mod, Options, State, ServerName, Timeout) ->
+ enter_loop(Mod, Options, State, ServerName, Timeout, undefined).
+
+enter_loop(Mod, Options, State, ServerName, Timeout, Backoff) ->
Name = get_proc_name(ServerName),
Parent = get_parent(),
Debug = debug_options(Name, Options),
Queue = priority_queue:new(),
- {Timeout1, TimeoutState} = build_timeout_state(Timeout),
- loop(Parent, Name, State, Mod, Timeout1, TimeoutState, Queue, Debug).
+ loop(Parent, Name, State, Mod, Timeout, Backoff, Queue, Debug).
%%%========================================================================
%%% Gen-callback functions
@@ -361,9 +389,10 @@ init_it(Starter, Parent, Name0, Mod, Args, Options) ->
loop(Parent, Name, State, Mod, infinity, undefined, Queue, Debug);
{ok, State, Timeout} ->
proc_lib:init_ack(Starter, {ok, self()}),
- {Timeout1, TimeoutState} = build_timeout_state(Timeout),
- loop(Parent, Name, State, Mod, Timeout1, TimeoutState, Queue,
- Debug);
+ loop(Parent, Name, State, Mod, Timeout, undefined, Queue, Debug);
+ {ok, State, Timeout, Backoff = {backoff, _, _, _}} ->
+ proc_lib:init_ack(Starter, {ok, self()}),
+ loop(Parent, Name, State, Mod, Timeout, Backoff, Queue, Debug);
{stop, Reason} ->
%% For consistency, we must make sure that the
%% registered name (if any) is unregistered before
@@ -401,12 +430,6 @@ unregister_name({global,Name}) ->
unregister_name(Pid) when is_pid(Pid) ->
Pid.
-build_timeout_state(Timeout) ->
- case Timeout of
- {binary, Min} -> {binary, {Min, Min, undefined}};
- _ -> {Timeout, undefined}
- end.
-
%%%========================================================================
%%% Internal functions
%%%========================================================================
@@ -416,10 +439,6 @@ build_timeout_state(Timeout) ->
loop(Parent, Name, State, Mod, hibernate, undefined, Queue, Debug) ->
proc_lib:hibernate(?MODULE,wake_hib,
[Parent, Name, State, Mod, undefined, Queue, Debug]);
-loop(Parent, Name, State, Mod, hibernate, {Current, Min, undefined}, Queue,
- Debug) ->
- proc_lib:hibernate(?MODULE,wake_hib,[Parent, Name, State, Mod,
- {Current, Min, now()}, Queue, Debug]);
loop(Parent, Name, State, Mod, Time, TimeoutState, Queue, Debug) ->
receive
Input -> loop(Parent, Name, State, Mod,
@@ -436,17 +455,27 @@ process_next_msg(Parent, Name, State, Mod, Time, TimeoutState, Queue,
process_msg(Parent, Name, State, Mod,
Time, TimeoutState, Queue1, Debug, Hib, Msg);
{empty, Queue1} ->
- Time1 = case {Time, TimeoutState} of
- {binary, {Current, _Min, undefined}} -> Current;
- _ -> Time
- end,
+ {Time1, HibOnTimeout}
+ = case {Time, TimeoutState} of
+ {hibernate, {backoff, Current, _Min, _Desired}} ->
+ {Current, true};
+ _ -> {Time, false}
+ end,
receive
Input ->
loop(Parent, Name, State, Mod,
Time, TimeoutState, in(Input, Queue1), Debug)
after Time1 ->
- process_msg(Parent, Name, State, Mod,
- Time, TimeoutState, Queue1, Debug, Hib, timeout)
+ case HibOnTimeout of
+ true ->
+ backoff_pre_hibernate(
+ Parent, Name, State, Mod, TimeoutState, Queue1,
+ Debug);
+ false ->
+ process_msg(
+ Parent, Name, State, Mod, Time, TimeoutState,
+ Queue1, Debug, Hib, timeout)
+ end
end
end.
@@ -455,29 +484,62 @@ wake_hib(Parent, Name, State, Mod, TimeoutState, Queue, Debug) ->
Input ->
Input
end,
- TimeoutState1 = adjust_hibernate_after(TimeoutState),
- process_next_msg(Parent, Name, State, Mod, hibernate, TimeoutState1,
+ process_next_msg(Parent, Name, State, Mod, hibernate, TimeoutState,
in(Msg, Queue), Debug, true).
-adjust_hibernate_after(undefined) ->
- undefined;
-adjust_hibernate_after({Current, Min, HibernatedAt}) ->
- NapLengthMicros = timer:now_diff(now(), HibernatedAt),
- CurrentMicros = Current * 1000,
- LowTargetMicros = CurrentMicros * 4,
- HighTargetMicros = LowTargetMicros * 4,
- if
- NapLengthMicros < LowTargetMicros ->
- %% nap was too short, don't go to sleep as soon
- {Current * 2, Min, undefined};
-
- NapLengthMicros > HighTargetMicros ->
- %% nap was long, try going to sleep sooner
- {lists:max([Min, round(Current / 2)]), Min, undefined};
-
- true ->
- %% nap and timeout seem to be in the right relationship. stay here
- {Current, Min, undefined}
+wake_hib(Parent, Name, State, Mod, SleptAt, TimeoutState, Queue, Debug) ->
+ AwokeAt = now(),
+ Msg = receive
+ Input ->
+ Input
+ end,
+ backoff_post_hibernate(Parent, Name, State, Mod, SleptAt, AwokeAt,
+ TimeoutState, in(Msg, Queue), Debug).
+
+backoff_pre_hibernate(Parent, Name, State, Mod, TimeoutState, Queue, Debug) ->
+ case catch Mod:handle_pre_hibernate(State) of
+ {hibernate, NState} ->
+ proc_lib:hibernate(?MODULE, wake_hib, [Parent, Name, NState, Mod,
+ now(), TimeoutState,
+ Queue, Debug]);
+ {stop, Reason, NState} ->
+ terminate(Reason, Name, pre_hibernate, Mod, NState, []);
+ {'EXIT', What} ->
+ terminate(What, Name, pre_hibernate, Mod, State, []);
+ Reply ->
+ terminate({bad_return_value, Reply}, Name, pre_hibernate, Mod,
+ State, [])
+ end.
+
+backoff_post_hibernate(Parent, Name, State, Mod, SleptAt, AwokeAt,
+ {backoff, CurrentTO, MinimumTO, DesiredHibPeriod},
+ Queue, Debug) ->
+ NapLengthMicros = timer:now_diff(AwokeAt, SleptAt),
+ CurrentMicros = CurrentTO * 1000,
+ MinimumMicros = MinimumTO * 1000,
+ DesiredHibMicros = DesiredHibPeriod * 1000,
+ CurrentTO1 = case (NapLengthMicros + CurrentMicros) >
+ (MinimumMicros + DesiredHibMicros) of
+ true ->
+ lists:max([MinimumTO, round(CurrentTO/2)]);
+ false ->
+ CurrentTO + MinimumTO
+ end,
+ TimeoutState = {backoff, CurrentTO1, MinimumTO, DesiredHibPeriod},
+ case catch Mod:handle_post_hibernate(State) of
+ {noreply, NState} ->
+ process_next_msg(Parent, Name, NState, Mod, infinity, TimeoutState,
+ Queue, Debug, true);
+ {noreply, NState, Time} ->
+ process_next_msg(Parent, Name, NState, Mod, Time, TimeoutState,
+ Queue, Debug, true);
+ {stop, Reason, NState} ->
+ terminate(Reason, Name, post_hibernate, Mod, NState, []);
+ {'EXIT', What} ->
+ terminate(What, Name, post_hibernate, Mod, State, []);
+ Reply ->
+ terminate({bad_return_value, Reply}, Name, post_hibernate, Mod,
+ State, [])
end.
in({'$gen_pcast', {Priority, Msg}}, Queue) ->
@@ -992,9 +1054,8 @@ name_to_pid(Name) ->
%% Status information
%%-----------------------------------------------------------------
format_status(Opt, StatusData) ->
- [PDict, SysState, Parent, Debug, [Name, State, Mod, _Time,
- TimeoutState, Queue]] =
- StatusData,
+ [PDict, SysState, Parent, Debug,
+ [Name, State, Mod, _Time, _TimeoutState, Queue]] = StatusData,
NameTag = if is_pid(Name) ->
pid_to_list(Name);
is_atom(Name) ->
@@ -1012,15 +1073,9 @@ format_status(Opt, StatusData) ->
_ ->
[{data, [{"State", State}]}]
end,
- Specfic1 = case TimeoutState of
- undefined -> Specfic;
- {Current, Min, undefined} ->
- [ {"Binary Timeout Current and Min", {Current, Min}}
- | Specfic]
- end,
[{header, Header},
{data, [{"Status", SysState},
{"Parent", Parent},
{"Logged events", Log},
{"Queued messages", priority_queue:to_list(Queue)}]} |
- Specfic1].
+ Specfic].
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index ab96feff1a..a0e6b1c69d 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -37,12 +37,14 @@
-define(UNSENT_MESSAGE_LIMIT, 100).
-define(HIBERNATE_AFTER_MIN, 1000).
+-define(DESIRED_HIBERNATE, 10000).
-define(MEMORY_REPORT_TIME_INTERVAL, 10000). %% 10 seconds in milliseconds
-export([start_link/1]).
--export([init/1, terminate/2, code_change/3,
- handle_call/3, handle_cast/2, handle_info/2]).
+-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
+ handle_info/2]).
+-export([handle_pre_hibernate/1, handle_post_hibernate/1]).
-import(queue).
-import(erlang).
@@ -115,7 +117,8 @@ init(Q = #amqqueue { name = QName, durable = Durable }) ->
},
%% first thing we must do is report_memory which will clear out
%% the 'undefined' values in gain and loss in mixed_queue state
- {ok, report_memory(false, State), {binary, ?HIBERNATE_AFTER_MIN}}.
+ {ok, report_memory(false, State), hibernate,
+ {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
terminate(_Reason, State) ->
%% FIXME: How do we cancel active subscriptions?
@@ -134,10 +137,10 @@ code_change(_OldVsn, State, _Extra) ->
%%----------------------------------------------------------------------------
reply(Reply, NewState) ->
- {reply, Reply, start_memory_timer(NewState), binary}.
+ {reply, Reply, start_memory_timer(NewState), hibernate}.
noreply(NewState) ->
- {noreply, start_memory_timer(NewState), binary}.
+ {noreply, start_memory_timer(NewState), hibernate}.
start_memory_timer() ->
{ok, TRef} = timer:apply_after(?MEMORY_REPORT_TIME_INTERVAL,
@@ -857,13 +860,16 @@ handle_info({'DOWN', MonitorRef, process, DownPid, _Reason},
handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) ->
handle_ch_down(DownPid, State);
-handle_info(timeout, State = #q { mixed_state = MS }) ->
+handle_info(Info, State) ->
+ ?LOGDEBUG("Info in queue: ~p~n", [Info]),
+ {stop, {unhandled_info, Info}, State}.
+
+handle_pre_hibernate(State = #q { mixed_state = MS }) ->
MS1 = rabbit_mixed_queue:maybe_prefetch(MS),
State1 =
stop_memory_timer(report_memory(true, State #q { mixed_state = MS1 })),
%% don't call noreply/1 as that'll restart the memory_report_timer
- {noreply, State1, hibernate};
+ {hibernate, State1}.
-handle_info(Info, State) ->
- ?LOGDEBUG("Info in queue: ~p~n", [Info]),
- {stop, {unhandled_info, Info}, State}.
+handle_post_hibernate(State) ->
+ {noreply, State, hibernate}.
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index 05ba3a6c62..3a7d2f29b7 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -37,6 +37,7 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
+-export([handle_pre_hibernate/1, handle_post_hibernate/1]).
-export([publish/3, deliver/1, phantom_deliver/1, ack/2,
tx_publish/1, tx_commit/3, tx_cancel/1,
@@ -77,6 +78,7 @@
-define(SYNC_INTERVAL, 5). %% milliseconds
-define(HIBERNATE_AFTER_MIN, 1000).
+-define(DESIRED_HIBERNATE, 10000).
-record(dqstate,
{msg_location_dets, %% where are messages?
@@ -461,7 +463,8 @@ init([FileSizeLimit, ReadFileHandlesLimit]) ->
%% ets_bytes_per_record otherwise.
ok = rabbit_queue_mode_manager:report_memory(self(), 0, false),
ok = report_memory(false, State2),
- {ok, State2, {binary, ?HIBERNATE_AFTER_MIN}}.
+ {ok, State2, hibernate, {backoff, ?HIBERNATE_AFTER_MIN,
+ ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
handle_call({deliver, Q}, _From, State) ->
{ok, Result, State1} = internal_deliver(Q, true, false, true, State),
@@ -557,15 +560,18 @@ handle_cast({set_delivered_and_advance, Q, MsgSeqId}, State) ->
handle_info({'EXIT', _Pid, Reason}, State) ->
{stop, Reason, State};
-handle_info(timeout, State = #dqstate { commit_timer_ref = undefined }) ->
- %% this is the binary timeout coming back
- %% don't use noreply/1 or noreply1/1 as they'll restart the memory timer
- ok = report_memory(true, State),
- {noreply, stop_memory_timer(State), hibernate};
handle_info(timeout, State) ->
%% must have commit_timer set, so timeout was 0, and we're not hibernating
noreply(sync_current_file_handle(State)).
+handle_pre_hibernate(State) ->
+ %% don't use noreply/1 or noreply1/1 as they'll restart the memory timer
+ ok = report_memory(true, State),
+ {hibernate, stop_memory_timer(State)}.
+
+handle_post_hibernate(State) ->
+ noreply(State).
+
terminate(_Reason, State) ->
shutdown(State).
@@ -690,11 +696,11 @@ noreply(NewState) ->
noreply1(NewState = #dqstate { on_sync_txns = [],
commit_timer_ref = undefined }) ->
- {noreply, NewState, binary};
+ {noreply, NewState, hibernate};
noreply1(NewState = #dqstate { commit_timer_ref = undefined }) ->
{noreply, start_commit_timer(NewState), 0};
noreply1(NewState = #dqstate { on_sync_txns = [] }) ->
- {noreply, stop_commit_timer(NewState), binary};
+ {noreply, stop_commit_timer(NewState), hibernate};
noreply1(NewState) ->
{noreply, NewState, 0}.
@@ -703,11 +709,11 @@ reply(Reply, NewState) ->
reply1(Reply, NewState = #dqstate { on_sync_txns = [],
commit_timer_ref = undefined }) ->
- {reply, Reply, NewState, binary};
+ {reply, Reply, NewState, hibernate};
reply1(Reply, NewState = #dqstate { commit_timer_ref = undefined }) ->
{reply, Reply, start_commit_timer(NewState), 0};
reply1(Reply, NewState = #dqstate { on_sync_txns = [] }) ->
- {reply, Reply, stop_commit_timer(NewState), binary};
+ {reply, Reply, stop_commit_timer(NewState), hibernate};
reply1(Reply, NewState) ->
{reply, Reply, NewState, 0}.