diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-11-11 15:16:20 +0000 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-11-11 15:16:20 +0000 |
| commit | e942644287d7e96a3f5d455b98bc6857fb747124 (patch) | |
| tree | a4a53bdf530d7b4cd145f0b5b3450a37768ee917 | |
| parent | 862159f445f21fc128c5e93e746859b3c961ea8c (diff) | |
| download | rabbitmq-server-git-e942644287d7e96a3f5d455b98bc6857fb747124.tar.gz | |
Unhappy with the idea that the desired duration is affected immediately by new queues and queues dying, but the memory ratio, which reflects the amount of memory erlang has used, is updated periodically. This mix of up-to-date and stale information in the calculation of the desired duration alarms me. Thus store the desired duration in the state, and always report that. That is then update periodically, thus is only ever calculated using current values.
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_memory_monitor.erl | 71 |
2 files changed, 37 insertions, 38 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 9b97fe8651..0bfa6df160 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -822,8 +822,8 @@ handle_cast(send_memory_monitor_update, State) -> DrainRatio1 = update_ratio(State#q.drain_ratio, State#q.next_msg_id), MsgSec = DrainRatio1#ratio.ratio * 1000000, % msg/sec QueueDuration = queue:len(State#q.message_buffer) / MsgSec, % seconds - DesiredQueueDuration = rabbit_memory_monitor:push_queue_duration( - self(), QueueDuration), + DesiredQueueDuration = rabbit_memory_monitor:report_queue_duration( + self(), QueueDuration), ?LOGDEBUG("~p Queue duration current/desired ~p/~p~n", [(State#q.q)#amqqueue.name, QueueDuration, DesiredQueueDuration]), noreply(State#q{drain_ratio = DrainRatio1}); diff --git a/src/rabbit_memory_monitor.erl b/src/rabbit_memory_monitor.erl index 4880b260d8..7bd03c9c42 100644 --- a/src/rabbit_memory_monitor.erl +++ b/src/rabbit_memory_monitor.erl @@ -81,18 +81,19 @@ -export([update/0]). --export([register/1, push_queue_duration/2]). - --record(state, {timer, %% 'internal_update' timer - queue_durations, %% ets, (qpid, last_reported, last_sent) - queue_duration_sum, %% sum of all queue_durations - queue_duration_count,%% number of elements in sum - memory_limit, %% how much memory we intend to use - memory_ratio %% how much more memory we can use +-export([register/1, report_queue_duration/2]). + +-record(state, {timer, %% 'internal_update' timer + queue_durations, %% ets, (qpid, last_reported, last_sent) + queue_duration_sum, %% sum of all queue_durations + queue_duration_count, %% number of elements in sum + memory_limit, %% how much memory we intend to use + memory_ratio, %% how much more memory we can use + desired_duration %% the desired queue duration }). -define(SERVER, ?MODULE). --define(DEFAULT_UPDATE_INTERVAL_MS, 2500). +-define(DEFAULT_UPDATE_INTERVAL, 2500). -define(TABLE_NAME, ?MODULE). -define(MAX_QUEUE_DURATION, 60*60*24). % 1 day @@ -106,11 +107,12 @@ queue_duration_sum :: float(), queue_duration_count:: non_neg_integer(), memory_limit :: pos_integer(), - memory_ratio :: float() }). + memory_ratio :: float(), + desired_duration :: float() | 'infinity' }). -spec(start_link/0 :: () -> 'ignore' | {'error', _} | {'ok', pid()}). -spec(register/1 :: (pid()) -> 'ok'). --spec(push_queue_duration/2 :: (pid(), float() | 'infinity') -> 'ok'). +-spec(report_queue_duration/2 :: (pid(), float() | 'infinity') -> 'ok'). -spec(init/1 :: ([]) -> {'ok', state()}). @@ -130,9 +132,9 @@ update() -> register(Pid) -> gen_server2:cast(?SERVER, {register, Pid}). -push_queue_duration(Pid, QueueDuration) -> +report_queue_duration(Pid, QueueDuration) -> gen_server2:call(rabbit_memory_monitor, - {push_queue_duration, Pid, QueueDuration}). + {report_queue_duration, Pid, QueueDuration}). %%---------------------------------------------------------------------------- @@ -148,33 +150,21 @@ init([]) -> %% try to remain safe distance from real throttle limit. MemoryLimit = trunc(get_memory_limit() * 0.6), - {ok, TRef} = timer:apply_interval(?DEFAULT_UPDATE_INTERVAL_MS, + {ok, TRef} = timer:apply_interval(?DEFAULT_UPDATE_INTERVAL, ?SERVER, update, []), {ok, #state{timer = TRef, queue_durations = ets:new(?TABLE_NAME, [set, private]), queue_duration_sum = 0.0, queue_duration_count = 0, memory_limit = MemoryLimit, - memory_ratio = 1.0}}. + memory_ratio = 1.0, + desired_duration = infinity}}. -get_avg_duration(#state{queue_duration_sum = Sum, - queue_duration_count = Count}) -> - case Count of - 0 -> infinity; - _ -> Sum / Count - end. - -get_desired_duration(State = #state{memory_ratio = Ratio}) -> - case get_avg_duration(State) of - infinity -> infinity; - AvgQueueDuration -> AvgQueueDuration * Ratio - end. - -handle_call({push_queue_duration, Pid, QueueDuration}, From, +handle_call({report_queue_duration, Pid, QueueDuration}, From, State = #state{queue_duration_sum = Sum, queue_duration_count = Count, - queue_durations = Durations}) -> - SendDuration = get_desired_duration(State), + queue_durations = Durations, + desired_duration = SendDuration}) -> gen_server2:reply(From, SendDuration), QueueDuration1 = case QueueDuration > ?MAX_QUEUE_DURATION of @@ -237,12 +227,22 @@ set_queue_duration(Pid, QueueDuration) -> gen_server2:pcast(Pid, 7, {set_queue_duration, QueueDuration}). internal_update(State = #state{memory_limit = Limit, - queue_durations = Durations}) -> - DesiredDurationAvg = get_desired_duration(State), + queue_durations = Durations, + desired_duration = DesiredDurationAvg, + queue_duration_sum = Sum, + queue_duration_count = Count}) -> %% available memory / used memory MemoryRatio = Limit / erlang:memory(total), - State1 = State#state{memory_ratio = MemoryRatio}, - DesiredDurationAvg1 = get_desired_duration(State1), + AvgDuration = case Count of + 0 -> infinity; + _ -> Sum / Count + end, + DesiredDurationAvg1 = case AvgDuration of + infinity -> infinity; + AvgQueueDuration -> AvgQueueDuration * MemoryRatio + end, + State1 = State#state{memory_ratio = MemoryRatio, + desired_duration = DesiredDurationAvg1}, %% only inform queues immediately if the desired duration has %% decreased @@ -267,4 +267,3 @@ internal_update(State = #state{memory_limit = Limit, false -> ok end, State1. - |
