diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-11-11 16:29:14 +0000 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-11-11 16:29:14 +0000 |
| commit | bdb2b89077d47d2b185c15b6a0819281a07786c9 (patch) | |
| tree | 6eb21fb01a71b5bb0bf1b6bcf345ec9a87122bc9 | |
| parent | e942644287d7e96a3f5d455b98bc6857fb747124 (diff) | |
| download | rabbitmq-server-git-bdb2b89077d47d2b185c15b6a0819281a07786c9.tar.gz | |
Various amounts of tidying, post testing and a large amount of cosmetics.
| -rw-r--r-- | src/rabbit_amqqueue.erl | 11 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 48 | ||||
| -rw-r--r-- | src/rabbit_memory_monitor.erl | 89 |
3 files changed, 81 insertions, 67 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 1a5e82d714..4abfcd0ba7 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -39,7 +39,8 @@ -export([list/1, info/1, info/2, info_all/1, info_all/2]). -export([claim_queue/2]). -export([basic_get/3, basic_consume/8, basic_cancel/4]). --export([notify_sent/2, unblock/2]). +-export([notify_sent/2, unblock/2, set_queue_duration/2, + send_memory_monitor_update/1]). -export([commit_all/2, rollback_all/2, notify_down_all/2, limit_all/3]). -export([on_node_down/1]). @@ -101,6 +102,8 @@ -spec(basic_cancel/4 :: (amqqueue(), pid(), ctag(), any()) -> 'ok'). -spec(notify_sent/2 :: (pid(), pid()) -> 'ok'). -spec(unblock/2 :: (pid(), pid()) -> 'ok'). +-spec(set_queue_duration/2 :: (pid(), number()) -> 'ok'). +-spec(send_memory_monitor_update/1 :: (pid()) -> 'ok'). -spec(internal_declare/2 :: (amqqueue(), boolean()) -> amqqueue()). -spec(internal_delete/1 :: (queue_name()) -> 'ok' | not_found()). -spec(on_node_down/1 :: (erlang_node()) -> 'ok'). @@ -308,6 +311,12 @@ notify_sent(QPid, ChPid) -> unblock(QPid, ChPid) -> gen_server2:pcast(QPid, 8, {unblock, ChPid}). +set_queue_duration(QPid, Duration) -> + gen_server2:pcast(QPid, 7, {set_queue_duration, Duration}). + +send_memory_monitor_update(QPid) -> + gen_server2:pcast(QPid, 7, send_memory_monitor_update). + internal_delete(QueueName) -> rabbit_misc:execute_mnesia_transaction( fun () -> diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 0bfa6df160..2d264fc274 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -42,7 +42,6 @@ -export([start_link/1]). -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2]). --export([send_memory_monitor_update/1]). -import(queue). -import(erlang). @@ -101,9 +100,11 @@ start_link(Q) -> %%---------------------------------------------------------------------------- init(Q) -> ?LOGDEBUG("Queue starting - ~p~n", [Q]), - rabbit_memory_monitor:register(self()), + rabbit_memory_monitor:register(self(), {rabbit_amqqueue, set_queue_duration, + [self()]}), %% Beware. This breaks hibernation! - timer:apply_interval(2500, ?MODULE, send_memory_monitor_update, [self()]), + timer:apply_interval(2500, rabbit_amqqueue, send_memory_monitor_update, + [self()]), {ok, #q{q = Q, owner = none, exclusive_consumer = none, @@ -821,26 +822,33 @@ handle_cast({limit, ChPid, LimiterPid}, State) -> handle_cast(send_memory_monitor_update, State) -> DrainRatio1 = update_ratio(State#q.drain_ratio, State#q.next_msg_id), MsgSec = DrainRatio1#ratio.ratio * 1000000, % msg/sec - QueueDuration = queue:len(State#q.message_buffer) / MsgSec, % seconds + QueueDuration = + case MsgSec == 0 of + true -> infinity; + false -> queue:len(State#q.message_buffer) / MsgSec % seconds + end, DesiredQueueDuration = rabbit_memory_monitor:report_queue_duration( self(), QueueDuration), - ?LOGDEBUG("~p Queue duration current/desired ~p/~p~n", - [(State#q.q)#amqqueue.name, QueueDuration, DesiredQueueDuration]), + ?LOGDEBUG("TIMER ~p Queue length is ~8p, should be ~p~n", + [(State#q.q)#amqqueue.name, queue:len(State#q.message_buffer), + case DesiredQueueDuration of + infinity -> infinity; + _ -> MsgSec * DesiredQueueDuration + end]), noreply(State#q{drain_ratio = DrainRatio1}); handle_cast({set_queue_duration, DesiredQueueDuration}, State) -> DrainRatio = State#q.drain_ratio, - DesiredBufLength = case DesiredQueueDuration of - infinity -> infinity; - _ -> DesiredQueueDuration * DrainRatio#ratio.ratio * 1000000 - end, - %% Just to proove that something is happening. - ?LOGDEBUG("~p Queue length is~8p, should be ~p~n", - [(State#q.q)#amqqueue.name, queue:len(State#q.message_buffer), - DesiredBufLength]), + DesiredBufLength = + case DesiredQueueDuration of + infinity -> infinity; + _ -> DesiredQueueDuration * DrainRatio#ratio.ratio * 1000000 + end, + ?LOGDEBUG("MAGIC ~p Queue length is ~8p, should be ~p~n", + [(State#q.q)#amqqueue.name, queue:len(State#q.message_buffer), + DesiredBufLength]), noreply(State). - %% Based on kernel load average, as descibed: %% http://www.teamquest.com/resources/gunther/display/5/ calc_load(Load, Exp, N) -> @@ -852,14 +860,8 @@ update_ratio(_RatioRec = #ratio{ratio=Ratio, t0 = T0, next_msg_id = MsgCount0}, MsgCount = MsgCount1 - MsgCount0, MsgUSec = MsgCount / Td, % msg/usec %% Td is in usec. We're interested in "load average" from last 30 seconds. - Ratio1 = calc_load(Ratio, 1.0/ (math:exp(Td/(30*1000000))), MsgUSec), - - #ratio{ratio = Ratio1, t0=T1, next_msg_id = MsgCount1}. - - -send_memory_monitor_update(Pid) -> - gen_server2:cast(Pid, send_memory_monitor_update). - + Ratio1 = calc_load(Ratio, 1.0/ (math:exp(Td/(30*1000000))), MsgUSec), + #ratio{ratio = Ratio1, t0=T1, next_msg_id = MsgCount1}. handle_info({'DOWN', MonitorRef, process, DownPid, _Reason}, State = #q{owner = {DownPid, MonitorRef}}) -> diff --git a/src/rabbit_memory_monitor.erl b/src/rabbit_memory_monitor.erl index 7bd03c9c42..cf184f3f43 100644 --- a/src/rabbit_memory_monitor.erl +++ b/src/rabbit_memory_monitor.erl @@ -74,22 +74,19 @@ -behaviour(gen_server2). --export([start_link/0]). +-export([start_link/0, update/0, register/2, report_queue_duration/2]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --export([update/0]). - --export([register/1, report_queue_duration/2]). - -record(state, {timer, %% 'internal_update' timer queue_durations, %% ets, (qpid, last_reported, last_sent) queue_duration_sum, %% sum of all queue_durations queue_duration_count, %% number of elements in sum memory_limit, %% how much memory we intend to use memory_ratio, %% how much more memory we can use - desired_duration %% the desired queue duration + desired_duration, %% the desired queue duration + callbacks %% a dict of qpid -> {M,F,A}s }). -define(SERVER, ?MODULE). @@ -101,25 +98,19 @@ -define(MEMORY_SIZE_FOR_DISABLED_VMM, 1073741824). %%---------------------------------------------------------------------------- + -ifdef(use_specs). --type(state() :: #state{timer :: timer:tref(), - queue_durations :: tid(), - queue_duration_sum :: float(), - queue_duration_count:: non_neg_integer(), - memory_limit :: pos_integer(), - memory_ratio :: float(), - desired_duration :: float() | 'infinity' }). -spec(start_link/0 :: () -> 'ignore' | {'error', _} | {'ok', pid()}). --spec(register/1 :: (pid()) -> 'ok'). +-spec(update/0 :: () -> 'ok'). +-spec(register/2 :: (pid(), {atom(),atom(),[any()]}) -> 'ok'). -spec(report_queue_duration/2 :: (pid(), float() | 'infinity') -> 'ok'). --spec(init/1 :: ([]) -> {'ok', state()}). - --spec(internal_update/1 :: (state()) -> state()). -endif. %%---------------------------------------------------------------------------- +%% Public API +%%---------------------------------------------------------------------------- start_link() -> gen_server2:start_link({local, ?SERVER}, ?MODULE, [], []). @@ -127,22 +118,17 @@ start_link() -> update() -> gen_server2:cast(?SERVER, update). -%%---------------------------------------------------------------------------- - -register(Pid) -> - gen_server2:cast(?SERVER, {register, Pid}). +register(Pid, MFA = {_M, _F, _A}) -> + gen_server2:cast(?SERVER, {register, Pid, MFA}). report_queue_duration(Pid, QueueDuration) -> gen_server2:call(rabbit_memory_monitor, {report_queue_duration, Pid, QueueDuration}). -%%---------------------------------------------------------------------------- -get_memory_limit() -> - case vm_memory_monitor:get_memory_limit() of - undefined -> ?MEMORY_SIZE_FOR_DISABLED_VMM; - A -> A - end. +%%---------------------------------------------------------------------------- +%% Gen_server callbacks +%%---------------------------------------------------------------------------- init([]) -> %% We should never use more memory than user requested. As the memory @@ -158,7 +144,8 @@ init([]) -> queue_duration_count = 0, memory_limit = MemoryLimit, memory_ratio = 1.0, - desired_duration = infinity}}. + desired_duration = infinity, + callbacks = dict:new()}}. handle_call({report_queue_duration, Pid, QueueDuration}, From, State = #state{queue_duration_sum = Sum, @@ -187,23 +174,23 @@ handle_call({report_queue_duration, Pid, QueueDuration}, From, handle_call(_Request, _From, State) -> {noreply, State}. - handle_cast(update, State) -> {noreply, internal_update(State)}; -handle_cast({register, Pid}, State) -> +handle_cast({register, Pid, MFA}, State = #state{queue_durations = Durations, + callbacks = Callbacks}) -> _MRef = erlang:monitor(process, Pid), - true = ets:insert(State#state.queue_durations, {Pid, infinity, infinity}), - {noreply, State}; + true = ets:insert(Durations, {Pid, infinity, infinity}), + {noreply, State#state{callbacks = dict:store(Pid, MFA, Callbacks)}}; handle_cast(_Request, State) -> {noreply, State}. - handle_info({'DOWN', _MRef, process, Pid, _Reason}, State = #state{queue_duration_sum = Sum, queue_duration_count = Count, - queue_durations = Durations}) -> + queue_durations = Durations, + callbacks = Callbacks}) -> [{_Pid, PrevQueueDuration, _PrevSendDuration}] = ets:lookup(Durations, Pid), Sum1 = case PrevQueueDuration of infinity -> Sum; @@ -211,26 +198,30 @@ handle_info({'DOWN', _MRef, process, Pid, _Reason}, end, true = ets:delete(State#state.queue_durations, Pid), {noreply, State#state{queue_duration_sum = Sum1, - queue_duration_count = Count-1}}; + queue_duration_count = Count-1, + callbacks = dict:erase(Pid, Callbacks)}}; handle_info(_Info, State) -> {noreply, State}. - -terminate(_Reason, _State) -> +terminate(_Reason, #state{timer = TRef}) -> + timer:cancel(TRef), ok. code_change(_OldVsn, State, _Extra) -> {ok, State}. -set_queue_duration(Pid, QueueDuration) -> - gen_server2:pcast(Pid, 7, {set_queue_duration, QueueDuration}). + +%%---------------------------------------------------------------------------- +%% Internal functions +%%---------------------------------------------------------------------------- internal_update(State = #state{memory_limit = Limit, queue_durations = Durations, desired_duration = DesiredDurationAvg, queue_duration_sum = Sum, - queue_duration_count = Count}) -> + queue_duration_count = Count, + callbacks = Callbacks}) -> %% available memory / used memory MemoryRatio = Limit / erlang:memory(total), AvgDuration = case Count of @@ -246,7 +237,8 @@ internal_update(State = #state{memory_limit = Limit, %% only inform queues immediately if the desired duration has %% decreased - case DesiredDurationAvg1 < DesiredDurationAvg of + case (DesiredDurationAvg == infinity andalso DesiredDurationAvg /= infinity) + orelse (DesiredDurationAvg1 < DesiredDurationAvg) of true -> %% If we have pessimistic information, we need to inform %% queues to reduce it's memory usage when needed. This @@ -256,8 +248,9 @@ internal_update(State = #state{memory_limit = Limit, case DesiredDurationAvg1 < lists:min([PrevSendDuration, QueueDuration]) of true -> - set_queue_duration(Pid, - DesiredDurationAvg1), + ok = + set_queue_duration( + Pid, DesiredDurationAvg1, Callbacks), ets:insert(Durations, {Pid, QueueDuration, DesiredDurationAvg1}); @@ -267,3 +260,13 @@ internal_update(State = #state{memory_limit = Limit, false -> ok end, State1. + +get_memory_limit() -> + case vm_memory_monitor:get_memory_limit() of + undefined -> ?MEMORY_SIZE_FOR_DISABLED_VMM; + A -> A + end. + +set_queue_duration(Pid, QueueDuration, Callbacks) -> + {M,F,A} = dict:fetch(Pid, Callbacks), + ok = erlang:apply(M, F, A++[QueueDuration]). |
