summary refs log tree commit diff
path: root/src
diff options
context:
space:
mode:
author	Matthew Sackman <matthew@lshift.net>	2009-08-27 18:12:12 +0100
committer	Matthew Sackman <matthew@lshift.net>	2009-08-27 18:12:12 +0100
commit	210835025b0c98ec313cd33731bc29ec6acbe609 (patch)
tree	04a48ac9bd5af830ccbee2a3fdc959e51f7a851a /src
parent	c6ad55e7e6096ed2d5f2b1043b4dedc33317c432 (diff)
download	rabbitmq-server-git-210835025b0c98ec313cd33731bc29ec6acbe609.tar.gz
All QA comments relating to queue_mode_manager
Diffstat (limited to 'src')
-rw-r--r--src/rabbit.erl2
-rw-r--r--src/rabbit_amqqueue.erl2
-rw-r--r--src/rabbit_amqqueue_process.erl10
-rw-r--r--src/rabbit_disk_queue.erl12
-rw-r--r--src/rabbit_memory_manager.erl (renamed from src/rabbit_queue_mode_manager.erl)284
5 files changed, 160 insertions, 150 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 665f10a273..8962b12ea6 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -140,7 +140,7 @@ start(normal, []) ->
{ok, MemoryAlarms} = application:get_env(memory_alarms),
ok = rabbit_alarm:start(MemoryAlarms),
- ok = start_child(rabbit_queue_mode_manager),
+ ok = start_child(rabbit_memory_manager),
ok = rabbit_binary_generator:
check_empty_content_body_frame_size(),
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 51b2e8f587..ad0a0f0c68 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -102,7 +102,7 @@
-spec(basic_cancel/4 :: (amqqueue(), pid(), ctag(), any()) -> 'ok').
-spec(notify_sent/2 :: (pid(), pid()) -> 'ok').
-spec(unblock/2 :: (pid(), pid()) -> 'ok').
--spec(set_storage_mode/2 :: (pid(), ('disk' | 'mixed')) -> 'ok').
+-spec(set_storage_mode/2 :: (pid(), ('oppressed' | 'liberated')) -> 'ok').
-spec(internal_declare/2 :: (amqqueue(), bool()) -> amqqueue()).
-spec(internal_delete/1 :: (queue_name()) -> 'ok' | not_found()).
-spec(on_node_down/1 :: (erlang_node()) -> 'ok').
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index adf84c0e4c..72325414cd 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -101,7 +101,7 @@ start_link(Q) ->
init(Q = #amqqueue { name = QName, durable = Durable }) ->
?LOGDEBUG("Queue starting - ~p~n", [Q]),
- ok = rabbit_queue_mode_manager:register
+ ok = rabbit_memory_manager:register
(self(), false, rabbit_amqqueue, set_storage_mode, [self()]),
{ok, MS} = rabbit_mixed_queue:init(QName, Durable),
State = #q{q = Q,
@@ -551,7 +551,7 @@ i(Item, _) ->
report_memory(Hib, State = #q { mixed_state = MS }) ->
{MS1, MSize, Gain, Loss} =
rabbit_mixed_queue:estimate_queue_memory_and_reset_counters(MS),
- rabbit_queue_mode_manager:report_memory(self(), MSize, Gain, Loss, Hib),
+ rabbit_memory_manager:report_memory(self(), MSize, Gain, Loss, Hib),
State #q { mixed_state = MS1 }.
%---------------------------------------------------------------------------
@@ -820,7 +820,11 @@ handle_cast({set_storage_mode, Mode}, State = #q { mixed_state = MS }) ->
PendingMessages =
lists:flatten([Pending || #tx { pending_messages = Pending}
<- all_tx_record()]),
- {ok, MS1} = rabbit_mixed_queue:set_storage_mode(Mode, PendingMessages, MS),
+ Mode1 = case Mode of
+ liberated -> mixed;
+ oppressed -> disk
+ end,
+ {ok, MS1} = rabbit_mixed_queue:set_storage_mode(Mode1, PendingMessages, MS),
noreply(State #q { mixed_state = MS1 }).
handle_info(report_memory, State) ->
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index d9f318e059..18b250c55c 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -281,7 +281,7 @@
-spec(to_ram_disk_mode/0 :: () -> 'ok').
-spec(filesync/0 :: () -> 'ok').
-spec(cache_info/0 :: () -> [{atom(), term()}]).
--spec(set_mode/1 :: ('disk' | 'mixed') -> 'ok').
+-spec(set_mode/1 :: ('oppressed' | 'liberated') -> 'ok').
-endif.
@@ -371,7 +371,7 @@ init([FileSizeLimit, ReadFileHandlesLimit]) ->
%% brutal_kill.
%% Otherwise, the gen_server will be immediately terminated.
process_flag(trap_exit, true),
- ok = rabbit_queue_mode_manager:register
+ ok = rabbit_memory_manager:register
(self(), true, rabbit_disk_queue, set_mode, []),
Node = node(),
ok =
@@ -448,7 +448,7 @@ init([FileSizeLimit, ReadFileHandlesLimit]) ->
%% grant us to ram_disk mode. We have to start in ram_disk mode
%% because we can't find values for mnesia_bytes_per_record or
%% ets_bytes_per_record otherwise.
- ok = rabbit_queue_mode_manager:report_memory(self(), 0, false),
+ ok = rabbit_memory_manager:report_memory(self(), 0, false),
ok = report_memory(false, State2),
{ok, start_memory_timer(State2), hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
@@ -522,8 +522,8 @@ handle_cast({delete_queue, Q}, State) ->
noreply(State1);
handle_cast({set_mode, Mode}, State) ->
noreply((case Mode of
- disk -> fun to_disk_only_mode/1;
- mixed -> fun to_ram_disk_mode/1
+ oppressed -> fun to_disk_only_mode/1;
+ liberated -> fun to_ram_disk_mode/1
end)(State));
handle_cast({prefetch, Q, From}, State) ->
{ok, Result, State1} =
@@ -609,7 +609,7 @@ start_memory_timer(State) ->
report_memory(Hibernating, State) ->
Bytes = memory_use(State),
- rabbit_queue_mode_manager:report_memory(self(), trunc(2.5 * Bytes),
+ rabbit_memory_manager:report_memory(self(), trunc(2.5 * Bytes),
Hibernating).
memory_use(#dqstate { operation_mode = ram_disk,
diff --git a/src/rabbit_queue_mode_manager.erl b/src/rabbit_memory_manager.erl
index a2fab6154a..055e47952a 100644
--- a/src/rabbit_queue_mode_manager.erl
+++ b/src/rabbit_memory_manager.erl
@@ -29,7 +29,7 @@
%% Contributor(s): ______________________________________.
%%
--module(rabbit_queue_mode_manager).
+-module(rabbit_memory_manager).
-behaviour(gen_server2).
@@ -62,12 +62,12 @@
-endif.
-record(state, { available_tokens,
- mixed_queues,
+ liberated_processes,
callbacks,
tokens_per_byte,
lowrate,
hibernate,
- unevictable,
+ unoppressable,
alarmed
}).
@@ -77,69 +77,75 @@
%% system (RAM). Then, work out how many tokens each byte corresponds
%% to. This is the tokens_per_byte field. When a process registers, it
%% must provide an M-F-A triple to a function that needs one further
-%% argument, which is the new mode. This will either be 'mixed' or
-%% 'disk'.
+%% argument, which is the new mode. This will either be 'liberated' or
+%% 'oppressed'.
%%
%% Processes then report their own memory usage, in bytes, and the
%% manager takes care of the rest.
%%
%% There are a finite number of tokens in the system. These are
-%% allocated to processes as they are requested. We keep track of
-%% processes which have hibernated, and processes that are doing only
-%% a low rate of work. When a request for memory can't be satisfied,
-%% we try and evict processes first from the hibernated group, and
-%% then from the lowrate group. The hibernated group is a simple
-%% queue, and so is implicitly sorted by the order in which processes
-%% were added to the queue. This means that when removing from the
-%% queue, we hibernate the sleepiest pid first. The lowrate group is a
-%% priority queue, where the priority is the truncated log (base e) of
-%% the amount of memory allocated. Thus when we remove from the queue,
-%% we first remove the queue from the highest bucket.
+%% allocated to processes as the processes report their memory
+%% usage. We keep track of processes which have hibernated, and
+%% processes that are doing only a low rate of work (reported as a low
+%% gain or loss in memory between memory reports). When a process
+%% reports memory use which can't be satisfied by the available
+%% tokens, we try and oppress processes first from the hibernated
+%% group, and then from the lowrate group. The hibernated group is a
+%% simple queue, and so is implicitly sorted by the order in which
+%% processes were added to the queue. This means that when removing
+%% from the queue, we evict the sleepiest (and most passive) pid
+%% first. The lowrate group is a priority queue, where the priority is
+%% the truncated log (base e) of the amount of memory allocated. Thus
+%% when we remove from the queue, we first remove the queue from the
+%% highest bucket.
%%
-%% If the request still can't be satisfied after evicting to disk
-%% everyone from those two groups (and note that we check first
-%% whether or not freeing them would make available enough tokens to
-%% satisfy the request rather than just sending all those queues to
-%% disk and then going "whoops, didn't help after all"), then we send
-%% the requesting process to disk. When a queue registers, it can
-%% declare itself "unevictable". If a queue is unevictable then it
-%% will not be sent to disk as a result of other processes requesting
-%% more memory. However, if it itself is requesting more memory and
-%% that request can't be satisfied then it is still sent to disk as
-%% before. This feature is only used by the disk_queue, because if the
-%% disk queue is not being used, and hibernates, and then memory
-%% pressure gets tight, the disk_queue would typically be one of the
-%% first processes to get sent to disk, which cripples
-%% performance. Thus by setting it unevictable, it is only possible
-%% for the disk_queue to be sent to disk when it is active and
+%% If the reported memory use still can't be satisfied after
+%% oppressing everyone from those two groups (and note that we check
+%% first whether or not oppressing them would make available enough
+%% tokens to satisfy the reported use rather than just oppressing all
+%% those processes and then going "whoops, didn't help after all"),
+%% then we oppress the reporting process. When a process registers, it
+%% can declare itself "unoppressable". If a process is unoppressable
+%% then it will not be sent to disk as a result of other processes
+%% needing more tokens. However, if it itself needs additional tokens
+%% which aren't available then it is still oppressed as before. This
+%% feature is only used by the disk_queue, because if the disk queue
+%% is not being used, and hibernates, and then memory pressure gets
+%% tight, the disk_queue would typically be one of the first processes
+%% to be oppressed (sent to disk_only mode), which cripples
+%% performance. Thus by setting it unoppressable, it is only possible
+%% for the disk_queue to be oppressed when it is active and
%% attempting to increase its memory allocation.
%%
-%% If a process has been sent to disk, it continues making
-%% requests. As soon as a request can be satisfied (and this can
-%% include sending other processes to disk in the way described
-%% above), it will be told to come back into mixed mode. We do not
-%% keep any information about queues in disk mode.
+%% If a process has been oppressed, it continues making memory
+%% reports, as if it was liberated. As soon as a reported amount of
+%% memory can be satisfied (and this can include oppressing other
+%% processes in the way described above), it will be liberated. We do
+%% not keep any information about oppressed processes.
%%
%% Note that the lowrate and hibernate groups can get very out of
%% date. This is fine, and somewhat unavoidable given the absence of
%% useful APIs for queues. Thus we allow them to get out of date
%% (processes will be left in there when they change groups,
%% duplicates can appear, dead processes are not pruned etc etc etc),
-%% and when we go through the groups, summing up their amount of
-%% memory, we tidy up at that point.
+%% and when we go through the groups, summing up their allocated
+%% tokens, we tidy up at that point.
%%
-%% A process which is not evicted to disk, and is requesting a smaller
-%% amount of RAM than its last request will always be satisfied. A
-%% mixed-mode process that is busy but consuming an unchanging amount
-%% of RAM will never be sent to disk. The disk_queue is also managed
-%% in the same way. This means that a queue that has gone back to
-%% being mixed after being in disk mode now has its messages counted
-%% twice as they are counted both in the request made by the queue
-%% (even though they may not yet be in RAM (though see the
-%% prefetcher)) and also by the disk_queue. Thus the amount of
-%% available RAM must be higher when going disk -> mixed than when
-%% going mixed -> disk. This is fairly sensible as it reduces the risk
-%% of any oscillations occurring.
+%% A liberated process, which is reporting a smaller amount of RAM
+%% than its last report will remain liberated. A liberated process
+%% that is busy but consuming an unchanging amount of RAM will never
+%% be oppressed.
+
+%% Specific notes as applied to queues and the disk_queue:
+%%
+%% The disk_queue is managed in the same way as queues. This means
+%% that a queue that has gone back to mixed mode after being in disk
+%% mode now has its messages counted twice as they are counted both in
+%% the report made by the queue (even though they may not yet be in
+%% RAM (though see the prefetcher)) and also by the disk_queue. Thus
+%% the amount of available RAM must be higher when going disk -> mixed
+%% than when going mixed -> disk. This is fairly sensible as it
+%% reduces the risk of any oscillations occurring.
%%
%% The queue process deliberately reports 4 times its estimated RAM
%% usage, and the disk_queue 2.5 times. In practise, this seems to
@@ -151,8 +157,8 @@
start_link() ->
gen_server2:start_link({local, ?SERVER}, ?MODULE, [], []).
-register(Pid, Unevictable, Module, Function, Args) ->
- gen_server2:cast(?SERVER, {register, Pid, Unevictable,
+register(Pid, Unoppressable, Module, Function, Args) ->
+ gen_server2:cast(?SERVER, {register, Pid, Unoppressable,
Module, Function, Args}).
report_memory(Pid, Memory, Hibernating) ->
@@ -177,31 +183,31 @@ init([]) ->
true -> ?TOTAL_TOKENS / MemAvail
end,
{ok, #state { available_tokens = ?TOTAL_TOKENS,
- mixed_queues = dict:new(),
+ liberated_processes = dict:new(),
callbacks = dict:new(),
tokens_per_byte = TPB,
lowrate = priority_queue:new(),
hibernate = queue:new(),
- unevictable = sets:new(),
+ unoppressable = sets:new(),
alarmed = false
}}.
handle_call(info, _From, State) ->
State1 = #state { available_tokens = Avail,
- mixed_queues = Mixed,
+ liberated_processes = Libre,
lowrate = Lazy,
hibernate = Sleepy,
- unevictable = Unevictable } =
+ unoppressable = Unoppressable } =
free_upto(undef, 1 + ?TOTAL_TOKENS, State), %% this'll just do tidying
{reply, [{ available_tokens, Avail },
- { mixed_queues, dict:to_list(Mixed) },
+ { liberated_processes, dict:to_list(Libre) },
{ lowrate_queues, priority_queue:to_list(Lazy) },
{ hibernated_queues, queue:to_list(Sleepy) },
- { unevictable_queues, sets:to_list(Unevictable) }], State1}.
+ { unoppressable_queues, sets:to_list(Unoppressable) }], State1}.
handle_cast({report_memory, Pid, Memory, BytesGained, BytesLost, Hibernating},
- State = #state { mixed_queues = Mixed,
+ State = #state { liberated_processes = Libre,
available_tokens = Avail,
callbacks = Callbacks,
tokens_per_byte = TPB,
@@ -213,57 +219,57 @@ handle_cast({report_memory, Pid, Memory, BytesGained, BytesLost, Hibernating},
{G, L} -> G < ?ACTIVITY_THRESHOLD andalso
L < ?ACTIVITY_THRESHOLD
end,
- MixedActivity = if Hibernating -> hibernate;
+ LibreActivity = if Hibernating -> hibernate;
LowRate -> lowrate;
true -> active
end,
{StateN = #state { lowrate = Lazy, hibernate = Sleepy }, ActivityNew} =
- case find_queue(Pid, Mixed) of
- {mixed, {OAlloc, _OActivity}} ->
+ case find_process(Pid, Libre) of
+ {libre, {OAlloc, _OActivity}} ->
Avail1 = Avail + OAlloc,
- State1 =
- #state { available_tokens = Avail2, mixed_queues = Mixed1 }
+ State1 = #state { available_tokens = Avail2,
+ liberated_processes = Libre1 }
= free_upto(Pid, Req,
State #state { available_tokens = Avail1 }),
case Req > Avail2 of
- true -> %% nowt we can do, send to disk
- ok = set_queue_mode(Callbacks, Pid, disk),
- {State1 #state { mixed_queues =
- dict:erase(Pid, Mixed1) }, disk};
- false -> %% keep mixed
+ true -> %% nowt we can do, oppress the process
+ ok = set_process_mode(Callbacks, Pid, oppressed),
+ {State1 #state { liberated_processes =
+ dict:erase(Pid, Libre1) }, oppressed};
+ false -> %% keep liberated
{State1 #state
- { mixed_queues =
- dict:store(Pid, {Req, MixedActivity}, Mixed1),
+ { liberated_processes =
+ dict:store(Pid, {Req, LibreActivity}, Libre1),
available_tokens = Avail2 - Req },
- MixedActivity}
+ LibreActivity}
end;
- disk ->
+ oppressed ->
case Alarmed of
true ->
- {State, disk};
+ {State, oppressed};
false ->
State1 = #state { available_tokens = Avail1,
- mixed_queues = Mixed1 } =
+ liberated_processes = Libre1 } =
free_upto(Pid, Req, State),
case Req > Avail1 orelse Hibernating orelse LowRate of
true ->
%% not enough space, or no compelling
- %% reason, so stay as disk
- {State1, disk};
- false -> %% can go to mixed mode
- set_queue_mode(Callbacks, Pid, mixed),
+ %% reason, so stay oppressed
+ {State1, oppressed};
+ false -> %% can liberate the process
+ set_process_mode(Callbacks, Pid, liberated),
{State1 #state {
- mixed_queues =
- dict:store(Pid, {Req, MixedActivity}, Mixed1),
+ liberated_processes =
+ dict:store(Pid, {Req, LibreActivity}, Libre1),
available_tokens = Avail1 - Req },
- MixedActivity}
+ LibreActivity}
end
end
end,
StateN1 =
case ActivityNew of
- active -> StateN;
- disk -> StateN;
+ active -> StateN;
+ oppressed -> StateN;
lowrate ->
StateN #state { lowrate = add_to_lowrate(Pid, Req, Lazy) };
hibernate ->
@@ -271,17 +277,17 @@ handle_cast({report_memory, Pid, Memory, BytesGained, BytesLost, Hibernating},
end,
{noreply, StateN1};
-handle_cast({register, Pid, IsUnevictable, Module, Function, Args},
+handle_cast({register, Pid, IsUnoppressable, Module, Function, Args},
State = #state { callbacks = Callbacks,
- unevictable = Unevictable }) ->
+ unoppressable = Unoppressable }) ->
_MRef = erlang:monitor(process, Pid),
- Unevictable1 = case IsUnevictable of
- true -> sets:add_element(Pid, Unevictable);
- false -> Unevictable
+ Unoppressable1 = case IsUnoppressable of
+ true -> sets:add_element(Pid, Unoppressable);
+ false -> Unoppressable
end,
{noreply, State #state { callbacks = dict:store
(Pid, {Module, Function, Args}, Callbacks),
- unevictable = Unevictable1
+ unoppressable = Unoppressable1
}};
handle_cast({conserve_memory, Conserve}, State) ->
@@ -289,13 +295,13 @@ handle_cast({conserve_memory, Conserve}, State) ->
handle_info({'DOWN', _MRef, process, Pid, _Reason},
State = #state { available_tokens = Avail,
- mixed_queues = Mixed }) ->
- State1 = case find_queue(Pid, Mixed) of
- disk ->
+ liberated_processes = Libre }) ->
+ State1 = case find_process(Pid, Libre) of
+ oppressed ->
State;
- {mixed, {Alloc, _Activity}} ->
+ {libre, {Alloc, _Activity}} ->
State #state { available_tokens = Avail + Alloc,
- mixed_queues = dict:erase(Pid, Mixed) }
+ liberated_processes = dict:erase(Pid, Libre) }
end,
{noreply, State1};
handle_info({'EXIT', _Pid, Reason}, State) ->
@@ -315,18 +321,18 @@ add_to_lowrate(Pid, Alloc, Lazy) ->
end,
priority_queue:in({Pid, Bucket, Alloc}, Bucket, Lazy).
-find_queue(Pid, Mixed) ->
- case dict:find(Pid, Mixed) of
- {ok, Value} -> {mixed, Value};
- error -> disk
+find_process(Pid, Libre) ->
+ case dict:find(Pid, Libre) of
+ {ok, Value} -> {libre, Value};
+ error -> oppressed
end.
-set_queue_mode(Callbacks, Pid, Mode) ->
+set_process_mode(Callbacks, Pid, Mode) ->
{Module, Function, Args} = dict:fetch(Pid, Callbacks),
erlang:apply(Module, Function, Args ++ [Mode]).
-tidy_and_sum_lazy(IgnorePids, Lazy, Mixed) ->
- tidy_and_sum(lowrate, Mixed,
+tidy_and_sum_lazy(IgnorePids, Lazy, Libre) ->
+ tidy_and_sum(lowrate, Libre,
fun (Lazy1) ->
case priority_queue:out(Lazy1) of
{empty, Lazy2} ->
@@ -337,12 +343,12 @@ tidy_and_sum_lazy(IgnorePids, Lazy, Mixed) ->
end, fun add_to_lowrate/3, IgnorePids, Lazy,
priority_queue:new(), 0).
-tidy_and_sum_sleepy(IgnorePids, Sleepy, Mixed) ->
- tidy_and_sum(hibernate, Mixed, fun queue:out/1,
+tidy_and_sum_sleepy(IgnorePids, Sleepy, Libre) ->
+ tidy_and_sum(hibernate, Libre, fun queue:out/1,
fun (Pid, _Alloc, Queue) -> queue:in(Pid, Queue) end,
IgnorePids, Sleepy, queue:new(), 0).
-tidy_and_sum(AtomExpected, Mixed, Catamorphism, Anamorphism, DupCheckSet,
+tidy_and_sum(AtomExpected, Libre, Catamorphism, Anamorphism, DupCheckSet,
CataInit, AnaInit, AllocAcc) ->
case Catamorphism(CataInit) of
{empty, _CataInit} -> {AnaInit, AllocAcc};
@@ -352,8 +358,8 @@ tidy_and_sum(AtomExpected, Mixed, Catamorphism, Anamorphism, DupCheckSet,
true ->
{DupCheckSet, AnaInit, AllocAcc};
false ->
- case find_queue(Pid, Mixed) of
- {mixed, {Alloc, AtomExpected}} ->
+ case find_process(Pid, Libre) of
+ {libre, {Alloc, AtomExpected}} ->
{sets:add_element(Pid, DupCheckSet),
Anamorphism(Pid, Alloc, AnaInit),
Alloc + AllocAcc};
@@ -361,14 +367,14 @@ tidy_and_sum(AtomExpected, Mixed, Catamorphism, Anamorphism, DupCheckSet,
{DupCheckSet, AnaInit, AllocAcc}
end
end,
- tidy_and_sum(AtomExpected, Mixed, Catamorphism, Anamorphism,
+ tidy_and_sum(AtomExpected, Libre, Catamorphism, Anamorphism,
DupCheckSet1, CataInit1, AnaInit1, AllocAcc1)
end.
-free_upto_lazy(IgnorePids, Callbacks, Lazy, Mixed, Req) ->
+free_upto_lazy(IgnorePids, Callbacks, Lazy, Libre, Req) ->
free_from(
Callbacks,
- fun(_Mixed, Lazy1, LazyAcc) ->
+ fun(_Libre, Lazy1, LazyAcc) ->
case priority_queue:out(Lazy1) of
{empty, _Lazy2} ->
empty;
@@ -379,11 +385,11 @@ free_upto_lazy(IgnorePids, Callbacks, Lazy, Mixed, Req) ->
false -> {value, Lazy2, Pid, Alloc}
end
end
- end, fun priority_queue:join/2, Mixed, Lazy, priority_queue:new(), Req).
+ end, fun priority_queue:join/2, Libre, Lazy, priority_queue:new(), Req).
-free_upto_sleepy(IgnorePids, Callbacks, Sleepy, Mixed, Req) ->
+free_upto_sleepy(IgnorePids, Callbacks, Sleepy, Libre, Req) ->
free_from(Callbacks,
- fun(Mixed1, Sleepy1, SleepyAcc) ->
+ fun(Libre1, Sleepy1, SleepyAcc) ->
case queue:out(Sleepy1) of
{empty, _Sleepy2} ->
empty;
@@ -392,63 +398,63 @@ free_upto_sleepy(IgnorePids, Callbacks, Sleepy, Mixed, Req) ->
true -> {skip, Sleepy2,
queue:in(Pid, SleepyAcc)};
false -> {Alloc, hibernate} =
- dict:fetch(Pid, Mixed1),
+ dict:fetch(Pid, Libre1),
{value, Sleepy2, Pid, Alloc}
end
end
- end, fun queue:join/2, Mixed, Sleepy, queue:new(), Req).
+ end, fun queue:join/2, Libre, Sleepy, queue:new(), Req).
-free_from(Callbacks, Hylomorphism, BaseCase, Mixed, CataInit, AnaInit, Req) ->
- case Hylomorphism(Mixed, CataInit, AnaInit) of
+free_from(Callbacks, Hylomorphism, BaseCase, Libre, CataInit, AnaInit, Req) ->
+ case Hylomorphism(Libre, CataInit, AnaInit) of
empty ->
- {AnaInit, Mixed, Req};
+ {AnaInit, Libre, Req};
{skip, CataInit1, AnaInit1} ->
- free_from(Callbacks, Hylomorphism, BaseCase, Mixed, CataInit1,
+ free_from(Callbacks, Hylomorphism, BaseCase, Libre, CataInit1,
AnaInit1, Req);
{value, CataInit1, Pid, Alloc} ->
- Mixed1 = dict:erase(Pid, Mixed),
- ok = set_queue_mode(Callbacks, Pid, disk),
+ Libre1 = dict:erase(Pid, Libre),
+ ok = set_process_mode(Callbacks, Pid, oppressed),
case Req > Alloc of
- true -> free_from(Callbacks, Hylomorphism, BaseCase, Mixed1,
+ true -> free_from(Callbacks, Hylomorphism, BaseCase, Libre1,
CataInit1, AnaInit, Req - Alloc);
- false -> {BaseCase(CataInit1, AnaInit), Mixed1, Req - Alloc}
+ false -> {BaseCase(CataInit1, AnaInit), Libre1, Req - Alloc}
end
end.
free_upto(Pid, Req, State = #state { available_tokens = Avail,
- mixed_queues = Mixed,
+ liberated_processes = Libre,
callbacks = Callbacks,
lowrate = Lazy,
hibernate = Sleepy,
- unevictable = Unevictable })
+ unoppressable = Unoppressable })
when Req > Avail ->
- Unevictable1 = sets:add_element(Pid, Unevictable),
- {Sleepy1, SleepySum} = tidy_and_sum_sleepy(Unevictable1, Sleepy, Mixed),
+ Unoppressable1 = sets:add_element(Pid, Unoppressable),
+ {Sleepy1, SleepySum} = tidy_and_sum_sleepy(Unoppressable1, Sleepy, Libre),
case Req > Avail + SleepySum of
true -> %% not enough in sleepy, have a look in lazy too
- {Lazy1, LazySum} = tidy_and_sum_lazy(Unevictable1, Lazy, Mixed),
+ {Lazy1, LazySum} = tidy_and_sum_lazy(Unoppressable1, Lazy, Libre),
case Req > Avail + SleepySum + LazySum of
true -> %% can't free enough, just return tidied state
State #state { lowrate = Lazy1, hibernate = Sleepy1 };
false -> %% need to free all of sleepy, and some of lazy
- {Sleepy2, Mixed1, ReqRem} =
- free_upto_sleepy(Unevictable1, Callbacks,
- Sleepy1, Mixed, Req),
- {Lazy2, Mixed2, ReqRem1} =
- free_upto_lazy(Unevictable1, Callbacks,
- Lazy1, Mixed1, ReqRem),
+ {Sleepy2, Libre1, ReqRem} =
+ free_upto_sleepy(Unoppressable1, Callbacks,
+ Sleepy1, Libre, Req),
+ {Lazy2, Libre2, ReqRem1} =
+ free_upto_lazy(Unoppressable1, Callbacks,
+ Lazy1, Libre1, ReqRem),
%% ReqRem1 will be <= 0 because it's
%% likely we'll have freed more than we
%% need, thus Req - ReqRem1 is total freed
State #state { available_tokens = Avail + (Req - ReqRem1),
- mixed_queues = Mixed2, lowrate = Lazy2,
+ liberated_processes = Libre2, lowrate = Lazy2,
hibernate = Sleepy2 }
end;
false -> %% enough available in sleepy, don't touch lazy
- {Sleepy2, Mixed1, ReqRem} =
- free_upto_sleepy(Unevictable1, Callbacks, Sleepy1, Mixed, Req),
+ {Sleepy2, Libre1, ReqRem} =
+ free_upto_sleepy(Unoppressable1, Callbacks, Sleepy1, Libre, Req),
State #state { available_tokens = Avail + (Req - ReqRem),
- mixed_queues = Mixed1, hibernate = Sleepy2 }
+ liberated_processes = Libre1, hibernate = Sleepy2 }
end;
free_upto(_Pid, _Req, State) ->
State.