diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-08-03 17:42:02 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-08-03 17:42:02 +0100 |
| commit | 519e73a0ad74c25ca8169322a1cfe5f42b2f7ab9 (patch) | |
| tree | 5db18b7cc7769453fcd30a51bc6cce5cbf7ef042 | |
| parent | fd792ab2ad5126425dbc23774218db547542fecb (diff) | |
| parent | 091b70039aee2701a42b7e19ce1e48889638bb79 (diff) | |
| download | rabbitmq-server-git-519e73a0ad74c25ca8169322a1cfe5f42b2f7ab9.tar.gz | |
merge in from 21087
| -rw-r--r-- | src/gen_server2.erl | 156 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 8 | ||||
| -rw-r--r-- | src/rabbit_disk_queue.erl | 5 | ||||
| -rw-r--r-- | src/rabbit_queue_prefetcher.erl | 12 |
4 files changed, 102 insertions, 79 deletions
diff --git a/src/gen_server2.erl b/src/gen_server2.erl index be2c5730d6..529ed0295e 100644 --- a/src/gen_server2.erl +++ b/src/gen_server2.erl @@ -345,7 +345,8 @@ enter_loop(Mod, Options, State) -> enter_loop(Mod, Options, State, self(), infinity, undefined). enter_loop(Mod, Options, State, Backoff = {backoff, _, _ , _}) -> - enter_loop(Mod, Options, State, self(), infinity, Backoff); + Backoff1 = extend_backoff(Mod, Backoff), + enter_loop(Mod, Options, State, self(), infinity, Backoff1); enter_loop(Mod, Options, State, ServerName = {_, _}) -> enter_loop(Mod, Options, State, ServerName, infinity, undefined); @@ -354,7 +355,8 @@ enter_loop(Mod, Options, State, Timeout) -> enter_loop(Mod, Options, State, self(), Timeout, undefined). enter_loop(Mod, Options, State, ServerName, Backoff = {backoff, _, _, _}) -> - enter_loop(Mod, Options, State, ServerName, infinity, Backoff); + Backoff1 = extend_backoff(Mod, Backoff), + enter_loop(Mod, Options, State, ServerName, infinity, Backoff1); enter_loop(Mod, Options, State, ServerName, Timeout) -> enter_loop(Mod, Options, State, ServerName, Timeout, undefined). @@ -392,7 +394,8 @@ init_it(Starter, Parent, Name0, Mod, Args, Options) -> loop(Parent, Name, State, Mod, Timeout, undefined, Queue, Debug); {ok, State, Timeout, Backoff = {backoff, _, _, _}} -> proc_lib:init_ack(Starter, {ok, self()}), - loop(Parent, Name, State, Mod, Timeout, Backoff, Queue, Debug); + Backoff1 = extend_backoff(Mod, Backoff), + loop(Parent, Name, State, Mod, Timeout, Backoff1, Queue, Debug); {stop, Reason} -> %% For consistency, we must make sure that the %% registered name (if any) is unregistered before @@ -430,6 +433,12 @@ unregister_name({global,Name}) -> unregister_name(Pid) when is_pid(Pid) -> Pid. +extend_backoff(Mod, {backoff, CurrentTO, MinimumTimeout, DesiredHibPeriod}) -> + Pre = erlang:function_exported(Mod, handle_pre_hibernate, 1), + Post = erlang:function_exported(Mod, handle_post_hibernate, 1), + random:seed(now()), %% call before we get into the loop + {backoff, CurrentTO, MinimumTimeout, DesiredHibPeriod, Pre, Post}. + %%%======================================================================== %%% Internal functions %%%======================================================================== @@ -440,25 +449,36 @@ loop(Parent, Name, State, Mod, hibernate, undefined, Queue, Debug) -> proc_lib:hibernate(?MODULE,wake_hib, [Parent, Name, State, Mod, undefined, Queue, Debug]); loop(Parent, Name, State, Mod, Time, TimeoutState, Queue, Debug) -> + process_next_msg(Parent, Name, State, Mod, Time, TimeoutState, + drain(Queue), Debug). + +drain(Queue) -> receive - Input -> loop(Parent, Name, State, Mod, - Time, TimeoutState, in(Input, Queue), Debug) - after 0 -> - process_next_msg(Parent, Name, State, Mod, Time, TimeoutState, - Queue, Debug, false) + Input -> drain(in(Input, Queue)) + after 0 -> Queue end. -process_next_msg(Parent, Name, State, Mod, Time, TimeoutState, Queue, - Debug, Hib) -> +process_next_msg(Parent, Name, State, Mod, Time, TimeoutState, Queue, Debug) -> case priority_queue:out(Queue) of {{value, Msg}, Queue1} -> process_msg(Parent, Name, State, Mod, - Time, TimeoutState, Queue1, Debug, Hib, Msg); + Time, TimeoutState, Queue1, Debug, Msg); {empty, Queue1} -> {Time1, HibOnTimeout} = case {Time, TimeoutState} of - {hibernate, {backoff, Current, _Min, _Desired}} -> + {hibernate, + {backoff, Current, _Min, _Desired, _Pre, _Post}} -> {Current, true}; + {hibernate, _} -> + %% wake_hib/7 will set Time to hibernate. If + %% we were woken and didn't receive a msg + %% then we will get here and need a sensible + %% value for Time1, otherwise we crash. + %% On the grounds that it's better to get + %% control back to the user module sooner + %% rather than later, 0 is more sensible + %% than infinity here. + {0, false}; _ -> {Time, false} end, receive @@ -474,72 +494,82 @@ process_next_msg(Parent, Name, State, Mod, Time, TimeoutState, Queue, false -> process_msg( Parent, Name, State, Mod, Time, TimeoutState, - Queue1, Debug, Hib, timeout) + Queue1, Debug, timeout) end end end. wake_hib(Parent, Name, State, Mod, TimeoutState, Queue, Debug) -> - Msg = receive - Input -> - Input - end, process_next_msg(Parent, Name, State, Mod, hibernate, TimeoutState, - in(Msg, Queue), Debug, true). + drain(Queue), Debug). wake_hib(Parent, Name, State, Mod, SleptAt, TimeoutState, Queue, Debug) -> - AwokeAt = now(), - Msg = receive - Input -> - Input - end, - backoff_post_hibernate(Parent, Name, State, Mod, SleptAt, AwokeAt, - TimeoutState, in(Msg, Queue), Debug). - -backoff_pre_hibernate(Parent, Name, State, Mod, TimeoutState, Queue, Debug) -> - case catch Mod:handle_pre_hibernate(State) of - {hibernate, NState} -> - proc_lib:hibernate(?MODULE, wake_hib, [Parent, Name, NState, Mod, - now(), TimeoutState, - Queue, Debug]); - {stop, Reason, NState} -> - terminate(Reason, Name, pre_hibernate, Mod, NState, []); - {'EXIT', What} -> - terminate(What, Name, pre_hibernate, Mod, State, []); - Reply -> - terminate({bad_return_value, Reply}, Name, pre_hibernate, Mod, - State, []) + backoff_post_hibernate(Parent, Name, State, Mod, SleptAt, now(), + TimeoutState, drain(Queue), Debug). + +backoff_pre_hibernate(Parent, Name, State, Mod, TimeoutState = + {backoff, _Current, _Minimum, _Desired, Pre, _Post}, + Queue, Debug) -> + case Pre of + true -> + case catch Mod:handle_pre_hibernate(State) of + {hibernate, NState} -> + proc_lib:hibernate(?MODULE, wake_hib, [Parent, Name, NState, + Mod, now(), + TimeoutState, Queue, + Debug]); + {stop, Reason, NState} -> + terminate(Reason, Name, pre_hibernate, Mod, NState, []); + {'EXIT', What} -> + terminate(What, Name, pre_hibernate, Mod, State, []); + Reply -> + terminate({bad_return_value, Reply}, Name, pre_hibernate, Mod, + State, []) + end; + false -> + proc_lib:hibernate(?MODULE, wake_hib, [Parent, Name, State, Mod, + now(), TimeoutState, Queue, + Debug]) end. backoff_post_hibernate(Parent, Name, State, Mod, SleptAt, AwokeAt, - {backoff, CurrentTO, MinimumTO, DesiredHibPeriod}, + {backoff, CurrentTO, MinimumTO, DesiredHibPeriod, + Pre, Post}, Queue, Debug) -> NapLengthMicros = timer:now_diff(AwokeAt, SleptAt), CurrentMicros = CurrentTO * 1000, MinimumMicros = MinimumTO * 1000, DesiredHibMicros = DesiredHibPeriod * 1000, - CurrentTO1 = case (NapLengthMicros + CurrentMicros) > - (MinimumMicros + DesiredHibMicros) of - true -> - lists:max([MinimumTO, round(CurrentTO/2)]); - false -> - CurrentTO + MinimumTO - end, - TimeoutState = {backoff, CurrentTO1, MinimumTO, DesiredHibPeriod}, - case catch Mod:handle_post_hibernate(State) of - {noreply, NState} -> - process_next_msg(Parent, Name, NState, Mod, infinity, TimeoutState, - Queue, Debug, true); - {noreply, NState, Time} -> - process_next_msg(Parent, Name, NState, Mod, Time, TimeoutState, - Queue, Debug, true); - {stop, Reason, NState} -> - terminate(Reason, Name, post_hibernate, Mod, NState, []); - {'EXIT', What} -> - terminate(What, Name, post_hibernate, Mod, State, []); - Reply -> - terminate({bad_return_value, Reply}, Name, post_hibernate, Mod, - State, []) + GapBetweenMessagesMicros = NapLengthMicros + CurrentMicros, + Base = + %% If enough time has passed between the last two messages then we + %% should consider sleeping sooner. Otherwise stay awake longer. + case GapBetweenMessagesMicros > (MinimumMicros + DesiredHibMicros) of + true -> lists:max([MinimumTO, CurrentTO div 2]); + false -> CurrentTO + end, + CurrentTO1 = Base + random:uniform(Base), + TimeoutState = + {backoff, CurrentTO1, MinimumTO, DesiredHibPeriod, Pre, Post}, + case Post of + true -> + case catch Mod:handle_post_hibernate(State) of + {noreply, NState} -> + loop(Parent, Name, NState, Mod, infinity, TimeoutState, + Queue, Debug); + {noreply, NState, Time} -> + loop(Parent, Name, NState, Mod, Time, TimeoutState, Queue, + Debug); + {stop, Reason, NState} -> + terminate(Reason, Name, post_hibernate, Mod, NState, []); + {'EXIT', What} -> + terminate(What, Name, post_hibernate, Mod, State, []); + Reply -> + terminate({bad_return_value, Reply}, Name, post_hibernate, + Mod, State, []) + end; + false -> loop(Parent, Name, State, Mod, infinity, TimeoutState, Queue, + Debug) end. in({'$gen_pcast', {Priority, Msg}}, Queue) -> @@ -550,7 +580,7 @@ in(Input, Queue) -> priority_queue:in(Input, Queue). process_msg(Parent, Name, State, Mod, Time, TimeoutState, Queue, - Debug, _Hib, Msg) -> + Debug, Msg) -> case Msg of {system, From, Req} -> sys:handle_system_msg diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index a0e6b1c69d..dba7ec2406 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -43,8 +43,7 @@ -export([start_link/1]). -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, - handle_info/2]). --export([handle_pre_hibernate/1, handle_post_hibernate/1]). + handle_info/2, handle_pre_hibernate/1]). -import(queue). -import(erlang). @@ -842,7 +841,7 @@ handle_cast({set_mode, Mode}, State = #q { mixed_state = MS }) -> handle_cast(report_memory, State) -> %% deliberately don't call noreply/2 as we don't want to restart the timer %% by unsetting the timer, we force a report on the next normal message - {noreply, State #q { memory_report_timer = undefined }, binary}. + {noreply, State #q { memory_report_timer = undefined }, hibernate}. handle_info({'DOWN', MonitorRef, process, DownPid, _Reason}, State = #q{owner = {DownPid, MonitorRef}}) -> @@ -870,6 +869,3 @@ handle_pre_hibernate(State = #q { mixed_state = MS }) -> stop_memory_timer(report_memory(true, State #q { mixed_state = MS1 })), %% don't call noreply/1 as that'll restart the memory_report_timer {hibernate, State1}. - -handle_post_hibernate(State) -> - {noreply, State, hibernate}. diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl index 154e8a9054..fe8c433c7b 100644 --- a/src/rabbit_disk_queue.erl +++ b/src/rabbit_disk_queue.erl @@ -37,7 +37,7 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --export([handle_pre_hibernate/1, handle_post_hibernate/1]). +-export([handle_pre_hibernate/1]). -export([publish/3, deliver/1, phantom_deliver/1, ack/2, tx_publish/1, tx_commit/3, tx_cancel/1, @@ -569,9 +569,6 @@ handle_pre_hibernate(State) -> ok = report_memory(true, State), {hibernate, stop_memory_timer(State)}. -handle_post_hibernate(State) -> - noreply(State). - terminate(_Reason, State) -> shutdown(State). diff --git a/src/rabbit_queue_prefetcher.erl b/src/rabbit_queue_prefetcher.erl index dfd444b259..0265ba2bd2 100644 --- a/src/rabbit_queue_prefetcher.erl +++ b/src/rabbit_queue_prefetcher.erl @@ -43,6 +43,7 @@ -include("rabbit.hrl"). -define(HIBERNATE_AFTER_MIN, 1000). +-define(DESIRED_HIBERNATE, 10000). -record(pstate, { msg_buf, @@ -209,7 +210,8 @@ init([Q, Count, QPid]) -> queue_mref = MRef }, ok = rabbit_disk_queue:prefetch(Q), - {ok, State, {binary, ?HIBERNATE_AFTER_MIN}}. + {ok, State, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, + ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. handle_call(drain, _From, State = #pstate { buf_length = 0 }) -> {stop, normal, empty, State}; @@ -221,7 +223,7 @@ handle_call(drain, _From, State = #pstate { fetched_count = Count, handle_call(drain, _From, State = #pstate { msg_buf = MsgBuf, buf_length = Length }) -> {reply, {MsgBuf, Length, continuing}, - State #pstate { msg_buf = queue:new(), buf_length = 0 }}; + State #pstate { msg_buf = queue:new(), buf_length = 0 }, hibernate}; handle_call(drain_and_stop, _From, State = #pstate { buf_length = 0 }) -> {stop, normal, empty, State}; handle_call(drain_and_stop, _From, State = #pstate { msg_buf = MsgBuf, @@ -231,7 +233,7 @@ handle_call(drain_and_stop, _From, State = #pstate { msg_buf = MsgBuf, handle_cast(publish_empty, State) -> %% Very odd. This could happen if the queue is deleted or purged %% and the mixed queue fails to shut us down. - {noreply, State}; + {noreply, State, hibernate}; handle_cast({publish, { Msg = #basic_message {}, _Size, IsDelivered, AckTag, _Remaining }}, State = #pstate { fetched_count = Fetched, target_count = Target, @@ -245,10 +247,8 @@ handle_cast({publish, { Msg = #basic_message {}, MsgBuf1 = queue:in({Msg, IsDelivered, AckTag}, MsgBuf), {noreply, State #pstate { fetched_count = Fetched + 1, buf_length = Length + 1, - msg_buf = MsgBuf1 }}. + msg_buf = MsgBuf1 }, hibernate}. -handle_info(timeout, State) -> - {noreply, State, hibernate}; handle_info({'DOWN', MRef, process, _Pid, _Reason}, State = #pstate { queue_mref = MRef }) -> %% this is the amqqueue_process going down, so we should go down |
