diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/gen_server2.erl | 162 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 47 |
2 files changed, 117 insertions, 92 deletions
diff --git a/src/gen_server2.erl b/src/gen_server2.erl index 7ce81f9273..dc1e8691d7 100644 --- a/src/gen_server2.erl +++ b/src/gen_server2.erl @@ -1,4 +1,4 @@ -%% This file is a copy of gen_server.erl from the R11B-5 Erlang/OTP +%% This file is a copy of gen_server.erl from the R13B-1 Erlang/OTP %% distribution, with the following modifications: %% %% 1) the module name is gen_server2 @@ -21,6 +21,23 @@ %% higher priorities are processed before requests with lower %% priorities. The default priority is 0. %% +%% 5) On return from init/1, the timeout value {binary, Min} creates a +%% binary exponential timeout, where Min is the minimum number of +%% milliseconds permitted, and is also used as the current timeout +%% value. Returning from handle_* with the timeout value set to +%% 'binary' will use the current binary timeout value. handle_info/2 +%% with the Info of 'timeout' will function normally, and supports the +%% return value of {noreply, State, hibernate} which will hibernate +%% the process. The current timeout value is: +%% +%% a) doubled if the time spent in hibernation is < 4 * the current value; +%% b) halved if the time spent in hibernation is > 16 * the current value; +%% c) maintained in all other cases +%% +%% Explicit timeouts (i.e. not 'binary') from the handle_* functions +%% are still supported, and do not have any effect on the current +%% timeout value. + %% All modifications are (C) 2009 LShift Ltd. %% ``The contents of this file are subject to the Erlang Public License, @@ -116,7 +133,7 @@ cast/2, pcast/3, reply/2, abcast/2, abcast/3, multi_call/2, multi_call/3, multi_call/4, - enter_loop/3, enter_loop/4, enter_loop/5, wake_hib/6]). + enter_loop/3, enter_loop/4, enter_loop/5, wake_hib/7]). -export([behaviour_info/1]). @@ -316,7 +333,12 @@ enter_loop(Mod, Options, State, ServerName, Timeout) -> Parent = get_parent(), Debug = debug_options(Name, Options), Queue = priority_queue:new(), - loop(Parent, Name, State, Mod, Timeout, Queue, Debug). + TimeoutState = case Timeout of + {binary, Min} -> + {Min, Min, undefined}; + _ -> undefined + end, + loop(Parent, Name, State, Mod, Timeout, TimeoutState, Queue, Debug). %%%======================================================================== %%% Gen-callback functions @@ -338,10 +360,15 @@ init_it(Starter, Parent, Name0, Mod, Args, Options) -> case catch Mod:init(Args) of {ok, State} -> proc_lib:init_ack(Starter, {ok, self()}), - loop(Parent, Name, State, Mod, infinity, Queue, Debug); + loop(Parent, Name, State, Mod, infinity, undefined, Queue, Debug); {ok, State, Timeout} -> - proc_lib:init_ack(Starter, {ok, self()}), - loop(Parent, Name, State, Mod, Timeout, Queue, Debug); + proc_lib:init_ack(Starter, {ok, self()}), + TimeoutState = case Timeout of + {binary, Min} -> + {Min, Min, undefined}; + _ -> undefined + end, + loop(Parent, Name, State, Mod, binary, TimeoutState, Queue, Debug); {stop, Reason} -> %% For consistency, we must make sure that the %% registered name (if any) is unregistered before @@ -383,38 +410,66 @@ unregister_name(Pid) when is_pid(Pid) -> %%% --------------------------------------------------- %%% The MAIN loop. %%% --------------------------------------------------- -loop(Parent, Name, State, Mod, hibernate, Queue, Debug) -> - proc_lib:hibernate(?MODULE,wake_hib,[Parent, Name, State, Mod, Queue, Debug]); -loop(Parent, Name, State, Mod, Time, Queue, Debug) -> +loop(Parent, Name, State, Mod, hibernate, undefined, Queue, Debug) -> + proc_lib:hibernate(?MODULE,wake_hib,[Parent, Name, State, Mod, undefined, Queue, Debug]); +loop(Parent, Name, State, Mod, hibernate, {Current, Min, undefined}, Queue, Debug) -> + proc_lib:hibernate(?MODULE,wake_hib,[Parent, Name, State, Mod, {Current, Min, now()}, Queue, Debug]); +loop(Parent, Name, State, Mod, Time, TimeoutState, Queue, Debug) -> receive Input -> loop(Parent, Name, State, Mod, - Time, in(Input, Queue), Debug) + Time, TimeoutState, in(Input, Queue), Debug) after 0 -> - process_next_msg(Parent, Name, State, Mod, Time, Queue, Debug, false) + process_next_msg(Parent, Name, State, Mod, Time, TimeoutState, Queue, Debug, false) end. -process_next_msg(Parent, Name, State, Mod, Time, Queue, Debug, Hib) -> +process_next_msg(Parent, Name, State, Mod, Time, TimeoutState, Queue, Debug, Hib) -> case priority_queue:out(Queue) of {{value, Msg}, Queue1} -> process_msg(Parent, Name, State, Mod, - Time, Queue1, Debug, Hib, Msg); + Time, TimeoutState, Queue1, Debug, Hib, Msg); {empty, Queue1} -> + Time1 = case {Time, TimeoutState} of + {binary, {Current, _Min, undefined}} -> Current; + _ -> Time + end, receive Input -> loop(Parent, Name, State, Mod, - Time, in(Input, Queue1), Debug) - after Time -> + Time, TimeoutState, in(Input, Queue1), Debug) + after Time1 -> process_msg(Parent, Name, State, Mod, - Time, Queue1, Debug, Hib, timeout) + Time, TimeoutState, Queue1, Debug, Hib, timeout) end end. -wake_hib(Parent, Name, State, Mod, Queue, Debug) -> +wake_hib(Parent, Name, State, Mod, TimeoutState, Queue, Debug) -> Msg = receive Input -> Input end, - process_next_msg(Parent, Name, State, Mod, hibernate, in(Msg, Queue), Debug, true). + TimeoutState1 = adjust_hibernate_after(TimeoutState), + process_next_msg(Parent, Name, State, Mod, hibernate, TimeoutState1, in(Msg, Queue), Debug, true). + +adjust_hibernate_after(undefined) -> + undefined; +adjust_hibernate_after({Current, Min, HibernatedAt}) -> + NapLengthMicros = timer:now_diff(now(), HibernatedAt), + CurrentMicros = Current * 1000, + LowTargetMicros = CurrentMicros * 4, + HighTargetMicros = LowTargetMicros * 4, + if + NapLengthMicros < LowTargetMicros -> + %% nap was too short, don't go to sleep as soon + {Current * 2, Min, undefined}; + + NapLengthMicros > HighTargetMicros -> + %% nap was long, try going to sleep sooner + {lists:max([Min, round(Current / 2)]), Min, undefined}; + + true -> + %% nap and timeout seem to be in the right relationship. stay here + {Current, Min, undefined} + end. in({'$gen_pcast', {Priority, Msg}}, Queue) -> priority_queue:in({'$gen_cast', Msg}, Priority, Queue); @@ -423,22 +478,22 @@ in({'$gen_pcall', From, {Priority, Msg}}, Queue) -> in(Input, Queue) -> priority_queue:in(Input, Queue). -process_msg(Parent, Name, State, Mod, Time, Queue, Debug, _Hib, Msg) -> +process_msg(Parent, Name, State, Mod, Time, TimeoutState, Queue, Debug, _Hib, Msg) -> case Msg of {system, From, Req} -> sys:handle_system_msg(Req, From, Parent, ?MODULE, Debug, - [Name, State, Mod, Time, Queue]); + [Name, State, Mod, Time, TimeoutState, Queue]); %% gen_server puts Hib on the end as the 7th arg, but that %% version of the function seems not to be documented so %% leaving out for now. {'EXIT', Parent, Reason} -> terminate(Reason, Name, Msg, Mod, State, Debug); _Msg when Debug =:= [] -> - handle_msg(Msg, Parent, Name, State, Mod, Queue); + handle_msg(Msg, Parent, Name, State, Mod, TimeoutState, Queue); _Msg -> Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name, {in, Msg}), - handle_msg(Msg, Parent, Name, State, Mod, Queue, Debug1) + handle_msg(Msg, Parent, Name, State, Mod, TimeoutState, Queue, Debug1) end. %%% --------------------------------------------------- @@ -641,48 +696,48 @@ dispatch(Info, Mod, State) -> Mod:handle_info(Info, State). handle_msg({'$gen_call', From, Msg}, - Parent, Name, State, Mod, Queue) -> + Parent, Name, State, Mod, TimeoutState, Queue) -> case catch Mod:handle_call(Msg, From, State) of {reply, Reply, NState} -> reply(From, Reply), - loop(Parent, Name, NState, Mod, infinity, Queue, []); + loop(Parent, Name, NState, Mod, infinity, TimeoutState, Queue, []); {reply, Reply, NState, Time1} -> reply(From, Reply), - loop(Parent, Name, NState, Mod, Time1, Queue, []); + loop(Parent, Name, NState, Mod, Time1, TimeoutState, Queue, []); {noreply, NState} -> - loop(Parent, Name, NState, Mod, infinity, Queue, []); + loop(Parent, Name, NState, Mod, infinity, TimeoutState, Queue, []); {noreply, NState, Time1} -> - loop(Parent, Name, NState, Mod, Time1, Queue, []); + loop(Parent, Name, NState, Mod, Time1, TimeoutState, Queue, []); {stop, Reason, Reply, NState} -> {'EXIT', R} = (catch terminate(Reason, Name, Msg, Mod, NState, [])), reply(From, Reply), exit(R); Other -> handle_common_reply(Other, - Parent, Name, Msg, Mod, State, Queue) + Parent, Name, Msg, Mod, State, TimeoutState, Queue) end; handle_msg(Msg, - Parent, Name, State, Mod, Queue) -> + Parent, Name, State, Mod, TimeoutState, Queue) -> Reply = (catch dispatch(Msg, Mod, State)), - handle_common_reply(Reply, Parent, Name, Msg, Mod, State, Queue). + handle_common_reply(Reply, Parent, Name, Msg, Mod, State, TimeoutState, Queue). handle_msg({'$gen_call', From, Msg}, - Parent, Name, State, Mod, Queue, Debug) -> + Parent, Name, State, Mod, TimeoutState, Queue, Debug) -> case catch Mod:handle_call(Msg, From, State) of {reply, Reply, NState} -> Debug1 = reply(Name, From, Reply, NState, Debug), - loop(Parent, Name, NState, Mod, infinity, Queue, Debug1); + loop(Parent, Name, NState, Mod, infinity, TimeoutState, Queue, Debug1); {reply, Reply, NState, Time1} -> Debug1 = reply(Name, From, Reply, NState, Debug), - loop(Parent, Name, NState, Mod, Time1, Queue, Debug1); + loop(Parent, Name, NState, Mod, Time1, TimeoutState, Queue, Debug1); {noreply, NState} -> Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name, {noreply, NState}), - loop(Parent, Name, NState, Mod, infinity, Queue, Debug1); + loop(Parent, Name, NState, Mod, infinity, TimeoutState, Queue, Debug1); {noreply, NState, Time1} -> Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name, {noreply, NState}), - loop(Parent, Name, NState, Mod, Time1, Queue, Debug1); + loop(Parent, Name, NState, Mod, Time1, TimeoutState, Queue, Debug1); {stop, Reason, Reply, NState} -> {'EXIT', R} = (catch terminate(Reason, Name, Msg, Mod, NState, Debug)), @@ -690,20 +745,20 @@ handle_msg({'$gen_call', From, Msg}, exit(R); Other -> handle_common_reply(Other, - Parent, Name, Msg, Mod, State, Queue, Debug) + Parent, Name, Msg, Mod, State, TimeoutState, Queue, Debug) end; handle_msg(Msg, - Parent, Name, State, Mod, Queue, Debug) -> + Parent, Name, State, Mod, TimeoutState, Queue, Debug) -> Reply = (catch dispatch(Msg, Mod, State)), handle_common_reply(Reply, - Parent, Name, Msg, Mod, State, Queue, Debug). + Parent, Name, Msg, Mod, State, TimeoutState, Queue, Debug). -handle_common_reply(Reply, Parent, Name, Msg, Mod, State, Queue) -> +handle_common_reply(Reply, Parent, Name, Msg, Mod, State, TimeoutState, Queue) -> case Reply of {noreply, NState} -> - loop(Parent, Name, NState, Mod, infinity, Queue, []); + loop(Parent, Name, NState, Mod, infinity, TimeoutState, Queue, []); {noreply, NState, Time1} -> - loop(Parent, Name, NState, Mod, Time1, Queue, []); + loop(Parent, Name, NState, Mod, Time1, TimeoutState, Queue, []); {stop, Reason, NState} -> terminate(Reason, Name, Msg, Mod, NState, []); {'EXIT', What} -> @@ -712,16 +767,16 @@ handle_common_reply(Reply, Parent, Name, Msg, Mod, State, Queue) -> terminate({bad_return_value, Reply}, Name, Msg, Mod, State, []) end. -handle_common_reply(Reply, Parent, Name, Msg, Mod, State, Queue, Debug) -> +handle_common_reply(Reply, Parent, Name, Msg, Mod, State, TimeoutState, Queue, Debug) -> case Reply of {noreply, NState} -> Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name, {noreply, NState}), - loop(Parent, Name, NState, Mod, infinity, Queue, Debug1); + loop(Parent, Name, NState, Mod, infinity, TimeoutState, Queue, Debug1); {noreply, NState, Time1} -> Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name, {noreply, NState}), - loop(Parent, Name, NState, Mod, Time1, Queue, Debug1); + loop(Parent, Name, NState, Mod, Time1, TimeoutState, Queue, Debug1); {stop, Reason, NState} -> terminate(Reason, Name, Msg, Mod, NState, Debug); {'EXIT', What} -> @@ -739,17 +794,17 @@ reply(Name, {To, Tag}, Reply, State, Debug) -> %%----------------------------------------------------------------- %% Callback functions for system messages handling. %%----------------------------------------------------------------- -system_continue(Parent, Debug, [Name, State, Mod, Time, Queue]) -> - loop(Parent, Name, State, Mod, Time, Queue, Debug). +system_continue(Parent, Debug, [Name, State, Mod, Time, TimeoutState, Queue]) -> + loop(Parent, Name, State, Mod, Time, TimeoutState, Queue, Debug). -spec system_terminate(_, _, _, [_]) -> no_return(). -system_terminate(Reason, _Parent, Debug, [Name, State, Mod, _Time, _Queue]) -> +system_terminate(Reason, _Parent, Debug, [Name, State, Mod, _Time, _TimeoutState, _Queue]) -> terminate(Reason, Name, [], Mod, State, Debug). -system_code_change([Name, State, Mod, Time, Queue], _Module, OldVsn, Extra) -> +system_code_change([Name, State, Mod, Time, TimeoutState, Queue], _Module, OldVsn, Extra) -> case catch Mod:code_change(OldVsn, State, Extra) of - {ok, NewState} -> {ok, [Name, NewState, Mod, Time, Queue]}; + {ok, NewState} -> {ok, [Name, NewState, Mod, Time, TimeoutState, Queue]}; Else -> Else end. @@ -918,7 +973,7 @@ name_to_pid(Name) -> %% Status information %%----------------------------------------------------------------- format_status(Opt, StatusData) -> - [PDict, SysState, Parent, Debug, [Name, State, Mod, _Time, Queue]] = + [PDict, SysState, Parent, Debug, [Name, State, Mod, _Time, TimeoutState, Queue]] = StatusData, NameTag = if is_pid(Name) -> pid_to_list(Name); @@ -937,9 +992,14 @@ format_status(Opt, StatusData) -> _ -> [{data, [{"State", State}]}] end, + Specfic1 = case TimeoutState of + undefined -> Specfic; + {Current, Min, undefined} -> + [{"Binary Timeout Current and Min", {Current, Min}} | Specfic] + end, [{header, Header}, {data, [{"Status", SysState}, {"Parent", Parent}, {"Logged events", Log}, {"Queued messages", priority_queue:to_list(Queue)}]} | - Specfic]. + Specfic1]. diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index e2dc598ffd..0180d86ac3 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -54,9 +54,7 @@ next_msg_id, message_buffer, active_consumers, - blocked_consumers, - hibernate_after, - hibernated_at + blocked_consumers }). -record(consumer, {tag, ack_required}). @@ -104,10 +102,8 @@ init(Q) -> next_msg_id = 1, message_buffer = queue:new(), active_consumers = queue:new(), - blocked_consumers = queue:new(), - hibernate_after = ?HIBERNATE_AFTER_MIN, - hibernated_at = undefined - }, ?HIBERNATE_AFTER_MIN}. + blocked_consumers = queue:new() + }, {binary, ?HIBERNATE_AFTER_MIN}}. terminate(_Reason, State) -> %% FIXME: How do we cancel active subscriptions? @@ -122,41 +118,11 @@ code_change(_OldVsn, State, _Extra) -> %%---------------------------------------------------------------------------- -reply(Reply, NewState = #q { hibernated_at = undefined }) -> - {reply, Reply, NewState, NewState #q.hibernate_after}; reply(Reply, NewState) -> - NewState1 = adjust_hibernate_after(NewState), - {reply, Reply, NewState1, NewState1 #q.hibernate_after}. + {reply, Reply, NewState, binary}. -noreply(NewState = #q { hibernated_at = undefined }) -> - {noreply, NewState, NewState #q.hibernate_after}; noreply(NewState) -> - NewState1 = adjust_hibernate_after(NewState), - {noreply, NewState1, NewState1 #q.hibernate_after}. - -adjust_hibernate_after(State = #q { hibernated_at = undefined }) -> - State; -adjust_hibernate_after(State = #q { hibernated_at = Then, - hibernate_after = Timeout }) -> - State1 = State #q { hibernated_at = undefined }, - NapLengthMicros = timer:now_diff(now(), Then), - TimeoutMicros = Timeout * 1000, - LowTargetMicros = TimeoutMicros * 4, - HighTargetMicros = LowTargetMicros * 4, - if - NapLengthMicros < LowTargetMicros -> - %% nap was too short, don't go to sleep as soon - State1 #q { hibernate_after = Timeout * 2 }; - - NapLengthMicros > HighTargetMicros -> - %% nap was long, try going to sleep sooner - Timeout1 = lists:max([?HIBERNATE_AFTER_MIN, round(Timeout / 2)]), - State1 #q { hibernate_after = Timeout1 }; - - true -> - %% nap and timeout seem to be in the right relationship. stay here - State1 - end. + {noreply, NewState, binary}. lookup_ch(ChPid) -> case get({ch, ChPid}) of @@ -852,8 +818,7 @@ handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) -> handle_ch_down(DownPid, State); handle_info(timeout, State) -> - State1 = State #q { hibernated_at = now() }, - {noreply, State1, hibernate}; + {noreply, State, hibernate}; handle_info(Info, State) -> ?LOGDEBUG("Info in queue: ~p~n", [Info]), |
