diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-07-06 16:52:32 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-07-06 16:52:32 +0100 |
| commit | d5887a30de3921f4447b7069a64780450150b040 (patch) | |
| tree | 93cb2cad7df02ea99d8773d4f7caae2f2f5f9f5f /src | |
| parent | 8889e29c54f5adde8310eba77c23dc837cf46230 (diff) | |
| parent | f74caf2fce579ac9fe9842d66fb09affa7c199a3 (diff) | |
| download | rabbitmq-server-git-d5887a30de3921f4447b7069a64780450150b040.tar.gz | |
merge from bug21097
Diffstat (limited to 'src')
| -rw-r--r-- | src/gen_server2.erl | 217 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 62 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 4 |
3 files changed, 173 insertions, 110 deletions
diff --git a/src/gen_server2.erl b/src/gen_server2.erl index ba8becfca9..dc1e8691d7 100644 --- a/src/gen_server2.erl +++ b/src/gen_server2.erl @@ -1,4 +1,4 @@ -%% This file is a copy of gen_server.erl from the R11B-5 Erlang/OTP +%% This file is a copy of gen_server.erl from the R13B-1 Erlang/OTP %% distribution, with the following modifications: %% %% 1) the module name is gen_server2 @@ -21,6 +21,23 @@ %% higher priorities are processed before requests with lower %% priorities. The default priority is 0. %% +%% 5) On return from init/1, the timeout value {binary, Min} creates a +%% binary exponential timeout, where Min is the minimum number of +%% milliseconds permitted, and is also used as the current timeout +%% value. Returning from handle_* with the timeout value set to +%% 'binary' will use the current binary timeout value. handle_info/2 +%% with the Info of 'timeout' will function normally, and supports the +%% return value of {noreply, State, hibernate} which will hibernate +%% the process. The current timeout value is: +%% +%% a) doubled if the time spent in hibernation is < 4 * the current value; +%% b) halved if the time spent in hibernation is > 16 * the current value; +%% c) maintained in all other cases +%% +%% Explicit timeouts (i.e. not 'binary') from the handle_* functions +%% are still supported, and do not have any effect on the current +%% timeout value. + %% All modifications are (C) 2009 LShift Ltd. %% ``The contents of this file are subject to the Erlang Public License, @@ -116,7 +133,7 @@ cast/2, pcast/3, reply/2, abcast/2, abcast/3, multi_call/2, multi_call/3, multi_call/4, - enter_loop/3, enter_loop/4, enter_loop/5]). + enter_loop/3, enter_loop/4, enter_loop/5, wake_hib/7]). -export([behaviour_info/1]). @@ -135,6 +152,8 @@ %%% API %%%========================================================================= +-spec behaviour_info(atom()) -> 'undefined' | [{atom(), arity()}]. + behaviour_info(callbacks) -> [{init,1},{handle_call,3},{handle_cast,2},{handle_info,2}, {terminate,2},{code_change,3}]; @@ -314,7 +333,12 @@ enter_loop(Mod, Options, State, ServerName, Timeout) -> Parent = get_parent(), Debug = debug_options(Name, Options), Queue = priority_queue:new(), - loop(Parent, Name, State, Mod, Timeout, Queue, Debug). + TimeoutState = case Timeout of + {binary, Min} -> + {Min, Min, undefined}; + _ -> undefined + end, + loop(Parent, Name, State, Mod, Timeout, TimeoutState, Queue, Debug). %%%======================================================================== %%% Gen-callback functions @@ -329,23 +353,38 @@ enter_loop(Mod, Options, State, ServerName, Timeout) -> %%% --------------------------------------------------- init_it(Starter, self, Name, Mod, Args, Options) -> init_it(Starter, self(), Name, Mod, Args, Options); -init_it(Starter, Parent, Name, Mod, Args, Options) -> +init_it(Starter, Parent, Name0, Mod, Args, Options) -> + Name = name(Name0), Debug = debug_options(Name, Options), Queue = priority_queue:new(), case catch Mod:init(Args) of {ok, State} -> proc_lib:init_ack(Starter, {ok, self()}), - loop(Parent, Name, State, Mod, infinity, Queue, Debug); + loop(Parent, Name, State, Mod, infinity, undefined, Queue, Debug); {ok, State, Timeout} -> - proc_lib:init_ack(Starter, {ok, self()}), - loop(Parent, Name, State, Mod, Timeout, Queue, Debug); + proc_lib:init_ack(Starter, {ok, self()}), + TimeoutState = case Timeout of + {binary, Min} -> + {Min, Min, undefined}; + _ -> undefined + end, + loop(Parent, Name, State, Mod, binary, TimeoutState, Queue, Debug); {stop, Reason} -> + %% For consistency, we must make sure that the + %% registered name (if any) is unregistered before + %% the parent process is notified about the failure. + %% (Otherwise, the parent process could get + %% an 'already_started' error if it immediately + %% tried starting the process again.) + unregister_name(Name0), proc_lib:init_ack(Starter, {error, Reason}), exit(Reason); ignore -> + unregister_name(Name0), proc_lib:init_ack(Starter, ignore), exit(normal); {'EXIT', Reason} -> + unregister_name(Name0), proc_lib:init_ack(Starter, {error, Reason}), exit(Reason); Else -> @@ -354,33 +393,84 @@ init_it(Starter, Parent, Name, Mod, Args, Options) -> exit(Error) end. +name({local,Name}) -> Name; +name({global,Name}) -> Name; +name(Pid) when is_pid(Pid) -> Pid. + +unregister_name({local,Name}) -> + _ = (catch unregister(Name)); +unregister_name({global,Name}) -> + _ = global:unregister_name(Name); +unregister_name(Pid) when is_pid(Pid) -> + Pid. + %%%======================================================================== %%% Internal functions %%%======================================================================== %%% --------------------------------------------------- %%% The MAIN loop. %%% --------------------------------------------------- -loop(Parent, Name, State, Mod, Time, Queue, Debug) -> +loop(Parent, Name, State, Mod, hibernate, undefined, Queue, Debug) -> + proc_lib:hibernate(?MODULE,wake_hib,[Parent, Name, State, Mod, undefined, Queue, Debug]); +loop(Parent, Name, State, Mod, hibernate, {Current, Min, undefined}, Queue, Debug) -> + proc_lib:hibernate(?MODULE,wake_hib,[Parent, Name, State, Mod, {Current, Min, now()}, Queue, Debug]); +loop(Parent, Name, State, Mod, Time, TimeoutState, Queue, Debug) -> receive Input -> loop(Parent, Name, State, Mod, - Time, in(Input, Queue), Debug) + Time, TimeoutState, in(Input, Queue), Debug) after 0 -> - case priority_queue:out(Queue) of - {{value, Msg}, Queue1} -> + process_next_msg(Parent, Name, State, Mod, Time, TimeoutState, Queue, Debug, false) + end. + +process_next_msg(Parent, Name, State, Mod, Time, TimeoutState, Queue, Debug, Hib) -> + case priority_queue:out(Queue) of + {{value, Msg}, Queue1} -> + process_msg(Parent, Name, State, Mod, + Time, TimeoutState, Queue1, Debug, Hib, Msg); + {empty, Queue1} -> + Time1 = case {Time, TimeoutState} of + {binary, {Current, _Min, undefined}} -> Current; + _ -> Time + end, + receive + Input -> + loop(Parent, Name, State, Mod, + Time, TimeoutState, in(Input, Queue1), Debug) + after Time1 -> process_msg(Parent, Name, State, Mod, - Time, Queue1, Debug, Msg); - {empty, Queue1} -> - receive - Input -> - loop(Parent, Name, State, Mod, - Time, in(Input, Queue1), Debug) - after Time -> - process_msg(Parent, Name, State, Mod, - Time, Queue1, Debug, timeout) - end + Time, TimeoutState, Queue1, Debug, Hib, timeout) end end. +wake_hib(Parent, Name, State, Mod, TimeoutState, Queue, Debug) -> + Msg = receive + Input -> + Input + end, + TimeoutState1 = adjust_hibernate_after(TimeoutState), + process_next_msg(Parent, Name, State, Mod, hibernate, TimeoutState1, in(Msg, Queue), Debug, true). + +adjust_hibernate_after(undefined) -> + undefined; +adjust_hibernate_after({Current, Min, HibernatedAt}) -> + NapLengthMicros = timer:now_diff(now(), HibernatedAt), + CurrentMicros = Current * 1000, + LowTargetMicros = CurrentMicros * 4, + HighTargetMicros = LowTargetMicros * 4, + if + NapLengthMicros < LowTargetMicros -> + %% nap was too short, don't go to sleep as soon + {Current * 2, Min, undefined}; + + NapLengthMicros > HighTargetMicros -> + %% nap was long, try going to sleep sooner + {lists:max([Min, round(Current / 2)]), Min, undefined}; + + true -> + %% nap and timeout seem to be in the right relationship. stay here + {Current, Min, undefined} + end. + in({'$gen_pcast', {Priority, Msg}}, Queue) -> priority_queue:in({'$gen_cast', Msg}, Priority, Queue); in({'$gen_pcall', From, {Priority, Msg}}, Queue) -> @@ -388,26 +478,34 @@ in({'$gen_pcall', From, {Priority, Msg}}, Queue) -> in(Input, Queue) -> priority_queue:in(Input, Queue). -process_msg(Parent, Name, State, Mod, Time, Queue, Debug, Msg) -> +process_msg(Parent, Name, State, Mod, Time, TimeoutState, Queue, Debug, _Hib, Msg) -> case Msg of {system, From, Req} -> sys:handle_system_msg(Req, From, Parent, ?MODULE, Debug, - [Name, State, Mod, Time, Queue]); + [Name, State, Mod, Time, TimeoutState, Queue]); + %% gen_server puts Hib on the end as the 7th arg, but that + %% version of the function seems not to be documented so + %% leaving out for now. {'EXIT', Parent, Reason} -> terminate(Reason, Name, Msg, Mod, State, Debug); _Msg when Debug =:= [] -> - handle_msg(Msg, Parent, Name, State, Mod, Time, Queue); + handle_msg(Msg, Parent, Name, State, Mod, TimeoutState, Queue); _Msg -> Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name, {in, Msg}), - handle_msg(Msg, Parent, Name, State, Mod, Time, Queue, Debug1) + handle_msg(Msg, Parent, Name, State, Mod, TimeoutState, Queue, Debug1) end. %%% --------------------------------------------------- %%% Send/recive functions %%% --------------------------------------------------- do_send(Dest, Msg) -> - catch erlang:send(Dest, Msg). + case catch erlang:send(Dest, Msg, [noconnect]) of + noconnect -> + spawn(erlang, send, [Dest,Msg]); + Other -> + Other + end. do_multi_call(Nodes, Name, Req, infinity) -> Tag = make_ref(), @@ -598,48 +696,48 @@ dispatch(Info, Mod, State) -> Mod:handle_info(Info, State). handle_msg({'$gen_call', From, Msg}, - Parent, Name, State, Mod, _Time, Queue) -> + Parent, Name, State, Mod, TimeoutState, Queue) -> case catch Mod:handle_call(Msg, From, State) of {reply, Reply, NState} -> reply(From, Reply), - loop(Parent, Name, NState, Mod, infinity, Queue, []); + loop(Parent, Name, NState, Mod, infinity, TimeoutState, Queue, []); {reply, Reply, NState, Time1} -> reply(From, Reply), - loop(Parent, Name, NState, Mod, Time1, Queue, []); + loop(Parent, Name, NState, Mod, Time1, TimeoutState, Queue, []); {noreply, NState} -> - loop(Parent, Name, NState, Mod, infinity, Queue, []); + loop(Parent, Name, NState, Mod, infinity, TimeoutState, Queue, []); {noreply, NState, Time1} -> - loop(Parent, Name, NState, Mod, Time1, Queue, []); + loop(Parent, Name, NState, Mod, Time1, TimeoutState, Queue, []); {stop, Reason, Reply, NState} -> {'EXIT', R} = (catch terminate(Reason, Name, Msg, Mod, NState, [])), reply(From, Reply), exit(R); Other -> handle_common_reply(Other, - Parent, Name, Msg, Mod, State, Queue) + Parent, Name, Msg, Mod, State, TimeoutState, Queue) end; handle_msg(Msg, - Parent, Name, State, Mod, _Time, Queue) -> + Parent, Name, State, Mod, TimeoutState, Queue) -> Reply = (catch dispatch(Msg, Mod, State)), - handle_common_reply(Reply, Parent, Name, Msg, Mod, State, Queue). + handle_common_reply(Reply, Parent, Name, Msg, Mod, State, TimeoutState, Queue). handle_msg({'$gen_call', From, Msg}, - Parent, Name, State, Mod, _Time, Queue, Debug) -> + Parent, Name, State, Mod, TimeoutState, Queue, Debug) -> case catch Mod:handle_call(Msg, From, State) of {reply, Reply, NState} -> Debug1 = reply(Name, From, Reply, NState, Debug), - loop(Parent, Name, NState, Mod, infinity, Queue, Debug1); + loop(Parent, Name, NState, Mod, infinity, TimeoutState, Queue, Debug1); {reply, Reply, NState, Time1} -> Debug1 = reply(Name, From, Reply, NState, Debug), - loop(Parent, Name, NState, Mod, Time1, Queue, Debug1); + loop(Parent, Name, NState, Mod, Time1, TimeoutState, Queue, Debug1); {noreply, NState} -> Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name, {noreply, NState}), - loop(Parent, Name, NState, Mod, infinity, Queue, Debug1); + loop(Parent, Name, NState, Mod, infinity, TimeoutState, Queue, Debug1); {noreply, NState, Time1} -> Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name, {noreply, NState}), - loop(Parent, Name, NState, Mod, Time1, Queue, Debug1); + loop(Parent, Name, NState, Mod, Time1, TimeoutState, Queue, Debug1); {stop, Reason, Reply, NState} -> {'EXIT', R} = (catch terminate(Reason, Name, Msg, Mod, NState, Debug)), @@ -647,20 +745,20 @@ handle_msg({'$gen_call', From, Msg}, exit(R); Other -> handle_common_reply(Other, - Parent, Name, Msg, Mod, State, Queue, Debug) + Parent, Name, Msg, Mod, State, TimeoutState, Queue, Debug) end; handle_msg(Msg, - Parent, Name, State, Mod, _Time, Queue, Debug) -> + Parent, Name, State, Mod, TimeoutState, Queue, Debug) -> Reply = (catch dispatch(Msg, Mod, State)), handle_common_reply(Reply, - Parent, Name, Msg, Mod, State, Queue, Debug). + Parent, Name, Msg, Mod, State, TimeoutState, Queue, Debug). -handle_common_reply(Reply, Parent, Name, Msg, Mod, State, Queue) -> +handle_common_reply(Reply, Parent, Name, Msg, Mod, State, TimeoutState, Queue) -> case Reply of {noreply, NState} -> - loop(Parent, Name, NState, Mod, infinity, Queue, []); + loop(Parent, Name, NState, Mod, infinity, TimeoutState, Queue, []); {noreply, NState, Time1} -> - loop(Parent, Name, NState, Mod, Time1, Queue, []); + loop(Parent, Name, NState, Mod, Time1, TimeoutState, Queue, []); {stop, Reason, NState} -> terminate(Reason, Name, Msg, Mod, NState, []); {'EXIT', What} -> @@ -669,16 +767,16 @@ handle_common_reply(Reply, Parent, Name, Msg, Mod, State, Queue) -> terminate({bad_return_value, Reply}, Name, Msg, Mod, State, []) end. -handle_common_reply(Reply, Parent, Name, Msg, Mod, State, Queue, Debug) -> +handle_common_reply(Reply, Parent, Name, Msg, Mod, State, TimeoutState, Queue, Debug) -> case Reply of {noreply, NState} -> Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name, {noreply, NState}), - loop(Parent, Name, NState, Mod, infinity, Queue, Debug1); + loop(Parent, Name, NState, Mod, infinity, TimeoutState, Queue, Debug1); {noreply, NState, Time1} -> Debug1 = sys:handle_debug(Debug, {?MODULE, print_event}, Name, {noreply, NState}), - loop(Parent, Name, NState, Mod, Time1, Queue, Debug1); + loop(Parent, Name, NState, Mod, Time1, TimeoutState, Queue, Debug1); {stop, Reason, NState} -> terminate(Reason, Name, Msg, Mod, NState, Debug); {'EXIT', What} -> @@ -696,15 +794,17 @@ reply(Name, {To, Tag}, Reply, State, Debug) -> %%----------------------------------------------------------------- %% Callback functions for system messages handling. %%----------------------------------------------------------------- -system_continue(Parent, Debug, [Name, State, Mod, Time, Queue]) -> - loop(Parent, Name, State, Mod, Time, Queue, Debug). +system_continue(Parent, Debug, [Name, State, Mod, Time, TimeoutState, Queue]) -> + loop(Parent, Name, State, Mod, Time, TimeoutState, Queue, Debug). + +-spec system_terminate(_, _, _, [_]) -> no_return(). -system_terminate(Reason, _Parent, Debug, [Name, State, Mod, _Time, _Queue]) -> +system_terminate(Reason, _Parent, Debug, [Name, State, Mod, _Time, _TimeoutState, _Queue]) -> terminate(Reason, Name, [], Mod, State, Debug). -system_code_change([Name, State, Mod, Time, Queue], _Module, OldVsn, Extra) -> +system_code_change([Name, State, Mod, Time, TimeoutState, Queue], _Module, OldVsn, Extra) -> case catch Mod:code_change(OldVsn, State, Extra) of - {ok, NewState} -> {ok, [Name, NewState, Mod, Time, Queue]}; + {ok, NewState} -> {ok, [Name, NewState, Mod, Time, TimeoutState, Queue]}; Else -> Else end. @@ -747,6 +847,8 @@ terminate(Reason, Name, Msg, Mod, State, Debug) -> exit(normal); shutdown -> exit(shutdown); + {shutdown,_}=Shutdown -> + exit(Shutdown); _ -> error_info(Reason, Name, Msg, State, Debug), exit(Reason) @@ -871,7 +973,7 @@ name_to_pid(Name) -> %% Status information %%----------------------------------------------------------------- format_status(Opt, StatusData) -> - [PDict, SysState, Parent, Debug, [Name, State, Mod, _Time, Queue]] = + [PDict, SysState, Parent, Debug, [Name, State, Mod, _Time, TimeoutState, Queue]] = StatusData, NameTag = if is_pid(Name) -> pid_to_list(Name); @@ -890,9 +992,14 @@ format_status(Opt, StatusData) -> _ -> [{data, [{"State", State}]}] end, + Specfic1 = case TimeoutState of + undefined -> Specfic; + {Current, Min, undefined} -> + [{"Binary Timeout Current and Min", {Current, Min}} | Specfic] + end, [{header, Header}, {data, [{"Status", SysState}, {"Parent", Parent}, {"Logged events", Log}, {"Queued messages", priority_queue:to_list(Queue)}]} | - Specfic]. + Specfic1]. diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 6b19695157..1184122022 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -57,9 +57,7 @@ next_msg_id, active_consumers, blocked_consumers, - memory_report_timer, - hibernate_after, - hibernated_at + memory_report_timer }). -record(consumer, {tag, ack_required}). @@ -113,10 +111,8 @@ init(Q = #amqqueue { name = QName, durable = Durable }) -> next_msg_id = 1, active_consumers = queue:new(), blocked_consumers = queue:new(), - memory_report_timer = start_memory_timer(), - hibernate_after = ?HIBERNATE_AFTER_MIN, - hibernated_at = undefined - }, ?HIBERNATE_AFTER_MIN}. + memory_report_timer = start_memory_timer() + }, {binary, ?HIBERNATE_AFTER_MIN}}. terminate(_Reason, State) -> %% FIXME: How do we cancel active subscriptions? @@ -135,50 +131,14 @@ code_change(_OldVsn, State, _Extra) -> %%---------------------------------------------------------------------------- reply(Reply, NewState = #q { memory_report_timer = undefined }) -> - reply1(Reply, start_memory_timer(NewState)); + {reply, Reply, start_memory_timer(NewState), binary}; reply(Reply, NewState) -> - reply1(Reply, NewState). - -reply1(Reply, NewState = #q { hibernated_at = undefined }) -> - {reply, Reply, NewState, NewState #q.hibernate_after}; -reply1(Reply, NewState) -> - NewState1 = adjust_hibernate_after(NewState), - {reply, Reply, NewState1, NewState1 #q.hibernate_after}. + {reply, Reply, NewState, binary}. noreply(NewState = #q { memory_report_timer = undefined }) -> - noreply1(start_memory_timer(NewState)); + {noreply, start_memory_timer(NewState), binary}; noreply(NewState) -> - noreply1(NewState). - -noreply1(NewState = #q { hibernated_at = undefined }) -> - {noreply, NewState, NewState #q.hibernate_after}; -noreply1(NewState) -> - NewState1 = adjust_hibernate_after(NewState), - {noreply, NewState1, NewState1 #q.hibernate_after}. - -adjust_hibernate_after(State = #q { hibernated_at = undefined }) -> - State; -adjust_hibernate_after(State = #q { hibernated_at = Then, - hibernate_after = Timeout }) -> - State1 = State #q { hibernated_at = undefined }, - NapLengthMicros = timer:now_diff(now(), Then), - TimeoutMicros = Timeout * 1000, - LowTargetMicros = TimeoutMicros * 4, - HighTargetMicros = LowTargetMicros * 4, - if - NapLengthMicros < LowTargetMicros -> - %% nap was too short, don't go to sleep as soon - State1 #q { hibernate_after = Timeout * 2 }; - - NapLengthMicros > HighTargetMicros -> - %% nap was long, try going to sleep sooner - Timeout1 = lists:max([?HIBERNATE_AFTER_MIN, round(Timeout / 2)]), - State1 #q { hibernate_after = Timeout1 }; - - true -> - %% nap and timeout seem to be in the right relationship. stay here - State1 - end. + {noreply, NewState, binary}. start_memory_timer() -> {ok, TRef} = timer:apply_after(?MEMORY_REPORT_TIME_INTERVAL, @@ -881,8 +841,7 @@ handle_cast({set_mode, Mode}, State = #q { mixed_state = MS }) -> noreply(State #q { mixed_state = MS1 }); handle_cast(report_memory, State) -> - noreply1 - ((report_memory(false, State)) #q { memory_report_timer = undefined }). + {noreply, (report_memory(false, State)) #q { memory_report_timer = undefined }, binary}. handle_info({'DOWN', MonitorRef, process, DownPid, _Reason}, State = #q{owner = {DownPid, MonitorRef}}) -> @@ -903,9 +862,8 @@ handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) -> handle_info(timeout, State) -> %% TODO: Once we drop support for R11B-5, we can change this to %% {noreply, State, hibernate}; - State1 = (stop_memory_timer(report_memory(true, State))) - #q { hibernated_at = now() }, - proc_lib:hibernate(gen_server2, enter_loop, [?MODULE, [], State1]); + State1 = stop_memory_timer(report_memory(true, State)), + {noreply, State1, hibernate}; handle_info(Info, State) -> ?LOGDEBUG("Info in queue: ~p~n", [Info]), diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index ed71509725..adf2462dac 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -162,9 +162,7 @@ handle_info({'EXIT', _Pid, Reason}, State) -> handle_info(timeout, State) -> ok = clear_permission_cache(), - %% TODO: Once we drop support for R11B-5, we can change this to - %% {noreply, State, hibernate}; - proc_lib:hibernate(gen_server2, enter_loop, [?MODULE, [], State]). + {noreply, State, hibernate}. terminate(_Reason, #ch{writer_pid = WriterPid, limiter_pid = LimiterPid, state = terminating}) -> |
