summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-08-28 12:31:33 +0100
committerMatthew Sackman <matthew@lshift.net>2009-08-28 12:31:33 +0100
commit676fcdb73331c1e55ac3c446685aaac3c5ddf676 (patch)
tree8aba92e2b1c9adbf3379fee65904a1151e68328c /src
parent7a5b7c2cf7e7acc41ce464486cfbdc24ba08a5d0 (diff)
downloadrabbitmq-server-git-676fcdb73331c1e55ac3c446685aaac3c5ddf676.tar.gz
Changes to memory_manager. Watch for change in overall amount of free tokens, not requested amount
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_memory_manager.erl82
1 files changed, 44 insertions, 38 deletions
diff --git a/src/rabbit_memory_manager.erl b/src/rabbit_memory_manager.erl
index 29216d77d2..b7640d7a45 100644
--- a/src/rabbit_memory_manager.erl
+++ b/src/rabbit_memory_manager.erl
@@ -41,6 +41,7 @@
-export([register/5, report_memory/3, info/0, conserve_memory/2]).
-define(TOTAL_TOKENS, 10000000).
+-define(THRESHOLD_MULTIPLIER, 1.05).
-define(SERVER, ?MODULE).
@@ -197,7 +198,7 @@ handle_cast({report_memory, Pid, Memory, Hibernating},
end,
{StateN = #state { hibernate = Sleepy }, ActivityNew} =
case find_process(Pid, Procs) of
- {libre, {OAlloc, _OActivity}} ->
+ {libre, OAlloc, _OActivity} ->
Avail1 = Avail + OAlloc,
State1 = #state { available_tokens = Avail2,
processes = Procs1 }
@@ -205,20 +206,22 @@ handle_cast({report_memory, Pid, Memory, Hibernating},
State #state { available_tokens = Avail1 }),
case Req > Avail2 of
true -> %% nowt we can do, oppress the process
- ok = set_process_mode(Callbacks, Pid, oppressed),
- {State1 #state { processes =
- dict:store(Pid, {Req, oppressed},
- Procs1) }, oppressed};
+ Procs2 =
+ set_process_mode(Procs1, Callbacks, Pid, oppressed,
+ {oppressed, Avail2}),
+ {State1 #state { processes = Procs2 }, oppressed};
false -> %% keep liberated
{State1 #state
{ processes =
- dict:store(Pid, {Req, LibreActivity}, Procs1),
+ dict:store(Pid, {libre, Req, LibreActivity}, Procs1),
available_tokens = Avail2 - Req },
LibreActivity}
end;
- {oppressed, OrigReq} ->
+ {oppressed, OrigAvail} ->
case Alarmed orelse Hibernating orelse
- (Req > OrigReq * 0.95 andalso Req < OrigReq * 1.05) of
+ (Avail > (OrigAvail / ?THRESHOLD_MULTIPLIER) andalso
+ Avail < (OrigAvail * ?THRESHOLD_MULTIPLIER))
+ of
true ->
{State, oppressed};
false ->
@@ -230,11 +233,11 @@ handle_cast({report_memory, Pid, Memory, Hibernating},
%% not enough space, so stay oppressed
{State1, oppressed};
false -> %% can liberate the process
- set_process_mode(Callbacks, Pid, liberated),
+ Procs2 = set_process_mode(
+ Procs1, Callbacks, Pid, liberated,
+ {libre, Req, LibreActivity}),
{State1 #state {
- processes =
- dict:store(Pid, {Req, LibreActivity},
- Procs1),
+ processes = Procs2,
available_tokens = Avail1 - Req },
LibreActivity}
end
@@ -254,9 +257,9 @@ handle_cast({register, Pid, IsUnoppressable, Module, Function, Args},
unoppressable = Unoppressable }) ->
_MRef = erlang:monitor(process, Pid),
Unoppressable1 = case IsUnoppressable of
- true -> sets:add_element(Pid, Unoppressable);
- false -> Unoppressable
- end,
+ true -> sets:add_element(Pid, Unoppressable);
+ false -> Unoppressable
+ end,
{noreply, State #state { callbacks = dict:store
(Pid, {Module, Function, Args}, Callbacks),
unoppressable = Unoppressable1
@@ -267,12 +270,14 @@ handle_cast({conserve_memory, Conserve}, State) ->
handle_info({'DOWN', _MRef, process, Pid, _Reason},
State = #state { available_tokens = Avail,
- processes = Procs }) ->
- State1 = State #state { processes = dict:erase(Pid, Procs) },
+ processes = Procs,
+ callbacks = Callbacks }) ->
+ State1 = State #state { processes = dict:erase(Pid, Procs),
+ callbacks = dict:erase(Pid, Callbacks) },
{noreply, case find_process(Pid, Procs) of
{oppressed, _OrigReq} ->
State1;
- {libre, {Alloc, _Activity}} ->
+ {libre, Alloc, _Activity} ->
State1 #state { available_tokens = Avail + Alloc }
end};
handle_info({'EXIT', _Pid, Reason}, State) ->
@@ -288,14 +293,14 @@ code_change(_OldVsn, State, _Extra) ->
find_process(Pid, Procs) ->
case dict:find(Pid, Procs) of
- {ok, {OrigReq, oppressed}} -> {oppressed, OrigReq};
- {ok, Value = {_Alloc, _Activity}} -> {libre, Value};
- error -> {oppressed, -9999}
+ {ok, Value} -> Value;
+ error -> {oppressed, 0}
end.
-set_process_mode(Callbacks, Pid, Mode) ->
+set_process_mode(Procs, Callbacks, Pid, Mode, Record) ->
{Module, Function, Args} = dict:fetch(Pid, Callbacks),
- erlang:apply(Module, Function, Args ++ [Mode]).
+ ok = erlang:apply(Module, Function, Args ++ [Mode]),
+ dict:store(Pid, Record, Procs).
tidy_and_sum_sleepy(IgnorePids, Sleepy, Procs) ->
tidy_and_sum(hibernate, Procs, fun queue:out/1,
@@ -313,7 +318,7 @@ tidy_and_sum(AtomExpected, Procs, Catamorphism, Anamorphism, DupCheckSet,
{DupCheckSet, AnaInit, AllocAcc};
false ->
case find_process(Pid, Procs) of
- {libre, {Alloc, AtomExpected}} ->
+ {libre, Alloc, AtomExpected} ->
{sets:add_element(Pid, DupCheckSet),
Anamorphism(Pid, Alloc, AnaInit),
Alloc + AllocAcc};
@@ -325,7 +330,7 @@ tidy_and_sum(AtomExpected, Procs, Catamorphism, Anamorphism, DupCheckSet,
DupCheckSet1, CataInit1, AnaInit1, AllocAcc1)
end.
-free_upto_sleepy(IgnorePids, Callbacks, Sleepy, Procs, Req) ->
+free_upto_sleepy(IgnorePids, Callbacks, Sleepy, Procs, Req, Avail) ->
free_from(Callbacks,
fun(Procs1, Sleepy1, SleepyAcc) ->
case queue:out(Sleepy1) of
@@ -335,35 +340,36 @@ free_upto_sleepy(IgnorePids, Callbacks, Sleepy, Procs, Req) ->
case sets:is_element(Pid, IgnorePids) of
true -> {skip, Sleepy2,
queue:in(Pid, SleepyAcc)};
- false -> {Alloc, hibernate} =
+ false -> {libre, Alloc, hibernate} =
dict:fetch(Pid, Procs1),
{value, Sleepy2, Pid, Alloc}
end
end
- end, fun queue:join/2, Procs, Sleepy, queue:new(), Req).
+ end, fun queue:join/2, Procs, Sleepy, queue:new(), Req, Avail).
-free_from(Callbacks, Hylomorphism, BaseCase, Procs, CataInit, AnaInit, Req) ->
+free_from(
+ Callbacks, Hylomorphism, BaseCase, Procs, CataInit, AnaInit, Req, Avail) ->
case Hylomorphism(Procs, CataInit, AnaInit) of
empty ->
{AnaInit, Procs, Req};
{skip, CataInit1, AnaInit1} ->
free_from(Callbacks, Hylomorphism, BaseCase, Procs, CataInit1,
- AnaInit1, Req);
+ AnaInit1, Req, Avail);
{value, CataInit1, Pid, Alloc} ->
- Procs1 = dict:store(Pid, {Alloc, oppressed}, Procs),
- ok = set_process_mode(Callbacks, Pid, oppressed),
+ Procs1 = set_process_mode(
+ Procs, Callbacks, Pid, oppressed, {oppressed, Avail}),
case Req > Alloc of
true -> free_from(Callbacks, Hylomorphism, BaseCase, Procs1,
- CataInit1, AnaInit, Req - Alloc);
+ CataInit1, AnaInit, Req - Alloc, Avail);
false -> {BaseCase(CataInit1, AnaInit), Procs1, Req - Alloc}
end
end.
-free_upto(Pid, Req, State = #state { available_tokens = Avail,
- processes = Procs,
- callbacks = Callbacks,
- hibernate = Sleepy,
- unoppressable = Unoppressable })
+free_upto(Pid, Req, State = #state { available_tokens = Avail,
+ processes = Procs,
+ callbacks = Callbacks,
+ hibernate = Sleepy,
+ unoppressable = Unoppressable })
when Req > Avail ->
Unoppressable1 = sets:add_element(Pid, Unoppressable),
{Sleepy1, SleepySum} = tidy_and_sum_sleepy(Unoppressable1, Sleepy, Procs),
@@ -376,7 +382,7 @@ free_upto(Pid, Req, State = #state { available_tokens = Avail,
%% freed
{Sleepy2, Procs1, ReqRem} =
free_upto_sleepy(Unoppressable1, Callbacks,
- Sleepy1, Procs, Req),
+ Sleepy1, Procs, Req, Avail),
State #state { available_tokens = Avail + (Req - ReqRem),
processes = Procs1,
hibernate = Sleepy2 }