summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-11-11 15:16:20 +0000
committerMatthew Sackman <matthew@lshift.net>2009-11-11 15:16:20 +0000
commite942644287d7e96a3f5d455b98bc6857fb747124 (patch)
treea4a53bdf530d7b4cd145f0b5b3450a37768ee917
parent862159f445f21fc128c5e93e746859b3c961ea8c (diff)
downloadrabbitmq-server-git-e942644287d7e96a3f5d455b98bc6857fb747124.tar.gz
Unhappy with the idea that the desired duration is affected immediately by new queues and queues dying, but the memory ratio, which reflects the amount of memory erlang has used, is updated periodically. This mix of up-to-date and stale information in the calculation of the desired duration alarms me. Thus store the desired duration in the state, and always report that. That is then update periodically, thus is only ever calculated using current values.
-rw-r--r--src/rabbit_amqqueue_process.erl4
-rw-r--r--src/rabbit_memory_monitor.erl71
2 files changed, 37 insertions, 38 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 9b97fe8651..0bfa6df160 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -822,8 +822,8 @@ handle_cast(send_memory_monitor_update, State) ->
DrainRatio1 = update_ratio(State#q.drain_ratio, State#q.next_msg_id),
MsgSec = DrainRatio1#ratio.ratio * 1000000, % msg/sec
QueueDuration = queue:len(State#q.message_buffer) / MsgSec, % seconds
- DesiredQueueDuration = rabbit_memory_monitor:push_queue_duration(
- self(), QueueDuration),
+ DesiredQueueDuration = rabbit_memory_monitor:report_queue_duration(
+ self(), QueueDuration),
?LOGDEBUG("~p Queue duration current/desired ~p/~p~n",
[(State#q.q)#amqqueue.name, QueueDuration, DesiredQueueDuration]),
noreply(State#q{drain_ratio = DrainRatio1});
diff --git a/src/rabbit_memory_monitor.erl b/src/rabbit_memory_monitor.erl
index 4880b260d8..7bd03c9c42 100644
--- a/src/rabbit_memory_monitor.erl
+++ b/src/rabbit_memory_monitor.erl
@@ -81,18 +81,19 @@
-export([update/0]).
--export([register/1, push_queue_duration/2]).
-
--record(state, {timer, %% 'internal_update' timer
- queue_durations, %% ets, (qpid, last_reported, last_sent)
- queue_duration_sum, %% sum of all queue_durations
- queue_duration_count,%% number of elements in sum
- memory_limit, %% how much memory we intend to use
- memory_ratio %% how much more memory we can use
+-export([register/1, report_queue_duration/2]).
+
+-record(state, {timer, %% 'internal_update' timer
+ queue_durations, %% ets, (qpid, last_reported, last_sent)
+ queue_duration_sum, %% sum of all queue_durations
+ queue_duration_count, %% number of elements in sum
+ memory_limit, %% how much memory we intend to use
+ memory_ratio, %% how much more memory we can use
+ desired_duration %% the desired queue duration
}).
-define(SERVER, ?MODULE).
--define(DEFAULT_UPDATE_INTERVAL_MS, 2500).
+-define(DEFAULT_UPDATE_INTERVAL, 2500).
-define(TABLE_NAME, ?MODULE).
-define(MAX_QUEUE_DURATION, 60*60*24). % 1 day
@@ -106,11 +107,12 @@
queue_duration_sum :: float(),
queue_duration_count:: non_neg_integer(),
memory_limit :: pos_integer(),
- memory_ratio :: float() }).
+ memory_ratio :: float(),
+ desired_duration :: float() | 'infinity' }).
-spec(start_link/0 :: () -> 'ignore' | {'error', _} | {'ok', pid()}).
-spec(register/1 :: (pid()) -> 'ok').
--spec(push_queue_duration/2 :: (pid(), float() | 'infinity') -> 'ok').
+-spec(report_queue_duration/2 :: (pid(), float() | 'infinity') -> 'ok').
-spec(init/1 :: ([]) -> {'ok', state()}).
@@ -130,9 +132,9 @@ update() ->
register(Pid) ->
gen_server2:cast(?SERVER, {register, Pid}).
-push_queue_duration(Pid, QueueDuration) ->
+report_queue_duration(Pid, QueueDuration) ->
gen_server2:call(rabbit_memory_monitor,
- {push_queue_duration, Pid, QueueDuration}).
+ {report_queue_duration, Pid, QueueDuration}).
%%----------------------------------------------------------------------------
@@ -148,33 +150,21 @@ init([]) ->
%% try to remain safe distance from real throttle limit.
MemoryLimit = trunc(get_memory_limit() * 0.6),
- {ok, TRef} = timer:apply_interval(?DEFAULT_UPDATE_INTERVAL_MS,
+ {ok, TRef} = timer:apply_interval(?DEFAULT_UPDATE_INTERVAL,
?SERVER, update, []),
{ok, #state{timer = TRef,
queue_durations = ets:new(?TABLE_NAME, [set, private]),
queue_duration_sum = 0.0,
queue_duration_count = 0,
memory_limit = MemoryLimit,
- memory_ratio = 1.0}}.
+ memory_ratio = 1.0,
+ desired_duration = infinity}}.
-get_avg_duration(#state{queue_duration_sum = Sum,
- queue_duration_count = Count}) ->
- case Count of
- 0 -> infinity;
- _ -> Sum / Count
- end.
-
-get_desired_duration(State = #state{memory_ratio = Ratio}) ->
- case get_avg_duration(State) of
- infinity -> infinity;
- AvgQueueDuration -> AvgQueueDuration * Ratio
- end.
-
-handle_call({push_queue_duration, Pid, QueueDuration}, From,
+handle_call({report_queue_duration, Pid, QueueDuration}, From,
State = #state{queue_duration_sum = Sum,
queue_duration_count = Count,
- queue_durations = Durations}) ->
- SendDuration = get_desired_duration(State),
+ queue_durations = Durations,
+ desired_duration = SendDuration}) ->
gen_server2:reply(From, SendDuration),
QueueDuration1 = case QueueDuration > ?MAX_QUEUE_DURATION of
@@ -237,12 +227,22 @@ set_queue_duration(Pid, QueueDuration) ->
gen_server2:pcast(Pid, 7, {set_queue_duration, QueueDuration}).
internal_update(State = #state{memory_limit = Limit,
- queue_durations = Durations}) ->
- DesiredDurationAvg = get_desired_duration(State),
+ queue_durations = Durations,
+ desired_duration = DesiredDurationAvg,
+ queue_duration_sum = Sum,
+ queue_duration_count = Count}) ->
%% available memory / used memory
MemoryRatio = Limit / erlang:memory(total),
- State1 = State#state{memory_ratio = MemoryRatio},
- DesiredDurationAvg1 = get_desired_duration(State1),
+ AvgDuration = case Count of
+ 0 -> infinity;
+ _ -> Sum / Count
+ end,
+ DesiredDurationAvg1 = case AvgDuration of
+ infinity -> infinity;
+ AvgQueueDuration -> AvgQueueDuration * MemoryRatio
+ end,
+ State1 = State#state{memory_ratio = MemoryRatio,
+ desired_duration = DesiredDurationAvg1},
%% only inform queues immediately if the desired duration has
%% decreased
@@ -267,4 +267,3 @@ internal_update(State = #state{memory_limit = Limit,
false -> ok
end,
State1.
-