summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMarek Majkowski <majek@lshift.net>2009-10-06 18:22:36 +0100
committerMarek Majkowski <majek@lshift.net>2009-10-06 18:22:36 +0100
commitb2b1590fb3a2699a21404798a5d66e4866134d1a (patch)
treec2fd582a98dc26b30a2845b4f1ecd6bdaf33b542 /src
parent3c2a649064939068c089fa38a82b287553f85582 (diff)
downloadrabbitmq-server-git-b2b1590fb3a2699a21404798a5d66e4866134d1a.tar.gz
Changes to amqqueue_process required to proove that the code works.
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue_process.erl65
1 files changed, 62 insertions, 3 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index fe2e8509f7..fa3d17a884 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -42,6 +42,7 @@
-export([start_link/1]).
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2]).
+-export([send_memory_monitor_update/1]).
-import(queue).
-import(erlang).
@@ -55,12 +56,18 @@
next_msg_id,
message_buffer,
active_consumers,
- blocked_consumers}).
+ blocked_consumers,
+ drain_ratio}).
-record(consumer, {tag, ack_required}).
-record(tx, {ch_pid, is_persistent, pending_messages, pending_acks}).
+-record(ratio, {ratio, %% float. messages/microsecond_us
+ t0, %% previous timestamp (us)
+ next_msg_id %% previous next_msg_id
+ }).
+
%% These are held in our process dictionary
-record(cr, {consumer_count,
ch_pid,
@@ -92,9 +99,15 @@ start_link(Q) ->
gen_server2:start_link(?MODULE, Q, []).
%%----------------------------------------------------------------------------
+now_us() ->
+ {Megaseconds,Seconds,Microseconds} = erlang:now(),
+ Megaseconds * 1000000 * 1000000 + Seconds * 1000000 + Microseconds.
init(Q) ->
?LOGDEBUG("Queue starting - ~p~n", [Q]),
+ rabbit_memory_monitor:register(self()),
+ %% Beware. This breaks hibernation!
+ timer:apply_interval(2500, ?MODULE, send_memory_monitor_update, [self()]),
{ok, #q{q = Q,
owner = none,
exclusive_consumer = none,
@@ -102,7 +115,11 @@ init(Q) ->
next_msg_id = 1,
message_buffer = queue:new(),
active_consumers = queue:new(),
- blocked_consumers = queue:new()}, hibernate,
+ blocked_consumers = queue:new(),
+ drain_ratio = #ratio{ratio = 0.0,
+ t0 = now_us(),
+ next_msg_id = 1}
+ }, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
terminate(_Reason, State) ->
@@ -797,7 +814,49 @@ handle_cast({limit, ChPid, LimiterPid}, State) ->
end,
NewLimited = Limited andalso LimiterPid =/= undefined,
C#cr{limiter_pid = LimiterPid, is_limit_active = NewLimited}
- end)).
+ end));
+
+handle_cast(send_memory_monitor_update, State) ->
+ DrainRatio1 = update_ratio(State#q.drain_ratio, State#q.next_msg_id),
+ MsgSec = DrainRatio1#ratio.ratio * 1000000, % msg/sec
+ BufSec = case MsgSec < 0.016 of %% less than 1 msg/1 minute
+ true -> infinity;
+ false -> queue:len(State#q.message_buffer) / MsgSec
+ end,
+ gen_server2:cast(rabbit_memory_monitor, {push_drain_ratio, self(), BufSec}),
+ noreply(State#q{drain_ratio = DrainRatio1});
+
+handle_cast({set_bufsec_limit, BufSec}, State) ->
+ DrainRatio = State#q.drain_ratio,
+ DesiredQueueLength = case BufSec of
+ infinity -> infinity;
+ _ -> BufSec * DrainRatio#ratio.ratio * 1000000
+ end,
+ %% Just to proove that something is happening.
+ io:format("Queue size is ~8p, should be ~p~n",
+ [queue:len(State#q.message_buffer), DesiredQueueLength]),
+ noreply(State).
+
+
+%% Based on kernel load average, as descibed:
+%% http://www.teamquest.com/resources/gunther/display/5/
+calc_load(Load, Exp, N) ->
+ Load*Exp + N*(1.0-Exp).
+
+update_ratio(_RatioRec = #ratio{ratio=Ratio, t0 = T0, next_msg_id = MsgCount0}, MsgCount1) ->
+ T1 = now_us(),
+ Td = T1 - T0,
+ MsgCount = MsgCount1 - MsgCount0,
+ MsgUSec = MsgCount / Td, % msg/usec
+ %% Td is in usec. We're interested in "load average" from last 30 seconds.
+ Ratio1 = calc_load(Ratio, 1.0/ (math:exp(Td/(30*1000000))), MsgUSec),
+
+ #ratio{ratio = Ratio1, t0=T1, next_msg_id = MsgCount1}.
+
+
+send_memory_monitor_update(Pid) ->
+ gen_server2:cast(Pid, send_memory_monitor_update).
+
handle_info({'DOWN', MonitorRef, process, DownPid, _Reason},
State = #q{owner = {DownPid, MonitorRef}}) ->