author     Matthew Sackman <matthew@lshift.net>  2009-11-11 16:29:14 +0000
committer  Matthew Sackman <matthew@lshift.net>  2009-11-11 16:29:14 +0000
commit     bdb2b89077d47d2b185c15b6a0819281a07786c9 (patch)
tree       6eb21fb01a71b5bb0bf1b6bcf345ec9a87122bc9
parent     e942644287d7e96a3f5d455b98bc6857fb747124 (diff)
download   rabbitmq-server-git-bdb2b89077d47d2b185c15b6a0819281a07786c9.tar.gz
Various amounts of tidying, post testing and a large amount of cosmetics.
-rw-r--r--  src/rabbit_amqqueue.erl          11
-rw-r--r--  src/rabbit_amqqueue_process.erl  48
-rw-r--r--  src/rabbit_memory_monitor.erl    89
3 files changed, 81 insertions, 67 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 1a5e82d714..4abfcd0ba7 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -39,7 +39,8 @@
-export([list/1, info/1, info/2, info_all/1, info_all/2]).
-export([claim_queue/2]).
-export([basic_get/3, basic_consume/8, basic_cancel/4]).
--export([notify_sent/2, unblock/2]).
+-export([notify_sent/2, unblock/2, set_queue_duration/2,
+ send_memory_monitor_update/1]).
-export([commit_all/2, rollback_all/2, notify_down_all/2, limit_all/3]).
-export([on_node_down/1]).
@@ -101,6 +102,8 @@
-spec(basic_cancel/4 :: (amqqueue(), pid(), ctag(), any()) -> 'ok').
-spec(notify_sent/2 :: (pid(), pid()) -> 'ok').
-spec(unblock/2 :: (pid(), pid()) -> 'ok').
+-spec(set_queue_duration/2 :: (pid(), number()) -> 'ok').
+-spec(send_memory_monitor_update/1 :: (pid()) -> 'ok').
-spec(internal_declare/2 :: (amqqueue(), boolean()) -> amqqueue()).
-spec(internal_delete/1 :: (queue_name()) -> 'ok' | not_found()).
-spec(on_node_down/1 :: (erlang_node()) -> 'ok').
@@ -308,6 +311,12 @@ notify_sent(QPid, ChPid) ->
unblock(QPid, ChPid) ->
gen_server2:pcast(QPid, 8, {unblock, ChPid}).
+set_queue_duration(QPid, Duration) ->
+ gen_server2:pcast(QPid, 7, {set_queue_duration, Duration}).
+
+send_memory_monitor_update(QPid) ->
+ gen_server2:pcast(QPid, 7, send_memory_monitor_update).
+
internal_delete(QueueName) ->
rabbit_misc:execute_mnesia_transaction(
fun () ->
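
Both new functions are thin wrappers that cast into the queue process at priority 7, one step below the priority-8 unblock cast above. A minimal usage sketch, assuming QPid is the pid of a live queue process (taken, say, from an #amqqueue{} record):

    %% ask the queue to aim for roughly one second's worth of buffered messages
    ok = rabbit_amqqueue:set_queue_duration(QPid, 1.0),
    %% ask the queue to report its current drain rate to the memory monitor now
    ok = rabbit_amqqueue:send_memory_monitor_update(QPid).
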
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 0bfa6df160..2d264fc274 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -42,7 +42,6 @@
-export([start_link/1]).
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2]).
--export([send_memory_monitor_update/1]).
-import(queue).
-import(erlang).
@@ -101,9 +100,11 @@ start_link(Q) ->
%%----------------------------------------------------------------------------
init(Q) ->
?LOGDEBUG("Queue starting - ~p~n", [Q]),
- rabbit_memory_monitor:register(self()),
+ rabbit_memory_monitor:register(self(), {rabbit_amqqueue, set_queue_duration,
+ [self()]}),
%% Beware. This breaks hibernation!
- timer:apply_interval(2500, ?MODULE, send_memory_monitor_update, [self()]),
+ timer:apply_interval(2500, rabbit_amqqueue, send_memory_monitor_update,
+ [self()]),
{ok, #q{q = Q,
owner = none,
exclusive_consumer = none,
@@ -821,26 +822,33 @@ handle_cast({limit, ChPid, LimiterPid}, State) ->
handle_cast(send_memory_monitor_update, State) ->
DrainRatio1 = update_ratio(State#q.drain_ratio, State#q.next_msg_id),
MsgSec = DrainRatio1#ratio.ratio * 1000000, % msg/sec
- QueueDuration = queue:len(State#q.message_buffer) / MsgSec, % seconds
+ QueueDuration =
+ case MsgSec == 0 of
+ true -> infinity;
+ false -> queue:len(State#q.message_buffer) / MsgSec % seconds
+ end,
DesiredQueueDuration = rabbit_memory_monitor:report_queue_duration(
self(), QueueDuration),
- ?LOGDEBUG("~p Queue duration current/desired ~p/~p~n",
- [(State#q.q)#amqqueue.name, QueueDuration, DesiredQueueDuration]),
+ ?LOGDEBUG("TIMER ~p Queue length is ~8p, should be ~p~n",
+ [(State#q.q)#amqqueue.name, queue:len(State#q.message_buffer),
+ case DesiredQueueDuration of
+ infinity -> infinity;
+ _ -> MsgSec * DesiredQueueDuration
+ end]),
noreply(State#q{drain_ratio = DrainRatio1});
handle_cast({set_queue_duration, DesiredQueueDuration}, State) ->
DrainRatio = State#q.drain_ratio,
- DesiredBufLength = case DesiredQueueDuration of
- infinity -> infinity;
- _ -> DesiredQueueDuration * DrainRatio#ratio.ratio * 1000000
- end,
- %% Just to proove that something is happening.
- ?LOGDEBUG("~p Queue length is~8p, should be ~p~n",
- [(State#q.q)#amqqueue.name, queue:len(State#q.message_buffer),
- DesiredBufLength]),
+ DesiredBufLength =
+ case DesiredQueueDuration of
+ infinity -> infinity;
+ _ -> DesiredQueueDuration * DrainRatio#ratio.ratio * 1000000
+ end,
+ ?LOGDEBUG("MAGIC ~p Queue length is ~8p, should be ~p~n",
+ [(State#q.q)#amqqueue.name, queue:len(State#q.message_buffer),
+ DesiredBufLength]),
noreply(State).
-
%% Based on kernel load average, as descibed:
%% http://www.teamquest.com/resources/gunther/display/5/
calc_load(Load, Exp, N) ->
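
The body of calc_load/3 lies outside this hunk; per the linked article it is the standard exponentially weighted moving average, so the following is only a sketch of that usual form, not a quote of the file:

    %% sketch: blend the previous average Load with the new sample N,
    %% retaining the fraction Exp of the old value
    calc_load(Load, Exp, N) ->
        Load * Exp + N * (1.0 - Exp).
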
@@ -852,14 +860,8 @@ update_ratio(_RatioRec = #ratio{ratio=Ratio, t0 = T0, next_msg_id = MsgCount0},
MsgCount = MsgCount1 - MsgCount0,
MsgUSec = MsgCount / Td, % msg/usec
%% Td is in usec. We're interested in "load average" from last 30 seconds.
- Ratio1 = calc_load(Ratio, 1.0/ (math:exp(Td/(30*1000000))), MsgUSec),
-
- #ratio{ratio = Ratio1, t0=T1, next_msg_id = MsgCount1}.
-
-
-send_memory_monitor_update(Pid) ->
- gen_server2:cast(Pid, send_memory_monitor_update).
-
+ Ratio1 = calc_load(Ratio, 1.0/ (math:exp(Td/(30*1000000))), MsgUSec),
+ #ratio{ratio = Ratio1, t0=T1, next_msg_id = MsgCount1}.
handle_info({'DOWN', MonitorRef, process, DownPid, _Reason},
State = #q{owner = {DownPid, MonitorRef}}) ->
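
update_ratio/2 passes Exp = 1.0 / math:exp(Td / (30 * 1000000)), i.e. exp(-Td/30s) with Td in microseconds, the usual decay factor for a 30-second load average. With the 2500 ms timer set up in init/1, Td is about 2,500,000 us, so Exp is roughly exp(-2.5/30), about 0.92: under the blend sketched above, each tick keeps about 92% of the previous msg/usec estimate and mixes in about 8% of the rate observed since the last tick. The send_memory_monitor_update handler then scales the smoothed rate to msg/sec and divides the buffer length by it, so, for example, a 1000-message buffer draining at 250 msg/sec reports a queue duration of 4 seconds (or infinity when the rate is zero).
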
diff --git a/src/rabbit_memory_monitor.erl b/src/rabbit_memory_monitor.erl
index 7bd03c9c42..cf184f3f43 100644
--- a/src/rabbit_memory_monitor.erl
+++ b/src/rabbit_memory_monitor.erl
@@ -74,22 +74,19 @@
-behaviour(gen_server2).
--export([start_link/0]).
+-export([start_link/0, update/0, register/2, report_queue_duration/2]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
--export([update/0]).
-
--export([register/1, report_queue_duration/2]).
-
-record(state, {timer, %% 'internal_update' timer
queue_durations, %% ets, (qpid, last_reported, last_sent)
queue_duration_sum, %% sum of all queue_durations
queue_duration_count, %% number of elements in sum
memory_limit, %% how much memory we intend to use
memory_ratio, %% how much more memory we can use
- desired_duration %% the desired queue duration
+ desired_duration, %% the desired queue duration
+ callbacks %% a dict of qpid -> {M,F,A}s
}).
-define(SERVER, ?MODULE).
@@ -101,25 +98,19 @@
-define(MEMORY_SIZE_FOR_DISABLED_VMM, 1073741824).
%%----------------------------------------------------------------------------
+
-ifdef(use_specs).
--type(state() :: #state{timer :: timer:tref(),
- queue_durations :: tid(),
- queue_duration_sum :: float(),
- queue_duration_count:: non_neg_integer(),
- memory_limit :: pos_integer(),
- memory_ratio :: float(),
- desired_duration :: float() | 'infinity' }).
-spec(start_link/0 :: () -> 'ignore' | {'error', _} | {'ok', pid()}).
--spec(register/1 :: (pid()) -> 'ok').
+-spec(update/0 :: () -> 'ok').
+-spec(register/2 :: (pid(), {atom(),atom(),[any()]}) -> 'ok').
-spec(report_queue_duration/2 :: (pid(), float() | 'infinity') -> 'ok').
--spec(init/1 :: ([]) -> {'ok', state()}).
-
--spec(internal_update/1 :: (state()) -> state()).
-endif.
%%----------------------------------------------------------------------------
+%% Public API
+%%----------------------------------------------------------------------------
start_link() ->
gen_server2:start_link({local, ?SERVER}, ?MODULE, [], []).
@@ -127,22 +118,17 @@ start_link() ->
update() ->
gen_server2:cast(?SERVER, update).
-%%----------------------------------------------------------------------------
-
-register(Pid) ->
- gen_server2:cast(?SERVER, {register, Pid}).
+register(Pid, MFA = {_M, _F, _A}) ->
+ gen_server2:cast(?SERVER, {register, Pid, MFA}).
report_queue_duration(Pid, QueueDuration) ->
gen_server2:call(rabbit_memory_monitor,
{report_queue_duration, Pid, QueueDuration}).
-%%----------------------------------------------------------------------------
-get_memory_limit() ->
- case vm_memory_monitor:get_memory_limit() of
- undefined -> ?MEMORY_SIZE_FOR_DISABLED_VMM;
- A -> A
- end.
+%%----------------------------------------------------------------------------
+%% Gen_server callbacks
+%%----------------------------------------------------------------------------
init([]) ->
%% We should never use more memory than user requested. As the memory
@@ -158,7 +144,8 @@ init([]) ->
queue_duration_count = 0,
memory_limit = MemoryLimit,
memory_ratio = 1.0,
- desired_duration = infinity}}.
+ desired_duration = infinity,
+ callbacks = dict:new()}}.
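
The new callbacks field maps each registered pid to the {M, F, A} passed to register/2; when the monitor wants a queue to change its target duration it applies that MFA with the new duration appended as the final argument (see set_queue_duration/3 at the end of this file). A sketch of registering a process other than a queue, where my_duration_sink is a hypothetical module:

    %% the monitor will eventually call
    %% my_duration_sink:set_duration(Pid, DesiredDuration)
    ok = rabbit_memory_monitor:register(
           self(), {my_duration_sink, set_duration, [self()]}).
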
handle_call({report_queue_duration, Pid, QueueDuration}, From,
State = #state{queue_duration_sum = Sum,
@@ -187,23 +174,23 @@ handle_call({report_queue_duration, Pid, QueueDuration}, From,
handle_call(_Request, _From, State) ->
{noreply, State}.
-
handle_cast(update, State) ->
{noreply, internal_update(State)};
-handle_cast({register, Pid}, State) ->
+handle_cast({register, Pid, MFA}, State = #state{queue_durations = Durations,
+ callbacks = Callbacks}) ->
_MRef = erlang:monitor(process, Pid),
- true = ets:insert(State#state.queue_durations, {Pid, infinity, infinity}),
- {noreply, State};
+ true = ets:insert(Durations, {Pid, infinity, infinity}),
+ {noreply, State#state{callbacks = dict:store(Pid, MFA, Callbacks)}};
handle_cast(_Request, State) ->
{noreply, State}.
-
handle_info({'DOWN', _MRef, process, Pid, _Reason},
State = #state{queue_duration_sum = Sum,
queue_duration_count = Count,
- queue_durations = Durations}) ->
+ queue_durations = Durations,
+ callbacks = Callbacks}) ->
[{_Pid, PrevQueueDuration, _PrevSendDuration}] = ets:lookup(Durations, Pid),
Sum1 = case PrevQueueDuration of
infinity -> Sum;
@@ -211,26 +198,30 @@ handle_info({'DOWN', _MRef, process, Pid, _Reason},
end,
true = ets:delete(State#state.queue_durations, Pid),
{noreply, State#state{queue_duration_sum = Sum1,
- queue_duration_count = Count-1}};
+ queue_duration_count = Count-1,
+ callbacks = dict:erase(Pid, Callbacks)}};
handle_info(_Info, State) ->
{noreply, State}.
-
-terminate(_Reason, _State) ->
+terminate(_Reason, #state{timer = TRef}) ->
+ timer:cancel(TRef),
ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
-set_queue_duration(Pid, QueueDuration) ->
- gen_server2:pcast(Pid, 7, {set_queue_duration, QueueDuration}).
+
+%%----------------------------------------------------------------------------
+%% Internal functions
+%%----------------------------------------------------------------------------
internal_update(State = #state{memory_limit = Limit,
queue_durations = Durations,
desired_duration = DesiredDurationAvg,
queue_duration_sum = Sum,
- queue_duration_count = Count}) ->
+ queue_duration_count = Count,
+ callbacks = Callbacks}) ->
%% available memory / used memory
MemoryRatio = Limit / erlang:memory(total),
AvgDuration = case Count of
@@ -246,7 +237,8 @@ internal_update(State = #state{memory_limit = Limit,
%% only inform queues immediately if the desired duration has
%% decreased
- case DesiredDurationAvg1 < DesiredDurationAvg of
+ case (DesiredDurationAvg == infinity andalso DesiredDurationAvg /= infinity)
+ orelse (DesiredDurationAvg1 < DesiredDurationAvg) of
true ->
%% If we have pessimistic information, we need to inform
%% queues to reduce it's memory usage when needed. This
@@ -256,8 +248,9 @@ internal_update(State = #state{memory_limit = Limit,
case DesiredDurationAvg1 <
lists:min([PrevSendDuration, QueueDuration]) of
true ->
- set_queue_duration(Pid,
- DesiredDurationAvg1),
+ ok =
+ set_queue_duration(
+ Pid, DesiredDurationAvg1, Callbacks),
ets:insert(Durations,
{Pid, QueueDuration,
DesiredDurationAvg1});
@@ -267,3 +260,13 @@ internal_update(State = #state{memory_limit = Limit,
false -> ok
end,
State1.
+
+get_memory_limit() ->
+ case vm_memory_monitor:get_memory_limit() of
+ undefined -> ?MEMORY_SIZE_FOR_DISABLED_VMM;
+ A -> A
+ end.
+
+set_queue_duration(Pid, QueueDuration, Callbacks) ->
+ {M,F,A} = dict:fetch(Pid, Callbacks),
+ ok = erlang:apply(M, F, A++[QueueDuration]).
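
set_queue_duration/3 closes the loop: it fetches the {M, F, A} stored at registration time and applies it with the newly computed duration appended. For a queue that registered with {rabbit_amqqueue, set_queue_duration, [QPid]} in rabbit_amqqueue_process:init/1 above, the apply therefore amounts to:

    %% what erlang:apply(M, F, A ++ [QueueDuration]) expands to for a queue
    rabbit_amqqueue:set_queue_duration(QPid, DesiredDurationAvg1)

which pcasts {set_queue_duration, DesiredDurationAvg1} back into the queue process at priority 7.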