summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/gen_server2.erl162
-rw-r--r--src/rabbit_amqqueue_process.erl47
2 files changed, 117 insertions, 92 deletions
diff --git a/src/gen_server2.erl b/src/gen_server2.erl
index 7ce81f9273..dc1e8691d7 100644
--- a/src/gen_server2.erl
+++ b/src/gen_server2.erl
@@ -1,4 +1,4 @@
-%% This file is a copy of gen_server.erl from the R11B-5 Erlang/OTP
+%% This file is a copy of gen_server.erl from the R13B-1 Erlang/OTP
%% distribution, with the following modifications:
%%
%% 1) the module name is gen_server2
@@ -21,6 +21,23 @@
%% higher priorities are processed before requests with lower
%% priorities. The default priority is 0.
%%
+%% 5) On return from init/1, the timeout value {binary, Min} creates a
+%% binary exponential timeout, where Min is the minimum number of
+%% milliseconds permitted, and is also used as the current timeout
+%% value. Returning from handle_* with the timeout value set to
+%% 'binary' will use the current binary timeout value. handle_info/2
+%% with the Info of 'timeout' will function normally, and supports the
+%% return value of {noreply, State, hibernate} which will hibernate
+%% the process. The current timeout value is:
+%%
+%% a) doubled if the time spent in hibernation is < 4 * the current value;
+%% b) halved if the time spent in hibernation is > 16 * the current value;
+%% c) maintained in all other cases
+%%
+%% Explicit timeouts (i.e. not 'binary') from the handle_* functions
+%% are still supported, and do not have any effect on the current
+%% timeout value.
+
%% All modifications are (C) 2009 LShift Ltd.
%% ``The contents of this file are subject to the Erlang Public License,
@@ -116,7 +133,7 @@
cast/2, pcast/3, reply/2,
abcast/2, abcast/3,
multi_call/2, multi_call/3, multi_call/4,
- enter_loop/3, enter_loop/4, enter_loop/5, wake_hib/6]).
+ enter_loop/3, enter_loop/4, enter_loop/5, wake_hib/7]).
-export([behaviour_info/1]).
@@ -316,7 +333,12 @@ enter_loop(Mod, Options, State, ServerName, Timeout) ->
Parent = get_parent(),
Debug = debug_options(Name, Options),
Queue = priority_queue:new(),
- loop(Parent, Name, State, Mod, Timeout, Queue, Debug).
+ TimeoutState = case Timeout of
+ {binary, Min} ->
+ {Min, Min, undefined};
+ _ -> undefined
+ end,
+ loop(Parent, Name, State, Mod, Timeout, TimeoutState, Queue, Debug).
%%%========================================================================
%%% Gen-callback functions
@@ -338,10 +360,15 @@ init_it(Starter, Parent, Name0, Mod, Args, Options) ->
case catch Mod:init(Args) of
{ok, State} ->
proc_lib:init_ack(Starter, {ok, self()}),
- loop(Parent, Name, State, Mod, infinity, Queue, Debug);
+ loop(Parent, Name, State, Mod, infinity, undefined, Queue, Debug);
{ok, State, Timeout} ->
- proc_lib:init_ack(Starter, {ok, self()}),
- loop(Parent, Name, State, Mod, Timeout, Queue, Debug);
+ proc_lib:init_ack(Starter, {ok, self()}),
+ TimeoutState = case Timeout of
+ {binary, Min} ->
+ {Min, Min, undefined};
+ _ -> undefined
+ end,
+ loop(Parent, Name, State, Mod, binary, TimeoutState, Queue, Debug);
{stop, Reason} ->
%% For consistency, we must make sure that the
%% registered name (if any) is unregistered before
@@ -383,38 +410,66 @@ unregister_name(Pid) when is_pid(Pid) ->
%%% ---------------------------------------------------
%%% The MAIN loop.
%%% ---------------------------------------------------
-loop(Parent, Name, State, Mod, hibernate, Queue, Debug) ->
- proc_lib:hibernate(?MODULE,wake_hib,[Parent, Name, State, Mod, Queue, Debug]);
-loop(Parent, Name, State, Mod, Time, Queue, Debug) ->
+loop(Parent, Name, State, Mod, hibernate, undefined, Queue, Debug) ->
+ proc_lib:hibernate(?MODULE,wake_hib,[Parent, Name, State, Mod, undefined, Queue, Debug]);
+loop(Parent, Name, State, Mod, hibernate, {Current, Min, undefined}, Queue, Debug) ->
+ proc_lib:hibernate(?MODULE,wake_hib,[Parent, Name, State, Mod, {Current, Min, now()}, Queue, Debug]);
+loop(Parent, Name, State, Mod, Time, TimeoutState, Queue, Debug) ->
receive
Input -> loop(Parent, Name, State, Mod,
- Time, in(Input, Queue), Debug)
+ Time, TimeoutState, in(Input, Queue), Debug)
after 0 ->
- process_next_msg(Parent, Name, State, Mod, Time, Queue, Debug, false)
+ process_next_msg(Parent, Name, State, Mod, Time, TimeoutState, Queue, Debug, false)
end.
-process_next_msg(Parent, Name, State, Mod, Time, Queue, Debug, Hib) ->
+process_next_msg(Parent, Name, State, Mod, Time, TimeoutState, Queue, Debug, Hib) ->
case priority_queue:out(Queue) of
{{value, Msg}, Queue1} ->
process_msg(Parent, Name, State, Mod,
- Time, Queue1, Debug, Hib, Msg);
+ Time, TimeoutState, Queue1, Debug, Hib, Msg);
{empty, Queue1} ->
+ Time1 = case {Time, TimeoutState} of
+ {binary, {Current, _Min, undefined}} -> Current;
+ _ -> Time
+ end,
receive
Input ->
loop(Parent, Name, State, Mod,
- Time, in(Input, Queue1), Debug)
- after Time ->
+ Time, TimeoutState, in(Input, Queue1), Debug)
+ after Time1 ->
process_msg(Parent, Name, State, Mod,
- Time, Queue1, Debug, Hib, timeout)
+ Time, TimeoutState, Queue1, Debug, Hib, timeout)
end
end.
-wake_hib(Parent, Name, State, Mod, Queue, Debug) ->
+wake_hib(Parent, Name, State, Mod, TimeoutState, Queue, Debug) ->
Msg = receive
Input ->
Input
end,
- process_next_msg(Parent, Name, State, Mod, hibernate, in(Msg, Queue), Debug, true).
+ TimeoutState1 = adjust_hibernate_after(TimeoutState),
+ process_next_msg(Parent, Name, State, Mod, hibernate, TimeoutState1, in(Msg, Queue), Debug, true).
+
+adjust_hibernate_after(undefined) ->
+ undefined;
+adjust_hibernate_after({Current, Min, HibernatedAt}) ->
+ NapLengthMicros = timer:now_diff(now(), HibernatedAt),
+ CurrentMicros = Current * 1000,
+ LowTargetMicros = CurrentMicros * 4,
+ HighTargetMicros = LowTargetMicros * 4,
+ if
+ NapLengthMicros < LowTargetMicros ->
+ %% nap was too short, don't go to sleep as soon
+ {Current * 2, Min, undefined};
+
+ NapLengthMicros > HighTargetMicros ->
+ %% nap was long, try going to sleep sooner
+ {lists:max([Min, round(Current / 2)]), Min, undefined};
+
+ true ->
+ %% nap and timeout seem to be in the right relationship. stay here
+ {Current, Min, undefined}
+ end.
in({'$gen_pcast', {Priority, Msg}}, Queue) ->
priority_queue:in({'$gen_cast', Msg}, Priority, Queue);
@@ -423,22 +478,22 @@ in({'$gen_pcall', From, {Priority, Msg}}, Queue) ->
in(Input, Queue) ->
priority_queue:in(Input, Queue).
-process_msg(Parent, Name, State, Mod, Time, Queue, Debug, _Hib, Msg) ->
+process_msg(Parent, Name, State, Mod, Time, TimeoutState, Queue, Debug, _Hib, Msg) ->
case Msg of
{system, From, Req} ->
sys:handle_system_msg(Req, From, Parent, ?MODULE, Debug,
- [Name, State, Mod, Time, Queue]);
+ [Name, State, Mod, Time, TimeoutState, Queue]);
%% gen_server puts Hib on the end as the 7th arg, but that
%% version of the function seems not to be documented so
%% leaving out for now.
{'EXIT', Parent, Reason} ->
terminate(Reason, Name, Msg, Mod, State, Debug);
_Msg when Debug =:= [] ->
- handle_msg(Msg, Parent, Name, State, Mod, Queue);
+ handle_msg(Msg, Parent, Name, State, Mod, TimeoutState, Queue);
_Msg ->
Debug1 = sys:handle_debug(Debug, {?MODULE, print_event},
Name, {in, Msg}),
- handle_msg(Msg, Parent, Name, State, Mod, Queue, Debug1)
+ handle_msg(Msg, Parent, Name, State, Mod, TimeoutState, Queue, Debug1)
end.
%%% ---------------------------------------------------
@@ -641,48 +696,48 @@ dispatch(Info, Mod, State) ->
Mod:handle_info(Info, State).
handle_msg({'$gen_call', From, Msg},
- Parent, Name, State, Mod, Queue) ->
+ Parent, Name, State, Mod, TimeoutState, Queue) ->
case catch Mod:handle_call(Msg, From, State) of
{reply, Reply, NState} ->
reply(From, Reply),
- loop(Parent, Name, NState, Mod, infinity, Queue, []);
+ loop(Parent, Name, NState, Mod, infinity, TimeoutState, Queue, []);
{reply, Reply, NState, Time1} ->
reply(From, Reply),
- loop(Parent, Name, NState, Mod, Time1, Queue, []);
+ loop(Parent, Name, NState, Mod, Time1, TimeoutState, Queue, []);
{noreply, NState} ->
- loop(Parent, Name, NState, Mod, infinity, Queue, []);
+ loop(Parent, Name, NState, Mod, infinity, TimeoutState, Queue, []);
{noreply, NState, Time1} ->
- loop(Parent, Name, NState, Mod, Time1, Queue, []);
+ loop(Parent, Name, NState, Mod, Time1, TimeoutState, Queue, []);
{stop, Reason, Reply, NState} ->
{'EXIT', R} =
(catch terminate(Reason, Name, Msg, Mod, NState, [])),
reply(From, Reply),
exit(R);
Other -> handle_common_reply(Other,
- Parent, Name, Msg, Mod, State, Queue)
+ Parent, Name, Msg, Mod, State, TimeoutState, Queue)
end;
handle_msg(Msg,
- Parent, Name, State, Mod, Queue) ->
+ Parent, Name, State, Mod, TimeoutState, Queue) ->
Reply = (catch dispatch(Msg, Mod, State)),
- handle_common_reply(Reply, Parent, Name, Msg, Mod, State, Queue).
+ handle_common_reply(Reply, Parent, Name, Msg, Mod, State, TimeoutState, Queue).
handle_msg({'$gen_call', From, Msg},
- Parent, Name, State, Mod, Queue, Debug) ->
+ Parent, Name, State, Mod, TimeoutState, Queue, Debug) ->
case catch Mod:handle_call(Msg, From, State) of
{reply, Reply, NState} ->
Debug1 = reply(Name, From, Reply, NState, Debug),
- loop(Parent, Name, NState, Mod, infinity, Queue, Debug1);
+ loop(Parent, Name, NState, Mod, infinity, TimeoutState, Queue, Debug1);
{reply, Reply, NState, Time1} ->
Debug1 = reply(Name, From, Reply, NState, Debug),
- loop(Parent, Name, NState, Mod, Time1, Queue, Debug1);
+ loop(Parent, Name, NState, Mod, Time1, TimeoutState, Queue, Debug1);
{noreply, NState} ->
Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name,
{noreply, NState}),
- loop(Parent, Name, NState, Mod, infinity, Queue, Debug1);
+ loop(Parent, Name, NState, Mod, infinity, TimeoutState, Queue, Debug1);
{noreply, NState, Time1} ->
Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name,
{noreply, NState}),
- loop(Parent, Name, NState, Mod, Time1, Queue, Debug1);
+ loop(Parent, Name, NState, Mod, Time1, TimeoutState, Queue, Debug1);
{stop, Reason, Reply, NState} ->
{'EXIT', R} =
(catch terminate(Reason, Name, Msg, Mod, NState, Debug)),
@@ -690,20 +745,20 @@ handle_msg({'$gen_call', From, Msg},
exit(R);
Other ->
handle_common_reply(Other,
- Parent, Name, Msg, Mod, State, Queue, Debug)
+ Parent, Name, Msg, Mod, State, TimeoutState, Queue, Debug)
end;
handle_msg(Msg,
- Parent, Name, State, Mod, Queue, Debug) ->
+ Parent, Name, State, Mod, TimeoutState, Queue, Debug) ->
Reply = (catch dispatch(Msg, Mod, State)),
handle_common_reply(Reply,
- Parent, Name, Msg, Mod, State, Queue, Debug).
+ Parent, Name, Msg, Mod, State, TimeoutState, Queue, Debug).
-handle_common_reply(Reply, Parent, Name, Msg, Mod, State, Queue) ->
+handle_common_reply(Reply, Parent, Name, Msg, Mod, State, TimeoutState, Queue) ->
case Reply of
{noreply, NState} ->
- loop(Parent, Name, NState, Mod, infinity, Queue, []);
+ loop(Parent, Name, NState, Mod, infinity, TimeoutState, Queue, []);
{noreply, NState, Time1} ->
- loop(Parent, Name, NState, Mod, Time1, Queue, []);
+ loop(Parent, Name, NState, Mod, Time1, TimeoutState, Queue, []);
{stop, Reason, NState} ->
terminate(Reason, Name, Msg, Mod, NState, []);
{'EXIT', What} ->
@@ -712,16 +767,16 @@ handle_common_reply(Reply, Parent, Name, Msg, Mod, State, Queue) ->
terminate({bad_return_value, Reply}, Name, Msg, Mod, State, [])
end.
-handle_common_reply(Reply, Parent, Name, Msg, Mod, State, Queue, Debug) ->
+handle_common_reply(Reply, Parent, Name, Msg, Mod, State, TimeoutState, Queue, Debug) ->
case Reply of
{noreply, NState} ->
Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name,
{noreply, NState}),
- loop(Parent, Name, NState, Mod, infinity, Queue, Debug1);
+ loop(Parent, Name, NState, Mod, infinity, TimeoutState, Queue, Debug1);
{noreply, NState, Time1} ->
Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name,
{noreply, NState}),
- loop(Parent, Name, NState, Mod, Time1, Queue, Debug1);
+ loop(Parent, Name, NState, Mod, Time1, TimeoutState, Queue, Debug1);
{stop, Reason, NState} ->
terminate(Reason, Name, Msg, Mod, NState, Debug);
{'EXIT', What} ->
@@ -739,17 +794,17 @@ reply(Name, {To, Tag}, Reply, State, Debug) ->
%%-----------------------------------------------------------------
%% Callback functions for system messages handling.
%%-----------------------------------------------------------------
-system_continue(Parent, Debug, [Name, State, Mod, Time, Queue]) ->
- loop(Parent, Name, State, Mod, Time, Queue, Debug).
+system_continue(Parent, Debug, [Name, State, Mod, Time, TimeoutState, Queue]) ->
+ loop(Parent, Name, State, Mod, Time, TimeoutState, Queue, Debug).
-spec system_terminate(_, _, _, [_]) -> no_return().
-system_terminate(Reason, _Parent, Debug, [Name, State, Mod, _Time, _Queue]) ->
+system_terminate(Reason, _Parent, Debug, [Name, State, Mod, _Time, _TimeoutState, _Queue]) ->
terminate(Reason, Name, [], Mod, State, Debug).
-system_code_change([Name, State, Mod, Time, Queue], _Module, OldVsn, Extra) ->
+system_code_change([Name, State, Mod, Time, TimeoutState, Queue], _Module, OldVsn, Extra) ->
case catch Mod:code_change(OldVsn, State, Extra) of
- {ok, NewState} -> {ok, [Name, NewState, Mod, Time, Queue]};
+ {ok, NewState} -> {ok, [Name, NewState, Mod, Time, TimeoutState, Queue]};
Else -> Else
end.
@@ -918,7 +973,7 @@ name_to_pid(Name) ->
%% Status information
%%-----------------------------------------------------------------
format_status(Opt, StatusData) ->
- [PDict, SysState, Parent, Debug, [Name, State, Mod, _Time, Queue]] =
+ [PDict, SysState, Parent, Debug, [Name, State, Mod, _Time, TimeoutState, Queue]] =
StatusData,
NameTag = if is_pid(Name) ->
pid_to_list(Name);
@@ -937,9 +992,14 @@ format_status(Opt, StatusData) ->
_ ->
[{data, [{"State", State}]}]
end,
+ Specfic1 = case TimeoutState of
+ undefined -> Specfic;
+ {Current, Min, undefined} ->
+ [{"Binary Timeout Current and Min", {Current, Min}} | Specfic]
+ end,
[{header, Header},
{data, [{"Status", SysState},
{"Parent", Parent},
{"Logged events", Log},
{"Queued messages", priority_queue:to_list(Queue)}]} |
- Specfic].
+ Specfic1].
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index e2dc598ffd..0180d86ac3 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -54,9 +54,7 @@
next_msg_id,
message_buffer,
active_consumers,
- blocked_consumers,
- hibernate_after,
- hibernated_at
+ blocked_consumers
}).
-record(consumer, {tag, ack_required}).
@@ -104,10 +102,8 @@ init(Q) ->
next_msg_id = 1,
message_buffer = queue:new(),
active_consumers = queue:new(),
- blocked_consumers = queue:new(),
- hibernate_after = ?HIBERNATE_AFTER_MIN,
- hibernated_at = undefined
- }, ?HIBERNATE_AFTER_MIN}.
+ blocked_consumers = queue:new()
+ }, {binary, ?HIBERNATE_AFTER_MIN}}.
terminate(_Reason, State) ->
%% FIXME: How do we cancel active subscriptions?
@@ -122,41 +118,11 @@ code_change(_OldVsn, State, _Extra) ->
%%----------------------------------------------------------------------------
-reply(Reply, NewState = #q { hibernated_at = undefined }) ->
- {reply, Reply, NewState, NewState #q.hibernate_after};
reply(Reply, NewState) ->
- NewState1 = adjust_hibernate_after(NewState),
- {reply, Reply, NewState1, NewState1 #q.hibernate_after}.
+ {reply, Reply, NewState, binary}.
-noreply(NewState = #q { hibernated_at = undefined }) ->
- {noreply, NewState, NewState #q.hibernate_after};
noreply(NewState) ->
- NewState1 = adjust_hibernate_after(NewState),
- {noreply, NewState1, NewState1 #q.hibernate_after}.
-
-adjust_hibernate_after(State = #q { hibernated_at = undefined }) ->
- State;
-adjust_hibernate_after(State = #q { hibernated_at = Then,
- hibernate_after = Timeout }) ->
- State1 = State #q { hibernated_at = undefined },
- NapLengthMicros = timer:now_diff(now(), Then),
- TimeoutMicros = Timeout * 1000,
- LowTargetMicros = TimeoutMicros * 4,
- HighTargetMicros = LowTargetMicros * 4,
- if
- NapLengthMicros < LowTargetMicros ->
- %% nap was too short, don't go to sleep as soon
- State1 #q { hibernate_after = Timeout * 2 };
-
- NapLengthMicros > HighTargetMicros ->
- %% nap was long, try going to sleep sooner
- Timeout1 = lists:max([?HIBERNATE_AFTER_MIN, round(Timeout / 2)]),
- State1 #q { hibernate_after = Timeout1 };
-
- true ->
- %% nap and timeout seem to be in the right relationship. stay here
- State1
- end.
+ {noreply, NewState, binary}.
lookup_ch(ChPid) ->
case get({ch, ChPid}) of
@@ -852,8 +818,7 @@ handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) ->
handle_ch_down(DownPid, State);
handle_info(timeout, State) ->
- State1 = State #q { hibernated_at = now() },
- {noreply, State1, hibernate};
+ {noreply, State, hibernate};
handle_info(Info, State) ->
?LOGDEBUG("Info in queue: ~p~n", [Info]),