diff options
| author | Marek Majkowski <majek@lshift.net> | 2009-10-06 18:22:36 +0100 |
|---|---|---|
| committer | Marek Majkowski <majek@lshift.net> | 2009-10-06 18:22:36 +0100 |
| commit | b2b1590fb3a2699a21404798a5d66e4866134d1a (patch) | |
| tree | c2fd582a98dc26b30a2845b4f1ecd6bdaf33b542 /src | |
| parent | 3c2a649064939068c089fa38a82b287553f85582 (diff) | |
| download | rabbitmq-server-git-b2b1590fb3a2699a21404798a5d66e4866134d1a.tar.gz | |
Changes to amqqueue_process required to proove that the code works.
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 65 |
1 files changed, 62 insertions, 3 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index fe2e8509f7..fa3d17a884 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -42,6 +42,7 @@ -export([start_link/1]). -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2]). +-export([send_memory_monitor_update/1]). -import(queue). -import(erlang). @@ -55,12 +56,18 @@ next_msg_id, message_buffer, active_consumers, - blocked_consumers}). + blocked_consumers, + drain_ratio}). -record(consumer, {tag, ack_required}). -record(tx, {ch_pid, is_persistent, pending_messages, pending_acks}). +-record(ratio, {ratio, %% float. messages/microsecond_us + t0, %% previous timestamp (us) + next_msg_id %% previous next_msg_id + }). + %% These are held in our process dictionary -record(cr, {consumer_count, ch_pid, @@ -92,9 +99,15 @@ start_link(Q) -> gen_server2:start_link(?MODULE, Q, []). %%---------------------------------------------------------------------------- +now_us() -> + {Megaseconds,Seconds,Microseconds} = erlang:now(), + Megaseconds * 1000000 * 1000000 + Seconds * 1000000 + Microseconds. init(Q) -> ?LOGDEBUG("Queue starting - ~p~n", [Q]), + rabbit_memory_monitor:register(self()), + %% Beware. This breaks hibernation! + timer:apply_interval(2500, ?MODULE, send_memory_monitor_update, [self()]), {ok, #q{q = Q, owner = none, exclusive_consumer = none, @@ -102,7 +115,11 @@ init(Q) -> next_msg_id = 1, message_buffer = queue:new(), active_consumers = queue:new(), - blocked_consumers = queue:new()}, hibernate, + blocked_consumers = queue:new(), + drain_ratio = #ratio{ratio = 0.0, + t0 = now_us(), + next_msg_id = 1} + }, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. terminate(_Reason, State) -> @@ -797,7 +814,49 @@ handle_cast({limit, ChPid, LimiterPid}, State) -> end, NewLimited = Limited andalso LimiterPid =/= undefined, C#cr{limiter_pid = LimiterPid, is_limit_active = NewLimited} - end)). + end)); + +handle_cast(send_memory_monitor_update, State) -> + DrainRatio1 = update_ratio(State#q.drain_ratio, State#q.next_msg_id), + MsgSec = DrainRatio1#ratio.ratio * 1000000, % msg/sec + BufSec = case MsgSec < 0.016 of %% less than 1 msg/1 minute + true -> infinity; + false -> queue:len(State#q.message_buffer) / MsgSec + end, + gen_server2:cast(rabbit_memory_monitor, {push_drain_ratio, self(), BufSec}), + noreply(State#q{drain_ratio = DrainRatio1}); + +handle_cast({set_bufsec_limit, BufSec}, State) -> + DrainRatio = State#q.drain_ratio, + DesiredQueueLength = case BufSec of + infinity -> infinity; + _ -> BufSec * DrainRatio#ratio.ratio * 1000000 + end, + %% Just to proove that something is happening. + io:format("Queue size is ~8p, should be ~p~n", + [queue:len(State#q.message_buffer), DesiredQueueLength]), + noreply(State). + + +%% Based on kernel load average, as descibed: +%% http://www.teamquest.com/resources/gunther/display/5/ +calc_load(Load, Exp, N) -> + Load*Exp + N*(1.0-Exp). + +update_ratio(_RatioRec = #ratio{ratio=Ratio, t0 = T0, next_msg_id = MsgCount0}, MsgCount1) -> + T1 = now_us(), + Td = T1 - T0, + MsgCount = MsgCount1 - MsgCount0, + MsgUSec = MsgCount / Td, % msg/usec + %% Td is in usec. We're interested in "load average" from last 30 seconds. + Ratio1 = calc_load(Ratio, 1.0/ (math:exp(Td/(30*1000000))), MsgUSec), + + #ratio{ratio = Ratio1, t0=T1, next_msg_id = MsgCount1}. + + +send_memory_monitor_update(Pid) -> + gen_server2:cast(Pid, send_memory_monitor_update). + handle_info({'DOWN', MonitorRef, process, DownPid, _Reason}, State = #q{owner = {DownPid, MonitorRef}}) -> |
