summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-11-11 15:02:08 +0000
committerMatthew Sackman <matthew@lshift.net>2009-11-11 15:02:08 +0000
commit862159f445f21fc128c5e93e746859b3c961ea8c (patch)
tree40a6f30273c10b760a43bc3cdf8059aebc98f22c
parentafcb6ae973b521f682d4e41ef64d59ca9abc41ad (diff)
downloadrabbitmq-server-git-862159f445f21fc128c5e93e746859b3c961ea8c.tar.gz
Cosmetics
-rw-r--r--src/rabbit_memory_monitor.erl194
1 files changed, 90 insertions, 104 deletions
diff --git a/src/rabbit_memory_monitor.erl b/src/rabbit_memory_monitor.erl
index ff7684bd16..4880b260d8 100644
--- a/src/rabbit_memory_monitor.erl
+++ b/src/rabbit_memory_monitor.erl
@@ -30,7 +30,7 @@
%%
-%% This module handles the node-wide memory statistics.
+%% This module handles the node-wide memory statistics.
%% It receives statistics from all queues, counts the desired
%% queue length (in seconds), and sends this information back to
%% queues.
@@ -46,8 +46,8 @@
%% Monitor X--*-+--X---*-+--X------X----X-----X+----------->
%%
%% Or to put it in words. Queue periodically sends (casts) 'push_queue_duration'
-%% message to the Monitor (cases 1 and 2 on the asciiart above). Monitor
-%% _always_ replies with a 'set_queue_duration' cast. This way,
+%% message to the Monitor (cases 1 and 2 on the asciiart above). Monitor
+%% _always_ replies with a 'set_queue_duration' cast. This way,
%% we're pretty sure that the Queue is not hibernated.
%% Monitor periodically recounts numbers ('X' on asciiart). If, during this
%% update we notice that a queue was using too much memory, we send a message
@@ -57,13 +57,13 @@
%%
%% The main job of this module, is to make sure that all the queues have
%% more or less the same number of seconds till they become drained.
-%% This average, seconds-till-queue-is-drained, is then multiplied by
+%% This average, seconds-till-queue-is-drained, is then multiplied by
%% the ratio of Total/Used memory. So, if we can 'afford' more memory to be
%% used, we'll report greater number back to the queues. In the out of
%% memory case, we are going to reduce the average drain-seconds.
%% To achieve all this we need to accumulate the information from every
%% queue, and count an average from that.
-%%
+%%
%% real_queue_duration_avg = avg([drain_from_queue_1, queue_2, queue_3, ...])
%% memory_overcommit = allowed_memory / used_memory
%% desired_queue_duration_avg = real_queue_duration_avg * memory_overcommit
@@ -84,9 +84,9 @@
-export([register/1, push_queue_duration/2]).
-record(state, {timer, %% 'internal_update' timer
- queue_durations, %% ets, (qpid, seconds_till_queue_is_empty)
+ queue_durations, %% ets, (qpid, last_reported, last_sent)
queue_duration_sum, %% sum of all queue_durations
- queue_duration_items,%% number of elements in sum
+ queue_duration_count,%% number of elements in sum
memory_limit, %% how much memory we intend to use
memory_ratio %% how much more memory we can use
}).
@@ -94,7 +94,7 @@
-define(SERVER, ?MODULE).
-define(DEFAULT_UPDATE_INTERVAL_MS, 2500).
-define(TABLE_NAME, ?MODULE).
--define(MAX_QUEUE_DURATION_ALLOWED, 60*60*24). % 1 day
+-define(MAX_QUEUE_DURATION, 60*60*24). % 1 day
%% If user disabled vm_memory_monitor, let's assume 1GB of memory we can use.
-define(MEMORY_SIZE_FOR_DISABLED_VMM, 1073741824).
@@ -104,7 +104,7 @@
-type(state() :: #state{timer :: timer:tref(),
queue_durations :: tid(),
queue_duration_sum :: float(),
- queue_duration_items:: non_neg_integer(),
+ queue_duration_count:: non_neg_integer(),
memory_limit :: pos_integer(),
memory_ratio :: float() }).
@@ -114,10 +114,6 @@
-spec(init/1 :: ([]) -> {'ok', state()}).
--ifdef(debug).
--spec(ftoa/1 :: (any()) -> string()).
--endif.
-
-spec(internal_update/1 :: (state()) -> state()).
-endif.
@@ -136,68 +132,67 @@ register(Pid) ->
push_queue_duration(Pid, QueueDuration) ->
gen_server2:call(rabbit_memory_monitor,
- {push_queue_duration, Pid, QueueDuration}).
+ {push_queue_duration, Pid, QueueDuration}).
%%----------------------------------------------------------------------------
get_memory_limit() ->
- RabbitMemoryLimit = case vm_memory_monitor:get_memory_limit() of
+ case vm_memory_monitor:get_memory_limit() of
undefined -> ?MEMORY_SIZE_FOR_DISABLED_VMM;
A -> A
end.
init([]) ->
- %% We should never use more memory than user requested. As the memory
+ %% We should never use more memory than user requested. As the memory
%% manager doesn't really know how much memory queues are using, we shall
%% try to remain safe distance from real throttle limit.
MemoryLimit = trunc(get_memory_limit() * 0.6),
- rabbit_log:warning("Queues go to disk when memory is above: ~pMB~n",
- [erlang:trunc(MemoryLimit/1048576)]),
-
- {ok, TRef} = timer:apply_interval(?DEFAULT_UPDATE_INTERVAL_MS,
- ?SERVER, update, []),
- {ok, #state{timer = TRef,
- queue_durations = ets:new(?TABLE_NAME, [set, private]),
- queue_duration_sum = 0.0,
- queue_duration_items = 0,
- memory_limit = MemoryLimit,
- memory_ratio = 1.0}}.
+
+ {ok, TRef} = timer:apply_interval(?DEFAULT_UPDATE_INTERVAL_MS,
+ ?SERVER, update, []),
+ {ok, #state{timer = TRef,
+ queue_durations = ets:new(?TABLE_NAME, [set, private]),
+ queue_duration_sum = 0.0,
+ queue_duration_count = 0,
+ memory_limit = MemoryLimit,
+ memory_ratio = 1.0}}.
get_avg_duration(#state{queue_duration_sum = Sum,
- queue_duration_items = Items}) ->
- case Items of
+ queue_duration_count = Count}) ->
+ case Count of
0 -> infinity;
- _ -> Sum / Items
+ _ -> Sum / Count
end.
-get_desired_duration(State) ->
+get_desired_duration(State = #state{memory_ratio = Ratio}) ->
case get_avg_duration(State) of
- infinity -> infinity;
- AvgQueueDuration -> AvgQueueDuration * State#state.memory_ratio
+ infinity -> infinity;
+ AvgQueueDuration -> AvgQueueDuration * Ratio
end.
-handle_call({push_queue_duration, Pid, QueueDuration0}, From, State) ->
+handle_call({push_queue_duration, Pid, QueueDuration}, From,
+ State = #state{queue_duration_sum = Sum,
+ queue_duration_count = Count,
+ queue_durations = Durations}) ->
SendDuration = get_desired_duration(State),
gen_server2:reply(From, SendDuration),
- QueueDuration = case QueueDuration0 > ?MAX_QUEUE_DURATION_ALLOWED of
- true -> infinity;
- false -> QueueDuration0
- end,
-
- {Sum, Items} = {State#state.queue_duration_sum,
- State#state.queue_duration_items},
- [{_Pid, PrevQueueDuration, _PrevSendDuration}] = ets:lookup(State#state.queue_durations, Pid),
- {Sum1, Items1} =
- case {PrevQueueDuration == infinity, QueueDuration == infinity} of
- {true, true} -> {Sum, Items};
- {true, false} -> {Sum + QueueDuration, Items + 1};
- {false, true} -> {Sum - PrevQueueDuration, Items - 1};
- {false, false} -> {Sum - PrevQueueDuration + QueueDuration, Items}
- end,
- ets:insert(State#state.queue_durations, {Pid, QueueDuration, SendDuration}),
+ QueueDuration1 = case QueueDuration > ?MAX_QUEUE_DURATION of
+ true -> infinity;
+ false -> QueueDuration
+ end,
+
+ [{_Pid, PrevQueueDuration, _PrevSendDuration}] = ets:lookup(Durations, Pid),
+ {Sum1, Count1} =
+ case {PrevQueueDuration, QueueDuration1} of
+ {infinity, infinity} -> {Sum, Count};
+ {infinity, _} -> {Sum + QueueDuration1, Count + 1};
+ {_, infinity} -> {Sum - PrevQueueDuration, Count - 1};
+ {_, _} -> {Sum - PrevQueueDuration + QueueDuration1, Count}
+ end,
+ true = ets:insert(Durations, {Pid, QueueDuration1, SendDuration}),
{noreply, State#state{queue_duration_sum = Sum1,
- queue_duration_items = Items1}};
+ queue_duration_count = Count1}};
handle_call(_Request, _From, State) ->
{noreply, State}.
@@ -208,77 +203,68 @@ handle_cast(update, State) ->
handle_cast({register, Pid}, State) ->
_MRef = erlang:monitor(process, Pid),
- ets:insert(State#state.queue_durations, {Pid, infinity, infinity}),
+ true = ets:insert(State#state.queue_durations, {Pid, infinity, infinity}),
{noreply, State};
handle_cast(_Request, State) ->
{noreply, State}.
-handle_info({'DOWN', _MRef, process, Pid, _Reason}, State) ->
- {Sum, Items} = {State#state.queue_duration_sum,
- State#state.queue_duration_items},
- [{_Pid, PrevQueueDuration, _PrevSendDuration}] = ets:lookup(State#state.queue_durations, Pid),
- Sum1 = case PrevQueueDuration == infinity of
- true -> Sum;
- false -> Sum - PrevQueueDuration
- end,
- ets:delete(State#state.queue_durations, Pid),
+handle_info({'DOWN', _MRef, process, Pid, _Reason},
+ State = #state{queue_duration_sum = Sum,
+ queue_duration_count = Count,
+ queue_durations = Durations}) ->
+ [{_Pid, PrevQueueDuration, _PrevSendDuration}] = ets:lookup(Durations, Pid),
+ Sum1 = case PrevQueueDuration of
+ infinity -> Sum;
+ _ -> Sum - PrevQueueDuration
+ end,
+ true = ets:delete(State#state.queue_durations, Pid),
{noreply, State#state{queue_duration_sum = Sum1,
- queue_duration_items = Items-1}};
+ queue_duration_count = Count-1}};
-handle_info(_Info, State) ->
+handle_info(_Info, State) ->
{noreply, State}.
-terminate(_Reason, _State) ->
+terminate(_Reason, _State) ->
ok.
-code_change(_OldVsn, State, _Extra) ->
+code_change(_OldVsn, State, _Extra) ->
{ok, State}.
-
set_queue_duration(Pid, QueueDuration) ->
gen_server2:pcast(Pid, 7, {set_queue_duration, QueueDuration}).
--ifdef(debug).
-ftoa(Float) ->
- Str = case is_float(Float) of
- true -> io_lib:format("~11.3f",[Float]);
- false -> io_lib:format("~p", [Float])
- end,
- lists:flatten(Str).
--endif.
-
-
-%% Update memory ratio. Count new DesiredQueueDuration.
-%% Get queues that are using more than that, and send
-%% pessimistic information back to them.
-internal_update(State0) ->
- %% available memory / used memory
- MemoryRatio = State0#state.memory_limit / erlang:memory(total),
- State = State0#state{memory_ratio = MemoryRatio},
-
+internal_update(State = #state{memory_limit = Limit,
+ queue_durations = Durations}) ->
DesiredDurationAvg = get_desired_duration(State),
-
- ?LOGDEBUG("Avg duration: real/desired:~s/~s Memory ratio:~s Queues:~p~n",
- [ftoa(get_avg_duration(State)), ftoa(DesiredDurationAvg),
- ftoa(MemoryRatio),
- ets:foldl(fun (_, Acc) -> Acc+1 end,
- 0, State#state.queue_durations)] ),
-
- %% If we have pessimistic information, we need to inform queues
- %% to reduce it's memory usage when needed.
- %% This sometimes wakes up queues from hibernation. Well, we don't care.
- PromptReduceDuraton = fun ({Pid, QueueDuration, PrevSendDuration}, Acc) ->
- case (PrevSendDuration > DesiredDurationAvg) and (QueueDuration > DesiredDurationAvg) of
- true -> set_queue_duration(Pid, DesiredDurationAvg),
- ets:insert(State#state.queue_durations, {Pid, QueueDuration, DesiredDurationAvg}),
- Acc + 1;
- _ -> Acc
- end
+ %% available memory / used memory
+ MemoryRatio = Limit / erlang:memory(total),
+ State1 = State#state{memory_ratio = MemoryRatio},
+ DesiredDurationAvg1 = get_desired_duration(State1),
+
+ %% only inform queues immediately if the desired duration has
+ %% decreased
+ case DesiredDurationAvg1 < DesiredDurationAvg of
+ true ->
+ %% If we have pessimistic information, we need to inform
+            %% queues to reduce their memory usage when needed. This
+ %% sometimes wakes up queues from hibernation.
+ true = ets:foldl(
+ fun ({Pid, QueueDuration, PrevSendDuration}, true) ->
+ case DesiredDurationAvg1 <
+ lists:min([PrevSendDuration, QueueDuration]) of
+ true ->
+ set_queue_duration(Pid,
+ DesiredDurationAvg1),
+ ets:insert(Durations,
+ {Pid, QueueDuration,
+ DesiredDurationAvg1});
+ _ -> true
+ end
+ end, true, Durations);
+ false -> ok
end,
- ets:foldl(PromptReduceDuraton, 0, State#state.queue_durations),
- State.
-
+ State1.