summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_amqqueue_process.erl4
-rw-r--r--src/rabbit_memory_monitor.erl71
2 files changed, 37 insertions, 38 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 9b97fe8651..0bfa6df160 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -822,8 +822,8 @@ handle_cast(send_memory_monitor_update, State) ->
DrainRatio1 = update_ratio(State#q.drain_ratio, State#q.next_msg_id),
MsgSec = DrainRatio1#ratio.ratio * 1000000, % msg/sec
QueueDuration = queue:len(State#q.message_buffer) / MsgSec, % seconds
- DesiredQueueDuration = rabbit_memory_monitor:push_queue_duration(
- self(), QueueDuration),
+ DesiredQueueDuration = rabbit_memory_monitor:report_queue_duration(
+ self(), QueueDuration),
?LOGDEBUG("~p Queue duration current/desired ~p/~p~n",
[(State#q.q)#amqqueue.name, QueueDuration, DesiredQueueDuration]),
noreply(State#q{drain_ratio = DrainRatio1});
diff --git a/src/rabbit_memory_monitor.erl b/src/rabbit_memory_monitor.erl
index 4880b260d8..7bd03c9c42 100644
--- a/src/rabbit_memory_monitor.erl
+++ b/src/rabbit_memory_monitor.erl
@@ -81,18 +81,19 @@
-export([update/0]).
--export([register/1, push_queue_duration/2]).
-
--record(state, {timer, %% 'internal_update' timer
- queue_durations, %% ets, (qpid, last_reported, last_sent)
- queue_duration_sum, %% sum of all queue_durations
- queue_duration_count,%% number of elements in sum
- memory_limit, %% how much memory we intend to use
- memory_ratio %% how much more memory we can use
+-export([register/1, report_queue_duration/2]).
+
+-record(state, {timer, %% 'internal_update' timer
+ queue_durations, %% ets, (qpid, last_reported, last_sent)
+ queue_duration_sum, %% sum of all queue_durations
+ queue_duration_count, %% number of elements in sum
+ memory_limit, %% how much memory we intend to use
+ memory_ratio, %% how much more memory we can use
+ desired_duration %% the desired queue duration
}).
-define(SERVER, ?MODULE).
--define(DEFAULT_UPDATE_INTERVAL_MS, 2500).
+-define(DEFAULT_UPDATE_INTERVAL, 2500).
-define(TABLE_NAME, ?MODULE).
-define(MAX_QUEUE_DURATION, 60*60*24). % 1 day
@@ -106,11 +107,12 @@
queue_duration_sum :: float(),
queue_duration_count:: non_neg_integer(),
memory_limit :: pos_integer(),
- memory_ratio :: float() }).
+ memory_ratio :: float(),
+ desired_duration :: float() | 'infinity' }).
-spec(start_link/0 :: () -> 'ignore' | {'error', _} | {'ok', pid()}).
-spec(register/1 :: (pid()) -> 'ok').
--spec(push_queue_duration/2 :: (pid(), float() | 'infinity') -> 'ok').
+-spec(report_queue_duration/2 :: (pid(), float() | 'infinity') -> 'ok').
-spec(init/1 :: ([]) -> {'ok', state()}).
@@ -130,9 +132,9 @@ update() ->
register(Pid) ->
gen_server2:cast(?SERVER, {register, Pid}).
-push_queue_duration(Pid, QueueDuration) ->
+report_queue_duration(Pid, QueueDuration) ->
gen_server2:call(rabbit_memory_monitor,
- {push_queue_duration, Pid, QueueDuration}).
+ {report_queue_duration, Pid, QueueDuration}).
%%----------------------------------------------------------------------------
@@ -148,33 +150,21 @@ init([]) ->
%% try to remain safe distance from real throttle limit.
MemoryLimit = trunc(get_memory_limit() * 0.6),
- {ok, TRef} = timer:apply_interval(?DEFAULT_UPDATE_INTERVAL_MS,
+ {ok, TRef} = timer:apply_interval(?DEFAULT_UPDATE_INTERVAL,
?SERVER, update, []),
{ok, #state{timer = TRef,
queue_durations = ets:new(?TABLE_NAME, [set, private]),
queue_duration_sum = 0.0,
queue_duration_count = 0,
memory_limit = MemoryLimit,
- memory_ratio = 1.0}}.
+ memory_ratio = 1.0,
+ desired_duration = infinity}}.
-get_avg_duration(#state{queue_duration_sum = Sum,
- queue_duration_count = Count}) ->
- case Count of
- 0 -> infinity;
- _ -> Sum / Count
- end.
-
-get_desired_duration(State = #state{memory_ratio = Ratio}) ->
- case get_avg_duration(State) of
- infinity -> infinity;
- AvgQueueDuration -> AvgQueueDuration * Ratio
- end.
-
-handle_call({push_queue_duration, Pid, QueueDuration}, From,
+handle_call({report_queue_duration, Pid, QueueDuration}, From,
State = #state{queue_duration_sum = Sum,
queue_duration_count = Count,
- queue_durations = Durations}) ->
- SendDuration = get_desired_duration(State),
+ queue_durations = Durations,
+ desired_duration = SendDuration}) ->
gen_server2:reply(From, SendDuration),
QueueDuration1 = case QueueDuration > ?MAX_QUEUE_DURATION of
@@ -237,12 +227,22 @@ set_queue_duration(Pid, QueueDuration) ->
gen_server2:pcast(Pid, 7, {set_queue_duration, QueueDuration}).
internal_update(State = #state{memory_limit = Limit,
- queue_durations = Durations}) ->
- DesiredDurationAvg = get_desired_duration(State),
+ queue_durations = Durations,
+ desired_duration = DesiredDurationAvg,
+ queue_duration_sum = Sum,
+ queue_duration_count = Count}) ->
%% available memory / used memory
MemoryRatio = Limit / erlang:memory(total),
- State1 = State#state{memory_ratio = MemoryRatio},
- DesiredDurationAvg1 = get_desired_duration(State1),
+ AvgDuration = case Count of
+ 0 -> infinity;
+ _ -> Sum / Count
+ end,
+ DesiredDurationAvg1 = case AvgDuration of
+ infinity -> infinity;
+ AvgQueueDuration -> AvgQueueDuration * MemoryRatio
+ end,
+ State1 = State#state{memory_ratio = MemoryRatio,
+ desired_duration = DesiredDurationAvg1},
%% only inform queues immediately if the desired duration has
%% decreased
@@ -267,4 +267,3 @@ internal_update(State = #state{memory_limit = Limit,
false -> ok
end,
State1.
-