diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-11-11 15:02:08 +0000 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-11-11 15:02:08 +0000 |
| commit | 862159f445f21fc128c5e93e746859b3c961ea8c (patch) | |
| tree | 40a6f30273c10b760a43bc3cdf8059aebc98f22c | |
| parent | afcb6ae973b521f682d4e41ef64d59ca9abc41ad (diff) | |
| download | rabbitmq-server-git-862159f445f21fc128c5e93e746859b3c961ea8c.tar.gz | |
Cosmetics
| -rw-r--r-- | src/rabbit_memory_monitor.erl | 194 |
1 file changed, 90 insertions, 104 deletions
diff --git a/src/rabbit_memory_monitor.erl b/src/rabbit_memory_monitor.erl index ff7684bd16..4880b260d8 100644 --- a/src/rabbit_memory_monitor.erl +++ b/src/rabbit_memory_monitor.erl @@ -30,7 +30,7 @@ %% -%% This module handles the node-wide memory statistics. +%% This module handles the node-wide memory statistics. %% It receives statistics from all queues, counts the desired %% queue length (in seconds), and sends this information back to %% queues. @@ -46,8 +46,8 @@ %% Monitor X--*-+--X---*-+--X------X----X-----X+-----------> %% %% Or to put it in words. Queue periodically sends (casts) 'push_queue_duration' -%% message to the Monitor (cases 1 and 2 on the asciiart above). Monitor -%% _always_ replies with a 'set_queue_duration' cast. This way, +%% message to the Monitor (cases 1 and 2 on the asciiart above). Monitor +%% _always_ replies with a 'set_queue_duration' cast. This way, %% we're pretty sure that the Queue is not hibernated. %% Monitor periodically recounts numbers ('X' on asciiart). If, during this %% update we notice that a queue was using too much memory, we send a message @@ -57,13 +57,13 @@ %% %% The main job of this module, is to make sure that all the queues have %% more or less the same number of seconds till become drained. -%% This average, seconds-till-queue-is-drained, is then multiplied by +%% This average, seconds-till-queue-is-drained, is then multiplied by %% the ratio of Total/Used memory. So, if we can 'afford' more memory to be %% used, we'll report greater number back to the queues. In the out of %% memory case, we are going to reduce the average drain-seconds. %% To acheive all this we need to accumulate the information from every %% queue, and count an average from that. 
-%% +%% %% real_queue_duration_avg = avg([drain_from_queue_1, queue_2, queue_3, ...]) %% memory_overcommit = allowed_memory / used_memory %% desired_queue_duration_avg = real_queue_duration_avg * memory_overcommit @@ -84,9 +84,9 @@ -export([register/1, push_queue_duration/2]). -record(state, {timer, %% 'internal_update' timer - queue_durations, %% ets, (qpid, seconds_till_queue_is_empty) + queue_durations, %% ets, (qpid, last_reported, last_sent) queue_duration_sum, %% sum of all queue_durations - queue_duration_items,%% number of elements in sum + queue_duration_count,%% number of elements in sum memory_limit, %% how much memory we intend to use memory_ratio %% how much more memory we can use }). @@ -94,7 +94,7 @@ -define(SERVER, ?MODULE). -define(DEFAULT_UPDATE_INTERVAL_MS, 2500). -define(TABLE_NAME, ?MODULE). --define(MAX_QUEUE_DURATION_ALLOWED, 60*60*24). % 1 day +-define(MAX_QUEUE_DURATION, 60*60*24). % 1 day %% If user disabled vm_memory_monitor, let's assume 1GB of memory we can use. -define(MEMORY_SIZE_FOR_DISABLED_VMM, 1073741824). @@ -104,7 +104,7 @@ -type(state() :: #state{timer :: timer:tref(), queue_durations :: tid(), queue_duration_sum :: float(), - queue_duration_items:: non_neg_integer(), + queue_duration_count:: non_neg_integer(), memory_limit :: pos_integer(), memory_ratio :: float() }). @@ -114,10 +114,6 @@ -spec(init/1 :: ([]) -> {'ok', state()}). --ifdef(debug). --spec(ftoa/1 :: (any()) -> string()). --endif. - -spec(internal_update/1 :: (state()) -> state()). -endif. @@ -136,68 +132,67 @@ register(Pid) -> push_queue_duration(Pid, QueueDuration) -> gen_server2:call(rabbit_memory_monitor, - {push_queue_duration, Pid, QueueDuration}). + {push_queue_duration, Pid, QueueDuration}). %%---------------------------------------------------------------------------- get_memory_limit() -> - RabbitMemoryLimit = case vm_memory_monitor:get_memory_limit() of + case vm_memory_monitor:get_memory_limit() of undefined -> ?MEMORY_SIZE_FOR_DISABLED_VMM; A -> A end. 
init([]) -> - %% We should never use more memory than user requested. As the memory + %% We should never use more memory than user requested. As the memory %% manager doesn't really know how much memory queues are using, we shall %% try to remain safe distance from real throttle limit. MemoryLimit = trunc(get_memory_limit() * 0.6), - rabbit_log:warning("Queues go to disk when memory is above: ~pMB~n", - [erlang:trunc(MemoryLimit/1048576)]), - - {ok, TRef} = timer:apply_interval(?DEFAULT_UPDATE_INTERVAL_MS, - ?SERVER, update, []), - {ok, #state{timer = TRef, - queue_durations = ets:new(?TABLE_NAME, [set, private]), - queue_duration_sum = 0.0, - queue_duration_items = 0, - memory_limit = MemoryLimit, - memory_ratio = 1.0}}. + + {ok, TRef} = timer:apply_interval(?DEFAULT_UPDATE_INTERVAL_MS, + ?SERVER, update, []), + {ok, #state{timer = TRef, + queue_durations = ets:new(?TABLE_NAME, [set, private]), + queue_duration_sum = 0.0, + queue_duration_count = 0, + memory_limit = MemoryLimit, + memory_ratio = 1.0}}. get_avg_duration(#state{queue_duration_sum = Sum, - queue_duration_items = Items}) -> - case Items of + queue_duration_count = Count}) -> + case Count of 0 -> infinity; - _ -> Sum / Items + _ -> Sum / Count end. -get_desired_duration(State) -> +get_desired_duration(State = #state{memory_ratio = Ratio}) -> case get_avg_duration(State) of - infinity -> infinity; - AvgQueueDuration -> AvgQueueDuration * State#state.memory_ratio + infinity -> infinity; + AvgQueueDuration -> AvgQueueDuration * Ratio end. 
-handle_call({push_queue_duration, Pid, QueueDuration0}, From, State) -> +handle_call({push_queue_duration, Pid, QueueDuration}, From, + State = #state{queue_duration_sum = Sum, + queue_duration_count = Count, + queue_durations = Durations}) -> SendDuration = get_desired_duration(State), gen_server2:reply(From, SendDuration), - QueueDuration = case QueueDuration0 > ?MAX_QUEUE_DURATION_ALLOWED of - true -> infinity; - false -> QueueDuration0 - end, - - {Sum, Items} = {State#state.queue_duration_sum, - State#state.queue_duration_items}, - [{_Pid, PrevQueueDuration, _PrevSendDuration}] = ets:lookup(State#state.queue_durations, Pid), - {Sum1, Items1} = - case {PrevQueueDuration == infinity, QueueDuration == infinity} of - {true, true} -> {Sum, Items}; - {true, false} -> {Sum + QueueDuration, Items + 1}; - {false, true} -> {Sum - PrevQueueDuration, Items - 1}; - {false, false} -> {Sum - PrevQueueDuration + QueueDuration, Items} - end, - ets:insert(State#state.queue_durations, {Pid, QueueDuration, SendDuration}), + QueueDuration1 = case QueueDuration > ?MAX_QUEUE_DURATION of + true -> infinity; + false -> QueueDuration + end, + + [{_Pid, PrevQueueDuration, _PrevSendDuration}] = ets:lookup(Durations, Pid), + {Sum1, Count1} = + case {PrevQueueDuration, QueueDuration1} of + {infinity, infinity} -> {Sum, Count}; + {infinity, _} -> {Sum + QueueDuration1, Count + 1}; + {_, infinity} -> {Sum - PrevQueueDuration, Count - 1}; + {_, _} -> {Sum - PrevQueueDuration + QueueDuration1, Count} + end, + true = ets:insert(Durations, {Pid, QueueDuration1, SendDuration}), {noreply, State#state{queue_duration_sum = Sum1, - queue_duration_items = Items1}}; + queue_duration_count = Count1}}; handle_call(_Request, _From, State) -> {noreply, State}. 
@@ -208,77 +203,68 @@ handle_cast(update, State) -> handle_cast({register, Pid}, State) -> _MRef = erlang:monitor(process, Pid), - ets:insert(State#state.queue_durations, {Pid, infinity, infinity}), + true = ets:insert(State#state.queue_durations, {Pid, infinity, infinity}), {noreply, State}; handle_cast(_Request, State) -> {noreply, State}. -handle_info({'DOWN', _MRef, process, Pid, _Reason}, State) -> - {Sum, Items} = {State#state.queue_duration_sum, - State#state.queue_duration_items}, - [{_Pid, PrevQueueDuration, _PrevSendDuration}] = ets:lookup(State#state.queue_durations, Pid), - Sum1 = case PrevQueueDuration == infinity of - true -> Sum; - false -> Sum - PrevQueueDuration - end, - ets:delete(State#state.queue_durations, Pid), +handle_info({'DOWN', _MRef, process, Pid, _Reason}, + State = #state{queue_duration_sum = Sum, + queue_duration_count = Count, + queue_durations = Durations}) -> + [{_Pid, PrevQueueDuration, _PrevSendDuration}] = ets:lookup(Durations, Pid), + Sum1 = case PrevQueueDuration of + infinity -> Sum; + _ -> Sum - PrevQueueDuration + end, + true = ets:delete(State#state.queue_durations, Pid), {noreply, State#state{queue_duration_sum = Sum1, - queue_duration_items = Items-1}}; + queue_duration_count = Count-1}}; -handle_info(_Info, State) -> +handle_info(_Info, State) -> {noreply, State}. -terminate(_Reason, _State) -> +terminate(_Reason, _State) -> ok. -code_change(_OldVsn, State, _Extra) -> +code_change(_OldVsn, State, _Extra) -> {ok, State}. - set_queue_duration(Pid, QueueDuration) -> gen_server2:pcast(Pid, 7, {set_queue_duration, QueueDuration}). --ifdef(debug). -ftoa(Float) -> - Str = case is_float(Float) of - true -> io_lib:format("~11.3f",[Float]); - false -> io_lib:format("~p", [Float]) - end, - lists:flatten(Str). --endif. - - -%% Update memory ratio. Count new DesiredQueueDuration. -%% Get queues that are using more than that, and send -%% pessimistic information back to them. 
-internal_update(State0) -> - %% available memory / used memory - MemoryRatio = State0#state.memory_limit / erlang:memory(total), - State = State0#state{memory_ratio = MemoryRatio}, - +internal_update(State = #state{memory_limit = Limit, + queue_durations = Durations}) -> DesiredDurationAvg = get_desired_duration(State), - - ?LOGDEBUG("Avg duration: real/desired:~s/~s Memory ratio:~s Queues:~p~n", - [ftoa(get_avg_duration(State)), ftoa(DesiredDurationAvg), - ftoa(MemoryRatio), - ets:foldl(fun (_, Acc) -> Acc+1 end, - 0, State#state.queue_durations)] ), - - %% If we have pessimistic information, we need to inform queues - %% to reduce it's memory usage when needed. - %% This sometimes wakes up queues from hibernation. Well, we don't care. - PromptReduceDuraton = fun ({Pid, QueueDuration, PrevSendDuration}, Acc) -> - case (PrevSendDuration > DesiredDurationAvg) and (QueueDuration > DesiredDurationAvg) of - true -> set_queue_duration(Pid, DesiredDurationAvg), - ets:insert(State#state.queue_durations, {Pid, QueueDuration, DesiredDurationAvg}), - Acc + 1; - _ -> Acc - end + %% available memory / used memory + MemoryRatio = Limit / erlang:memory(total), + State1 = State#state{memory_ratio = MemoryRatio}, + DesiredDurationAvg1 = get_desired_duration(State1), + + %% only inform queues immediately if the desired duration has + %% decreased + case DesiredDurationAvg1 < DesiredDurationAvg of + true -> + %% If we have pessimistic information, we need to inform + %% queues to reduce it's memory usage when needed. This + %% sometimes wakes up queues from hibernation. 
+ true = ets:foldl( + fun ({Pid, QueueDuration, PrevSendDuration}, true) -> + case DesiredDurationAvg1 < + lists:min([PrevSendDuration, QueueDuration]) of + true -> + set_queue_duration(Pid, + DesiredDurationAvg1), + ets:insert(Durations, + {Pid, QueueDuration, + DesiredDurationAvg1}); + _ -> true + end + end, true, Durations); + false -> ok end, - ets:foldl(PromptReduceDuraton, 0, State#state.queue_durations), - State. - + State1. |
