diff options
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_memory_monitor.erl | 71 |
2 files changed, 37 insertions, 38 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 9b97fe8651..0bfa6df160 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -822,8 +822,8 @@ handle_cast(send_memory_monitor_update, State) -> DrainRatio1 = update_ratio(State#q.drain_ratio, State#q.next_msg_id), MsgSec = DrainRatio1#ratio.ratio * 1000000, % msg/sec QueueDuration = queue:len(State#q.message_buffer) / MsgSec, % seconds - DesiredQueueDuration = rabbit_memory_monitor:push_queue_duration( - self(), QueueDuration), + DesiredQueueDuration = rabbit_memory_monitor:report_queue_duration( + self(), QueueDuration), ?LOGDEBUG("~p Queue duration current/desired ~p/~p~n", [(State#q.q)#amqqueue.name, QueueDuration, DesiredQueueDuration]), noreply(State#q{drain_ratio = DrainRatio1}); diff --git a/src/rabbit_memory_monitor.erl b/src/rabbit_memory_monitor.erl index 4880b260d8..7bd03c9c42 100644 --- a/src/rabbit_memory_monitor.erl +++ b/src/rabbit_memory_monitor.erl @@ -81,18 +81,19 @@ -export([update/0]). --export([register/1, push_queue_duration/2]). - --record(state, {timer, %% 'internal_update' timer - queue_durations, %% ets, (qpid, last_reported, last_sent) - queue_duration_sum, %% sum of all queue_durations - queue_duration_count,%% number of elements in sum - memory_limit, %% how much memory we intend to use - memory_ratio %% how much more memory we can use +-export([register/1, report_queue_duration/2]). + +-record(state, {timer, %% 'internal_update' timer + queue_durations, %% ets, (qpid, last_reported, last_sent) + queue_duration_sum, %% sum of all queue_durations + queue_duration_count, %% number of elements in sum + memory_limit, %% how much memory we intend to use + memory_ratio, %% how much more memory we can use + desired_duration %% the desired queue duration }). -define(SERVER, ?MODULE). --define(DEFAULT_UPDATE_INTERVAL_MS, 2500). +-define(DEFAULT_UPDATE_INTERVAL, 2500). -define(TABLE_NAME, ?MODULE). -define(MAX_QUEUE_DURATION, 60*60*24). % 1 day @@ -106,11 +107,12 @@ queue_duration_sum :: float(), queue_duration_count:: non_neg_integer(), memory_limit :: pos_integer(), - memory_ratio :: float() }). + memory_ratio :: float(), + desired_duration :: float() | 'infinity' }). -spec(start_link/0 :: () -> 'ignore' | {'error', _} | {'ok', pid()}). -spec(register/1 :: (pid()) -> 'ok'). --spec(push_queue_duration/2 :: (pid(), float() | 'infinity') -> 'ok'). +-spec(report_queue_duration/2 :: (pid(), float() | 'infinity') -> 'ok'). -spec(init/1 :: ([]) -> {'ok', state()}). @@ -130,9 +132,9 @@ update() -> register(Pid) -> gen_server2:cast(?SERVER, {register, Pid}). -push_queue_duration(Pid, QueueDuration) -> +report_queue_duration(Pid, QueueDuration) -> gen_server2:call(rabbit_memory_monitor, - {push_queue_duration, Pid, QueueDuration}). + {report_queue_duration, Pid, QueueDuration}). %%---------------------------------------------------------------------------- @@ -148,33 +150,21 @@ init([]) -> %% try to remain safe distance from real throttle limit. MemoryLimit = trunc(get_memory_limit() * 0.6), - {ok, TRef} = timer:apply_interval(?DEFAULT_UPDATE_INTERVAL_MS, + {ok, TRef} = timer:apply_interval(?DEFAULT_UPDATE_INTERVAL, ?SERVER, update, []), {ok, #state{timer = TRef, queue_durations = ets:new(?TABLE_NAME, [set, private]), queue_duration_sum = 0.0, queue_duration_count = 0, memory_limit = MemoryLimit, - memory_ratio = 1.0}}. + memory_ratio = 1.0, + desired_duration = infinity}}. -get_avg_duration(#state{queue_duration_sum = Sum, - queue_duration_count = Count}) -> - case Count of - 0 -> infinity; - _ -> Sum / Count - end. - -get_desired_duration(State = #state{memory_ratio = Ratio}) -> - case get_avg_duration(State) of - infinity -> infinity; - AvgQueueDuration -> AvgQueueDuration * Ratio - end. - -handle_call({push_queue_duration, Pid, QueueDuration}, From, +handle_call({report_queue_duration, Pid, QueueDuration}, From, State = #state{queue_duration_sum = Sum, queue_duration_count = Count, - queue_durations = Durations}) -> - SendDuration = get_desired_duration(State), + queue_durations = Durations, + desired_duration = SendDuration}) -> gen_server2:reply(From, SendDuration), QueueDuration1 = case QueueDuration > ?MAX_QUEUE_DURATION of @@ -237,12 +227,22 @@ set_queue_duration(Pid, QueueDuration) -> gen_server2:pcast(Pid, 7, {set_queue_duration, QueueDuration}). internal_update(State = #state{memory_limit = Limit, - queue_durations = Durations}) -> - DesiredDurationAvg = get_desired_duration(State), + queue_durations = Durations, + desired_duration = DesiredDurationAvg, + queue_duration_sum = Sum, + queue_duration_count = Count}) -> %% available memory / used memory MemoryRatio = Limit / erlang:memory(total), - State1 = State#state{memory_ratio = MemoryRatio}, - DesiredDurationAvg1 = get_desired_duration(State1), + AvgDuration = case Count of + 0 -> infinity; + _ -> Sum / Count + end, + DesiredDurationAvg1 = case AvgDuration of + infinity -> infinity; + AvgQueueDuration -> AvgQueueDuration * MemoryRatio + end, + State1 = State#state{memory_ratio = MemoryRatio, + desired_duration = DesiredDurationAvg1}, %% only inform queues immediately if the desired duration has %% decreased @@ -267,4 +267,3 @@ internal_update(State = #state{memory_limit = Limit, false -> ok end, State1. - |
