diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-08-27 18:12:12 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-08-27 18:12:12 +0100 |
| commit | 210835025b0c98ec313cd33731bc29ec6acbe609 (patch) | |
| tree | 04a48ac9bd5af830ccbee2a3fdc959e51f7a851a /src | |
| parent | c6ad55e7e6096ed2d5f2b1043b4dedc33317c432 (diff) | |
| download | rabbitmq-server-git-210835025b0c98ec313cd33731bc29ec6acbe609.tar.gz | |
All QA comments relating to queue_mode_manager
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_amqqueue.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 10 | ||||
| -rw-r--r-- | src/rabbit_disk_queue.erl | 12 | ||||
| -rw-r--r-- | src/rabbit_memory_manager.erl (renamed from src/rabbit_queue_mode_manager.erl) | 284 |
5 files changed, 160 insertions, 150 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl index 665f10a273..8962b12ea6 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -140,7 +140,7 @@ start(normal, []) -> {ok, MemoryAlarms} = application:get_env(memory_alarms), ok = rabbit_alarm:start(MemoryAlarms), - ok = start_child(rabbit_queue_mode_manager), + ok = start_child(rabbit_memory_manager), ok = rabbit_binary_generator: check_empty_content_body_frame_size(), diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 51b2e8f587..ad0a0f0c68 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -102,7 +102,7 @@ -spec(basic_cancel/4 :: (amqqueue(), pid(), ctag(), any()) -> 'ok'). -spec(notify_sent/2 :: (pid(), pid()) -> 'ok'). -spec(unblock/2 :: (pid(), pid()) -> 'ok'). --spec(set_storage_mode/2 :: (pid(), ('disk' | 'mixed')) -> 'ok'). +-spec(set_storage_mode/2 :: (pid(), ('oppressed' | 'liberated')) -> 'ok'). -spec(internal_declare/2 :: (amqqueue(), bool()) -> amqqueue()). -spec(internal_delete/1 :: (queue_name()) -> 'ok' | not_found()). -spec(on_node_down/1 :: (erlang_node()) -> 'ok'). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index adf84c0e4c..72325414cd 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -101,7 +101,7 @@ start_link(Q) -> init(Q = #amqqueue { name = QName, durable = Durable }) -> ?LOGDEBUG("Queue starting - ~p~n", [Q]), - ok = rabbit_queue_mode_manager:register + ok = rabbit_memory_manager:register (self(), false, rabbit_amqqueue, set_storage_mode, [self()]), {ok, MS} = rabbit_mixed_queue:init(QName, Durable), State = #q{q = Q, @@ -551,7 +551,7 @@ i(Item, _) -> report_memory(Hib, State = #q { mixed_state = MS }) -> {MS1, MSize, Gain, Loss} = rabbit_mixed_queue:estimate_queue_memory_and_reset_counters(MS), - rabbit_queue_mode_manager:report_memory(self(), MSize, Gain, Loss, Hib), + rabbit_memory_manager:report_memory(self(), MSize, Gain, Loss, Hib), State #q { mixed_state = MS1 }. %--------------------------------------------------------------------------- @@ -820,7 +820,11 @@ handle_cast({set_storage_mode, Mode}, State = #q { mixed_state = MS }) -> PendingMessages = lists:flatten([Pending || #tx { pending_messages = Pending} <- all_tx_record()]), - {ok, MS1} = rabbit_mixed_queue:set_storage_mode(Mode, PendingMessages, MS), + Mode1 = case Mode of + liberated -> mixed; + oppressed -> disk + end, + {ok, MS1} = rabbit_mixed_queue:set_storage_mode(Mode1, PendingMessages, MS), noreply(State #q { mixed_state = MS1 }). handle_info(report_memory, State) -> diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl index d9f318e059..18b250c55c 100644 --- a/src/rabbit_disk_queue.erl +++ b/src/rabbit_disk_queue.erl @@ -281,7 +281,7 @@ -spec(to_ram_disk_mode/0 :: () -> 'ok'). -spec(filesync/0 :: () -> 'ok'). -spec(cache_info/0 :: () -> [{atom(), term()}]). --spec(set_mode/1 :: ('disk' | 'mixed') -> 'ok'). +-spec(set_mode/1 :: ('oppressed' | 'liberated') -> 'ok'). -endif. @@ -371,7 +371,7 @@ init([FileSizeLimit, ReadFileHandlesLimit]) -> %% brutal_kill. %% Otherwise, the gen_server will be immediately terminated. process_flag(trap_exit, true), - ok = rabbit_queue_mode_manager:register + ok = rabbit_memory_manager:register (self(), true, rabbit_disk_queue, set_mode, []), Node = node(), ok = @@ -448,7 +448,7 @@ init([FileSizeLimit, ReadFileHandlesLimit]) -> %% grant us to ram_disk mode. We have to start in ram_disk mode %% because we can't find values for mnesia_bytes_per_record or %% ets_bytes_per_record otherwise. - ok = rabbit_queue_mode_manager:report_memory(self(), 0, false), + ok = rabbit_memory_manager:report_memory(self(), 0, false), ok = report_memory(false, State2), {ok, start_memory_timer(State2), hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. @@ -522,8 +522,8 @@ handle_cast({delete_queue, Q}, State) -> noreply(State1); handle_cast({set_mode, Mode}, State) -> noreply((case Mode of - disk -> fun to_disk_only_mode/1; - mixed -> fun to_ram_disk_mode/1 + oppressed -> fun to_disk_only_mode/1; + liberated -> fun to_ram_disk_mode/1 end)(State)); handle_cast({prefetch, Q, From}, State) -> {ok, Result, State1} = @@ -609,7 +609,7 @@ start_memory_timer(State) -> report_memory(Hibernating, State) -> Bytes = memory_use(State), - rabbit_queue_mode_manager:report_memory(self(), trunc(2.5 * Bytes), + rabbit_memory_manager:report_memory(self(), trunc(2.5 * Bytes), Hibernating). memory_use(#dqstate { operation_mode = ram_disk, diff --git a/src/rabbit_queue_mode_manager.erl b/src/rabbit_memory_manager.erl index a2fab6154a..055e47952a 100644 --- a/src/rabbit_queue_mode_manager.erl +++ b/src/rabbit_memory_manager.erl @@ -29,7 +29,7 @@ %% Contributor(s): ______________________________________. %% --module(rabbit_queue_mode_manager). +-module(rabbit_memory_manager). -behaviour(gen_server2). @@ -62,12 +62,12 @@ -endif. -record(state, { available_tokens, - mixed_queues, + liberated_processes, callbacks, tokens_per_byte, lowrate, hibernate, - unevictable, + unoppressable, alarmed }). @@ -77,69 +77,75 @@ %% system (RAM). Then, work out how many tokens each byte corresponds %% to. This is the tokens_per_byte field. When a process registers, it %% must provide an M-F-A triple to a function that needs one further -%% argument, which is the new mode. This will either be 'mixed' or -%% 'disk'. +%% argument, which is the new mode. This will either be 'liberated' or +%% 'oppressed'. %% %% Processes then report their own memory usage, in bytes, and the %% manager takes care of the rest. %% %% There are a finite number of tokens in the system. These are -%% allocated to processes as they are requested. We keep track of -%% processes which have hibernated, and processes that are doing only -%% a low rate of work. When a request for memory can't be satisfied, -%% we try and evict processes first from the hibernated group, and -%% then from the lowrate group. The hibernated group is a simple -%% queue, and so is implicitly sorted by the order in which processes -%% were added to the queue. This means that when removing from the -%% queue, we hibernate the sleepiest pid first. The lowrate group is a -%% priority queue, where the priority is the truncated log (base e) of -%% the amount of memory allocated. Thus when we remove from the queue, -%% we first remove the queue from the highest bucket. +%% allocated to processes as the processes report their memory +%% usage. We keep track of processes which have hibernated, and +%% processes that are doing only a low rate of work (reported as a low +%% gain or loss in memory between memory reports). When a process +%% reports memory use which can't be satisfied by the available +%% tokens, we try and oppress processes first from the hibernated +%% group, and then from the lowrate group. The hibernated group is a +%% simple queue, and so is implicitly sorted by the order in which +%% processes were added to the queue. This means that when removing +%% from the queue, we evict the sleepiest (and most passive) pid +%% first. The lowrate group is a priority queue, where the priority is +%% the truncated log (base e) of the amount of memory allocated. Thus +%% when we remove from the queue, we first remove the queue from the +%% highest bucket. %% -%% If the request still can't be satisfied after evicting to disk -%% everyone from those two groups (and note that we check first -%% whether or not freeing them would make available enough tokens to -%% satisfy the request rather than just sending all those queues to -%% disk and then going "whoops, didn't help after all"), then we send -%% the requesting process to disk. When a queue registers, it can -%% declare itself "unevictable". If a queue is unevictable then it -%% will not be sent to disk as a result of other processes requesting -%% more memory. However, if it itself is requesting more memory and -%% that request can't be satisfied then it is still sent to disk as -%% before. This feature is only used by the disk_queue, because if the -%% disk queue is not being used, and hibernates, and then memory -%% pressure gets tight, the disk_queue would typically be one of the -%% first processes to get sent to disk, which cripples -%% performance. Thus by setting it unevictable, it is only possible -%% for the disk_queue to be sent to disk when it is active and +%% If the reported memory use still can't be satisfied after +%% oppressing everyone from those two groups (and note that we check +%% first whether or not oppressing them would make available enough +%% tokens to satisfy the reported use rather than just oppressing all +%% those processes and then going "whoops, didn't help after all"), +%% then we oppress the reporting process. When a process registers, it +%% can declare itself "unoppressable". If a process is unoppressable +%% then it will not be sent to disk as a result of other processes +%% needing more tokens. However, if it itself needs additional tokens +%% which aren't available then it is still oppressed as before. This +%% feature is only used by the disk_queue, because if the disk queue +%% is not being used, and hibernates, and then memory pressure gets +%% tight, the disk_queue would typically be one of the first processes +%% to be oppressed (sent to disk_only mode), which cripples +%% performance. Thus by setting it unoppressable, it is only possible +%% for the disk_queue to be oppressed when it is active and %% attempting to increase its memory allocation. %% -%% If a process has been sent to disk, it continues making -%% requests. As soon as a request can be satisfied (and this can -%% include sending other processes to disk in the way described -%% above), it will be told to come back into mixed mode. We do not -%% keep any information about queues in disk mode. +%% If a process has been oppressed, it continues making memory +%% reports, as if it was liberated. As soon as a reported amount of +%% memory can be satisfied (and this can include oppressing other +%% processes in the way described above), it will be liberated. We do +%% not keep any information about oppressed processes. %% %% Note that the lowrate and hibernate groups can get very out of %% date. This is fine, and somewhat unavoidable given the absence of %% useful APIs for queues. Thus we allow them to get out of date %% (processes will be left in there when they change groups, %% duplicates can appear, dead processes are not pruned etc etc etc), -%% and when we go through the groups, summing up their amount of -%% memory, we tidy up at that point. +%% and when we go through the groups, summing up their allocated +%% tokens, we tidy up at that point. %% -%% A process which is not evicted to disk, and is requesting a smaller -%% amount of RAM than its last request will always be satisfied. A -%% mixed-mode process that is busy but consuming an unchanging amount -%% of RAM will never be sent to disk. The disk_queue is also managed -%% in the same way. This means that a queue that has gone back to -%% being mixed after being in disk mode now has its messages counted -%% twice as they are counted both in the request made by the queue -%% (even though they may not yet be in RAM (though see the -%% prefetcher)) and also by the disk_queue. Thus the amount of -%% available RAM must be higher when going disk -> mixed than when -%% going mixed -> disk. This is fairly sensible as it reduces the risk -%% of any oscillations occurring. +%% A liberated process, which is reporting a smaller amount of RAM +%% than its last report will remain liberated. A liberated process +%% that is busy but consuming an unchanging amount of RAM will never +%% be oppressed. + +%% Specific notes as applied to queues and the disk_queue: +%% +%% The disk_queue is managed in the same way as queues. This means +%% that a queue that has gone back to mixed mode after being in disk +%% mode now has its messages counted twice as they are counted both in +%% the report made by the queue (even though they may not yet be in +%% RAM (though see the prefetcher)) and also by the disk_queue. Thus +%% the amount of available RAM must be higher when going disk -> mixed +%% than when going mixed -> disk. This is fairly sensible as it +%% reduces the risk of any oscillations occurring. %% %% The queue process deliberately reports 4 times its estimated RAM %% usage, and the disk_queue 2.5 times. In practise, this seems to @@ -151,8 +157,8 @@ start_link() -> gen_server2:start_link({local, ?SERVER}, ?MODULE, [], []). -register(Pid, Unevictable, Module, Function, Args) -> - gen_server2:cast(?SERVER, {register, Pid, Unevictable, +register(Pid, Unoppressable, Module, Function, Args) -> + gen_server2:cast(?SERVER, {register, Pid, Unoppressable, Module, Function, Args}). report_memory(Pid, Memory, Hibernating) -> @@ -177,31 +183,31 @@ init([]) -> true -> ?TOTAL_TOKENS / MemAvail end, {ok, #state { available_tokens = ?TOTAL_TOKENS, - mixed_queues = dict:new(), + liberated_processes = dict:new(), callbacks = dict:new(), tokens_per_byte = TPB, lowrate = priority_queue:new(), hibernate = queue:new(), - unevictable = sets:new(), + unoppressable = sets:new(), alarmed = false }}. handle_call(info, _From, State) -> State1 = #state { available_tokens = Avail, - mixed_queues = Mixed, + liberated_processes = Libre, lowrate = Lazy, hibernate = Sleepy, - unevictable = Unevictable } = + unoppressable = Unoppressable } = free_upto(undef, 1 + ?TOTAL_TOKENS, State), %% this'll just do tidying {reply, [{ available_tokens, Avail }, - { mixed_queues, dict:to_list(Mixed) }, + { liberated_processes, dict:to_list(Libre) }, { lowrate_queues, priority_queue:to_list(Lazy) }, { hibernated_queues, queue:to_list(Sleepy) }, - { unevictable_queues, sets:to_list(Unevictable) }], State1}. + { unoppressable_queues, sets:to_list(Unoppressable) }], State1}. handle_cast({report_memory, Pid, Memory, BytesGained, BytesLost, Hibernating}, - State = #state { mixed_queues = Mixed, + State = #state { liberated_processes = Libre, available_tokens = Avail, callbacks = Callbacks, tokens_per_byte = TPB, @@ -213,57 +219,57 @@ handle_cast({report_memory, Pid, Memory, BytesGained, BytesLost, Hibernating}, {G, L} -> G < ?ACTIVITY_THRESHOLD andalso L < ?ACTIVITY_THRESHOLD end, - MixedActivity = if Hibernating -> hibernate; + LibreActivity = if Hibernating -> hibernate; LowRate -> lowrate; true -> active end, {StateN = #state { lowrate = Lazy, hibernate = Sleepy }, ActivityNew} = - case find_queue(Pid, Mixed) of - {mixed, {OAlloc, _OActivity}} -> + case find_process(Pid, Libre) of + {libre, {OAlloc, _OActivity}} -> Avail1 = Avail + OAlloc, - State1 = - #state { available_tokens = Avail2, mixed_queues = Mixed1 } + State1 = #state { available_tokens = Avail2, + liberated_processes = Libre1 } = free_upto(Pid, Req, State #state { available_tokens = Avail1 }), case Req > Avail2 of - true -> %% nowt we can do, send to disk - ok = set_queue_mode(Callbacks, Pid, disk), - {State1 #state { mixed_queues = - dict:erase(Pid, Mixed1) }, disk}; - false -> %% keep mixed + true -> %% nowt we can do, oppress the process + ok = set_process_mode(Callbacks, Pid, oppressed), + {State1 #state { liberated_processes = + dict:erase(Pid, Libre1) }, oppressed}; + false -> %% keep liberated {State1 #state - { mixed_queues = - dict:store(Pid, {Req, MixedActivity}, Mixed1), + { liberated_processes = + dict:store(Pid, {Req, LibreActivity}, Libre1), available_tokens = Avail2 - Req }, - MixedActivity} + LibreActivity} end; - disk -> + oppressed -> case Alarmed of true -> - {State, disk}; + {State, oppressed}; false -> State1 = #state { available_tokens = Avail1, - mixed_queues = Mixed1 } = + liberated_processes = Libre1 } = free_upto(Pid, Req, State), case Req > Avail1 orelse Hibernating orelse LowRate of true -> %% not enough space, or no compelling - %% reason, so stay as disk - {State1, disk}; - false -> %% can go to mixed mode - set_queue_mode(Callbacks, Pid, mixed), + %% reason, so stay oppressed + {State1, oppressed}; + false -> %% can liberate the process + set_process_mode(Callbacks, Pid, liberated), {State1 #state { - mixed_queues = - dict:store(Pid, {Req, MixedActivity}, Mixed1), + liberated_processes = + dict:store(Pid, {Req, LibreActivity}, Libre1), available_tokens = Avail1 - Req }, - MixedActivity} + LibreActivity} end end end, StateN1 = case ActivityNew of - active -> StateN; - disk -> StateN; + active -> StateN; + oppressed -> StateN; lowrate -> StateN #state { lowrate = add_to_lowrate(Pid, Req, Lazy) }; hibernate -> @@ -271,17 +277,17 @@ handle_cast({report_memory, Pid, Memory, BytesGained, BytesLost, Hibernating}, end, {noreply, StateN1}; -handle_cast({register, Pid, IsUnevictable, Module, Function, Args}, +handle_cast({register, Pid, IsUnoppressable, Module, Function, Args}, State = #state { callbacks = Callbacks, - unevictable = Unevictable }) -> + unoppressable = Unoppressable }) -> _MRef = erlang:monitor(process, Pid), - Unevictable1 = case IsUnevictable of - true -> sets:add_element(Pid, Unevictable); - false -> Unevictable + Unoppressable1 = case IsUnoppressable of + true -> sets:add_element(Pid, Unoppressable); + false -> Unoppressable end, {noreply, State #state { callbacks = dict:store (Pid, {Module, Function, Args}, Callbacks), - unevictable = Unevictable1 + unoppressable = Unoppressable1 }}; handle_cast({conserve_memory, Conserve}, State) -> @@ -289,13 +295,13 @@ handle_cast({conserve_memory, Conserve}, State) -> handle_info({'DOWN', _MRef, process, Pid, _Reason}, State = #state { available_tokens = Avail, - mixed_queues = Mixed }) -> - State1 = case find_queue(Pid, Mixed) of - disk -> + liberated_processes = Libre }) -> + State1 = case find_process(Pid, Libre) of + oppressed -> State; - {mixed, {Alloc, _Activity}} -> + {libre, {Alloc, _Activity}} -> State #state { available_tokens = Avail + Alloc, - mixed_queues = dict:erase(Pid, Mixed) } + liberated_processes = dict:erase(Pid, Libre) } end, {noreply, State1}; handle_info({'EXIT', _Pid, Reason}, State) -> @@ -315,18 +321,18 @@ add_to_lowrate(Pid, Alloc, Lazy) -> end, priority_queue:in({Pid, Bucket, Alloc}, Bucket, Lazy). -find_queue(Pid, Mixed) -> - case dict:find(Pid, Mixed) of - {ok, Value} -> {mixed, Value}; - error -> disk +find_process(Pid, Libre) -> + case dict:find(Pid, Libre) of + {ok, Value} -> {libre, Value}; + error -> oppressed end. -set_queue_mode(Callbacks, Pid, Mode) -> +set_process_mode(Callbacks, Pid, Mode) -> {Module, Function, Args} = dict:fetch(Pid, Callbacks), erlang:apply(Module, Function, Args ++ [Mode]). -tidy_and_sum_lazy(IgnorePids, Lazy, Mixed) -> - tidy_and_sum(lowrate, Mixed, +tidy_and_sum_lazy(IgnorePids, Lazy, Libre) -> + tidy_and_sum(lowrate, Libre, fun (Lazy1) -> case priority_queue:out(Lazy1) of {empty, Lazy2} -> @@ -337,12 +343,12 @@ tidy_and_sum_lazy(IgnorePids, Lazy, Mixed) -> end, fun add_to_lowrate/3, IgnorePids, Lazy, priority_queue:new(), 0). -tidy_and_sum_sleepy(IgnorePids, Sleepy, Mixed) -> - tidy_and_sum(hibernate, Mixed, fun queue:out/1, +tidy_and_sum_sleepy(IgnorePids, Sleepy, Libre) -> + tidy_and_sum(hibernate, Libre, fun queue:out/1, fun (Pid, _Alloc, Queue) -> queue:in(Pid, Queue) end, IgnorePids, Sleepy, queue:new(), 0). -tidy_and_sum(AtomExpected, Mixed, Catamorphism, Anamorphism, DupCheckSet, +tidy_and_sum(AtomExpected, Libre, Catamorphism, Anamorphism, DupCheckSet, CataInit, AnaInit, AllocAcc) -> case Catamorphism(CataInit) of {empty, _CataInit} -> {AnaInit, AllocAcc}; @@ -352,8 +358,8 @@ tidy_and_sum(AtomExpected, Mixed, Catamorphism, Anamorphism, DupCheckSet, true -> {DupCheckSet, AnaInit, AllocAcc}; false -> - case find_queue(Pid, Mixed) of - {mixed, {Alloc, AtomExpected}} -> + case find_process(Pid, Libre) of + {libre, {Alloc, AtomExpected}} -> {sets:add_element(Pid, DupCheckSet), Anamorphism(Pid, Alloc, AnaInit), Alloc + AllocAcc}; @@ -361,14 +367,14 @@ tidy_and_sum(AtomExpected, Mixed, Catamorphism, Anamorphism, DupCheckSet, {DupCheckSet, AnaInit, AllocAcc} end end, - tidy_and_sum(AtomExpected, Mixed, Catamorphism, Anamorphism, + tidy_and_sum(AtomExpected, Libre, Catamorphism, Anamorphism, DupCheckSet1, CataInit1, AnaInit1, AllocAcc1) end. -free_upto_lazy(IgnorePids, Callbacks, Lazy, Mixed, Req) -> +free_upto_lazy(IgnorePids, Callbacks, Lazy, Libre, Req) -> free_from( Callbacks, - fun(_Mixed, Lazy1, LazyAcc) -> + fun(_Libre, Lazy1, LazyAcc) -> case priority_queue:out(Lazy1) of {empty, _Lazy2} -> empty; @@ -379,11 +385,11 @@ free_upto_lazy(IgnorePids, Callbacks, Lazy, Mixed, Req) -> false -> {value, Lazy2, Pid, Alloc} end end - end, fun priority_queue:join/2, Mixed, Lazy, priority_queue:new(), Req). + end, fun priority_queue:join/2, Libre, Lazy, priority_queue:new(), Req). -free_upto_sleepy(IgnorePids, Callbacks, Sleepy, Mixed, Req) -> +free_upto_sleepy(IgnorePids, Callbacks, Sleepy, Libre, Req) -> free_from(Callbacks, - fun(Mixed1, Sleepy1, SleepyAcc) -> + fun(Libre1, Sleepy1, SleepyAcc) -> case queue:out(Sleepy1) of {empty, _Sleepy2} -> empty; @@ -392,63 +398,63 @@ free_upto_sleepy(IgnorePids, Callbacks, Sleepy, Mixed, Req) -> true -> {skip, Sleepy2, queue:in(Pid, SleepyAcc)}; false -> {Alloc, hibernate} = - dict:fetch(Pid, Mixed1), + dict:fetch(Pid, Libre1), {value, Sleepy2, Pid, Alloc} end end - end, fun queue:join/2, Mixed, Sleepy, queue:new(), Req). + end, fun queue:join/2, Libre, Sleepy, queue:new(), Req). -free_from(Callbacks, Hylomorphism, BaseCase, Mixed, CataInit, AnaInit, Req) -> - case Hylomorphism(Mixed, CataInit, AnaInit) of +free_from(Callbacks, Hylomorphism, BaseCase, Libre, CataInit, AnaInit, Req) -> + case Hylomorphism(Libre, CataInit, AnaInit) of empty -> - {AnaInit, Mixed, Req}; + {AnaInit, Libre, Req}; {skip, CataInit1, AnaInit1} -> - free_from(Callbacks, Hylomorphism, BaseCase, Mixed, CataInit1, + free_from(Callbacks, Hylomorphism, BaseCase, Libre, CataInit1, AnaInit1, Req); {value, CataInit1, Pid, Alloc} -> - Mixed1 = dict:erase(Pid, Mixed), - ok = set_queue_mode(Callbacks, Pid, disk), + Libre1 = dict:erase(Pid, Libre), + ok = set_process_mode(Callbacks, Pid, oppressed), case Req > Alloc of - true -> free_from(Callbacks, Hylomorphism, BaseCase, Mixed1, + true -> free_from(Callbacks, Hylomorphism, BaseCase, Libre1, CataInit1, AnaInit, Req - Alloc); - false -> {BaseCase(CataInit1, AnaInit), Mixed1, Req - Alloc} + false -> {BaseCase(CataInit1, AnaInit), Libre1, Req - Alloc} end end. free_upto(Pid, Req, State = #state { available_tokens = Avail, - mixed_queues = Mixed, + liberated_processes = Libre, callbacks = Callbacks, lowrate = Lazy, hibernate = Sleepy, - unevictable = Unevictable }) + unoppressable = Unoppressable }) when Req > Avail -> - Unevictable1 = sets:add_element(Pid, Unevictable), - {Sleepy1, SleepySum} = tidy_and_sum_sleepy(Unevictable1, Sleepy, Mixed), + Unoppressable1 = sets:add_element(Pid, Unoppressable), + {Sleepy1, SleepySum} = tidy_and_sum_sleepy(Unoppressable1, Sleepy, Libre), case Req > Avail + SleepySum of true -> %% not enough in sleepy, have a look in lazy too - {Lazy1, LazySum} = tidy_and_sum_lazy(Unevictable1, Lazy, Mixed), + {Lazy1, LazySum} = tidy_and_sum_lazy(Unoppressable1, Lazy, Libre), case Req > Avail + SleepySum + LazySum of true -> %% can't free enough, just return tidied state State #state { lowrate = Lazy1, hibernate = Sleepy1 }; false -> %% need to free all of sleepy, and some of lazy - {Sleepy2, Mixed1, ReqRem} = - free_upto_sleepy(Unevictable1, Callbacks, - Sleepy1, Mixed, Req), - {Lazy2, Mixed2, ReqRem1} = - free_upto_lazy(Unevictable1, Callbacks, - Lazy1, Mixed1, ReqRem), + {Sleepy2, Libre1, ReqRem} = + free_upto_sleepy(Unoppressable1, Callbacks, + Sleepy1, Libre, Req), + {Lazy2, Libre2, ReqRem1} = + free_upto_lazy(Unoppressable1, Callbacks, + Lazy1, Libre1, ReqRem), %% ReqRem1 will be <= 0 because it's %% likely we'll have freed more than we %% need, thus Req - ReqRem1 is total freed State #state { available_tokens = Avail + (Req - ReqRem1), - mixed_queues = Mixed2, lowrate = Lazy2, + liberated_processes = Libre2, lowrate = Lazy2, hibernate = Sleepy2 } end; false -> %% enough available in sleepy, don't touch lazy - {Sleepy2, Mixed1, ReqRem} = - free_upto_sleepy(Unevictable1, Callbacks, Sleepy1, Mixed, Req), + {Sleepy2, Libre1, ReqRem} = + free_upto_sleepy(Unoppressable1, Callbacks, Sleepy1, Libre, Req), State #state { available_tokens = Avail + (Req - ReqRem), - mixed_queues = Mixed1, hibernate = Sleepy2 } + liberated_processes = Libre1, hibernate = Sleepy2 } end; free_upto(_Pid, _Req, State) -> State. |
