diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-08-28 12:31:33 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-08-28 12:31:33 +0100 |
| commit | 676fcdb73331c1e55ac3c446685aaac3c5ddf676 (patch) | |
| tree | 8aba92e2b1c9adbf3379fee65904a1151e68328c /src | |
| parent | 7a5b7c2cf7e7acc41ce464486cfbdc24ba08a5d0 (diff) | |
| download | rabbitmq-server-git-676fcdb73331c1e55ac3c446685aaac3c5ddf676.tar.gz | |
Changes to memory_manager. Watch for change in overall amount of free tokens, not requested amount
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_memory_manager.erl | 82 |
1 files changed, 44 insertions, 38 deletions
diff --git a/src/rabbit_memory_manager.erl b/src/rabbit_memory_manager.erl index 29216d77d2..b7640d7a45 100644 --- a/src/rabbit_memory_manager.erl +++ b/src/rabbit_memory_manager.erl @@ -41,6 +41,7 @@ -export([register/5, report_memory/3, info/0, conserve_memory/2]). -define(TOTAL_TOKENS, 10000000). +-define(THRESHOLD_MULTIPLIER, 1.05). -define(SERVER, ?MODULE). @@ -197,7 +198,7 @@ handle_cast({report_memory, Pid, Memory, Hibernating}, end, {StateN = #state { hibernate = Sleepy }, ActivityNew} = case find_process(Pid, Procs) of - {libre, {OAlloc, _OActivity}} -> + {libre, OAlloc, _OActivity} -> Avail1 = Avail + OAlloc, State1 = #state { available_tokens = Avail2, processes = Procs1 } @@ -205,20 +206,22 @@ handle_cast({report_memory, Pid, Memory, Hibernating}, State #state { available_tokens = Avail1 }), case Req > Avail2 of true -> %% nowt we can do, oppress the process - ok = set_process_mode(Callbacks, Pid, oppressed), - {State1 #state { processes = - dict:store(Pid, {Req, oppressed}, - Procs1) }, oppressed}; + Procs2 = + set_process_mode(Procs1, Callbacks, Pid, oppressed, + {oppressed, Avail2}), + {State1 #state { processes = Procs2 }, oppressed}; false -> %% keep liberated {State1 #state { processes = - dict:store(Pid, {Req, LibreActivity}, Procs1), + dict:store(Pid, {libre, Req, LibreActivity}, Procs1), available_tokens = Avail2 - Req }, LibreActivity} end; - {oppressed, OrigReq} -> + {oppressed, OrigAvail} -> case Alarmed orelse Hibernating orelse - (Req > OrigReq * 0.95 andalso Req < OrigReq * 1.05) of + (Avail > (OrigAvail / ?THRESHOLD_MULTIPLIER) andalso + Avail < (OrigAvail * ?THRESHOLD_MULTIPLIER)) + of true -> {State, oppressed}; false -> @@ -230,11 +233,11 @@ handle_cast({report_memory, Pid, Memory, Hibernating}, %% not enough space, so stay oppressed {State1, oppressed}; false -> %% can liberate the process - set_process_mode(Callbacks, Pid, liberated), + Procs2 = set_process_mode( + Procs1, Callbacks, Pid, liberated, + {libre, Req, LibreActivity}), {State1 #state { - processes = - dict:store(Pid, {Req, LibreActivity}, - Procs1), + processes = Procs2, available_tokens = Avail1 - Req }, LibreActivity} end @@ -254,9 +257,9 @@ handle_cast({register, Pid, IsUnoppressable, Module, Function, Args}, unoppressable = Unoppressable }) -> _MRef = erlang:monitor(process, Pid), Unoppressable1 = case IsUnoppressable of - true -> sets:add_element(Pid, Unoppressable); - false -> Unoppressable - end, + true -> sets:add_element(Pid, Unoppressable); + false -> Unoppressable + end, {noreply, State #state { callbacks = dict:store (Pid, {Module, Function, Args}, Callbacks), unoppressable = Unoppressable1 @@ -267,12 +270,14 @@ handle_cast({conserve_memory, Conserve}, State) -> handle_info({'DOWN', _MRef, process, Pid, _Reason}, State = #state { available_tokens = Avail, - processes = Procs }) -> - State1 = State #state { processes = dict:erase(Pid, Procs) }, + processes = Procs, + callbacks = Callbacks }) -> + State1 = State #state { processes = dict:erase(Pid, Procs), + callbacks = dict:erase(Pid, Callbacks) }, {noreply, case find_process(Pid, Procs) of {oppressed, _OrigReq} -> State1; - {libre, {Alloc, _Activity}} -> + {libre, Alloc, _Activity} -> State1 #state { available_tokens = Avail + Alloc } end}; handle_info({'EXIT', _Pid, Reason}, State) -> @@ -288,14 +293,14 @@ code_change(_OldVsn, State, _Extra) -> find_process(Pid, Procs) -> case dict:find(Pid, Procs) of - {ok, {OrigReq, oppressed}} -> {oppressed, OrigReq}; - {ok, Value = {_Alloc, _Activity}} -> {libre, Value}; - error -> {oppressed, -9999} + {ok, Value} -> Value; + error -> {oppressed, 0} end. -set_process_mode(Callbacks, Pid, Mode) -> +set_process_mode(Procs, Callbacks, Pid, Mode, Record) -> {Module, Function, Args} = dict:fetch(Pid, Callbacks), - erlang:apply(Module, Function, Args ++ [Mode]). + ok = erlang:apply(Module, Function, Args ++ [Mode]), + dict:store(Pid, Record, Procs). tidy_and_sum_sleepy(IgnorePids, Sleepy, Procs) -> tidy_and_sum(hibernate, Procs, fun queue:out/1, @@ -313,7 +318,7 @@ tidy_and_sum(AtomExpected, Procs, Catamorphism, Anamorphism, DupCheckSet, {DupCheckSet, AnaInit, AllocAcc}; false -> case find_process(Pid, Procs) of - {libre, {Alloc, AtomExpected}} -> + {libre, Alloc, AtomExpected} -> {sets:add_element(Pid, DupCheckSet), Anamorphism(Pid, Alloc, AnaInit), Alloc + AllocAcc}; @@ -325,7 +330,7 @@ tidy_and_sum(AtomExpected, Procs, Catamorphism, Anamorphism, DupCheckSet, DupCheckSet1, CataInit1, AnaInit1, AllocAcc1) end. -free_upto_sleepy(IgnorePids, Callbacks, Sleepy, Procs, Req) -> +free_upto_sleepy(IgnorePids, Callbacks, Sleepy, Procs, Req, Avail) -> free_from(Callbacks, fun(Procs1, Sleepy1, SleepyAcc) -> case queue:out(Sleepy1) of @@ -335,35 +340,36 @@ free_upto_sleepy(IgnorePids, Callbacks, Sleepy, Procs, Req) -> case sets:is_element(Pid, IgnorePids) of true -> {skip, Sleepy2, queue:in(Pid, SleepyAcc)}; - false -> {Alloc, hibernate} = + false -> {libre, Alloc, hibernate} = dict:fetch(Pid, Procs1), {value, Sleepy2, Pid, Alloc} end end - end, fun queue:join/2, Procs, Sleepy, queue:new(), Req). + end, fun queue:join/2, Procs, Sleepy, queue:new(), Req, Avail). -free_from(Callbacks, Hylomorphism, BaseCase, Procs, CataInit, AnaInit, Req) -> +free_from( + Callbacks, Hylomorphism, BaseCase, Procs, CataInit, AnaInit, Req, Avail) -> case Hylomorphism(Procs, CataInit, AnaInit) of empty -> {AnaInit, Procs, Req}; {skip, CataInit1, AnaInit1} -> free_from(Callbacks, Hylomorphism, BaseCase, Procs, CataInit1, - AnaInit1, Req); + AnaInit1, Req, Avail); {value, CataInit1, Pid, Alloc} -> - Procs1 = dict:store(Pid, {Alloc, oppressed}, Procs), - ok = set_process_mode(Callbacks, Pid, oppressed), + Procs1 = set_process_mode( + Procs, Callbacks, Pid, oppressed, {oppressed, Avail}), case Req > Alloc of true -> free_from(Callbacks, Hylomorphism, BaseCase, Procs1, - CataInit1, AnaInit, Req - Alloc); + CataInit1, AnaInit, Req - Alloc, Avail); false -> {BaseCase(CataInit1, AnaInit), Procs1, Req - Alloc} end end. -free_upto(Pid, Req, State = #state { available_tokens = Avail, - processes = Procs, - callbacks = Callbacks, - hibernate = Sleepy, - unoppressable = Unoppressable }) +free_upto(Pid, Req, State = #state { available_tokens = Avail, + processes = Procs, + callbacks = Callbacks, + hibernate = Sleepy, + unoppressable = Unoppressable }) when Req > Avail -> Unoppressable1 = sets:add_element(Pid, Unoppressable), {Sleepy1, SleepySum} = tidy_and_sum_sleepy(Unoppressable1, Sleepy, Procs), @@ -376,7 +382,7 @@ free_upto(Pid, Req, State = #state { available_tokens = Avail, %% freed {Sleepy2, Procs1, ReqRem} = free_upto_sleepy(Unoppressable1, Callbacks, - Sleepy1, Procs, Req), + Sleepy1, Procs, Req, Avail), State #state { available_tokens = Avail + (Req - ReqRem), processes = Procs1, hibernate = Sleepy2 } |
