summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_amqqueue.erl2
-rw-r--r--src/rabbit_amqqueue_process.erl2
-rw-r--r--src/rabbit_disk_queue.erl2
-rw-r--r--src/rabbit_queue_mode_manager.erl326
4 files changed, 172 insertions, 160 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 9b77949dea..f0e8d4c2f4 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -232,7 +232,7 @@ set_mode_pin(VHostPath, Queue, DiskBin)
fun(Q) -> case Disk of
true -> rabbit_queue_mode_manager:pin_to_disk
(Q #amqqueue.pid);
- false -> rabbit_queue_mode_manager:unpin_to_disk
+ false -> rabbit_queue_mode_manager:unpin_from_disk
(Q #amqqueue.pid)
end
end).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index dba7ec2406..8ee576f7c0 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -102,7 +102,7 @@ start_link(Q) ->
init(Q = #amqqueue { name = QName, durable = Durable }) ->
?LOGDEBUG("Queue starting - ~p~n", [Q]),
ok = rabbit_queue_mode_manager:register
- (self(), rabbit_amqqueue, set_mode, [self()]),
+ (self(), false, rabbit_amqqueue, set_mode, [self()]),
{ok, MS} = rabbit_mixed_queue:init(QName, Durable),
State = #q{q = Q,
owner = none,
diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl
index 2f8310581c..76399022c5 100644
--- a/src/rabbit_disk_queue.erl
+++ b/src/rabbit_disk_queue.erl
@@ -379,7 +379,7 @@ init([FileSizeLimit, ReadFileHandlesLimit]) ->
%% Otherwise, the gen_server will be immediately terminated.
process_flag(trap_exit, true),
ok = rabbit_queue_mode_manager:register
- (self(), rabbit_disk_queue, set_mode, []),
+ (self(), true, rabbit_disk_queue, set_mode, []),
Node = node(),
ok =
case mnesia:change_table_copy_type(rabbit_disk_queue, Node,
diff --git a/src/rabbit_queue_mode_manager.erl b/src/rabbit_queue_mode_manager.erl
index 194ddf9570..37afdc6c82 100644
--- a/src/rabbit_queue_mode_manager.erl
+++ b/src/rabbit_queue_mode_manager.erl
@@ -38,8 +38,8 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
--export([register/4, report_memory/3, report_memory/5, info/0,
- pin_to_disk/1, unpin_to_disk/1]).
+-export([register/5, report_memory/3, report_memory/5, info/0,
+ pin_to_disk/1, unpin_from_disk/1]).
-define(TOTAL_TOKENS, 10000000).
-define(ACTIVITY_THRESHOLD, 25).
@@ -50,14 +50,14 @@
-spec(start_link/0 :: () ->
({'ok', pid()} | 'ignore' | {'error', any()})).
--spec(register/4 :: (pid(), atom(), atom(), list()) -> 'ok').
+-spec(register/5 :: (pid(), boolean(), atom(), atom(), list()) -> 'ok').
-spec(report_memory/3 :: (pid(), non_neg_integer(), bool()) -> 'ok').
-spec(report_memory/5 :: (pid(), non_neg_integer(),
(non_neg_integer() | 'undefined'),
(non_neg_integer() | 'undefined'), bool()) ->
'ok').
-spec(pin_to_disk/1 :: (pid()) -> 'ok').
--spec(unpin_to_disk/1 :: (pid()) -> 'ok').
+-spec(unpin_from_disk/1 :: (pid()) -> 'ok').
-endif.
@@ -67,7 +67,8 @@
tokens_per_byte,
lowrate,
hibernate,
- disk_mode_pins
+ disk_mode_pins,
+ unevictable
}).
%% Token-credit based memory management
@@ -91,24 +92,36 @@
%% queue, and so is implicitly sorted by the order in which processes
%% were added to the queue. This means that when removing from the
%% queue, we hibernate the sleepiest pid first. The lowrate group is a
-%% priority queue, where the priority is the amount of memory
-%% allocated. Thus when we remove from the queue, we first remove the
-%% queue with the most amount of memory.
+%% priority queue, where the priority is the truncated log (base e) of
+%% the amount of memory allocated. Thus when we remove from the queue,
+%% we first remove the queue from the highest bucket.
%%
%% If the request still can't be satisfied after evicting to disk
%% everyone from those two groups (and note that we check first
%% whether or not freeing them would make available enough tokens to
%% satisfy the request rather than just sending all those queues to
-%% disk and then going "whoops, didn't help afterall"), then we send
-%% the requesting process to disk.
+%% disk and then going "whoops, didn't help after all"), then we send
+%% the requesting process to disk. When a queue registers, it can
+%% declare itself "unevictable". If a queue is unevictable then it
+%% will not be sent to disk as a result of other processes requesting
+%% more memory. However, if it itself is requesting more memory and
+%% that request can't be satisfied then it is still sent to disk as
+%% before. This feature is only used by the disk_queue, because if the
+%% disk queue is not being used, and hibernates, and then memory
+%% pressure gets tight, the disk_queue would typically be one of the
+%% first processes to get sent to disk, which cripples
+%% performance. Thus by setting it unevictable, it is only possible
+%% for the disk_queue to be sent to disk when it is active and
+%% attempting to increase its memory allocation.
%%
%% If a process has been sent to disk, it continues making
%% requests. As soon as a request can be satisfied (and this can
%% include sending other processes to disk in the way described
-%% above), it will be told to come back into mixed mode.
+%% above), it will be told to come back into mixed mode. We do not
+%% keep any information about queues in disk mode.
%%
%% Note that the lowrate and hibernate groups can get very out of
-%% date. This is fine, and kinda unavoidable given the absence of
+%% date. This is fine, and somewhat unavoidable given the absence of
%% useful APIs for queues. Thus we allow them to get out of date
%% (processes will be left in there when they change groups,
%% duplicates can appear, dead processes are not pruned etc etc etc),
@@ -116,17 +129,17 @@
%% memory, we tidy up at that point.
%%
%% A process which is not evicted to disk, and is requesting a smaller
-%% amount of ram than its last request will always be satisfied. A
+%% amount of RAM than its last request will always be satisfied. A
%% mixed-mode process that is busy but consuming an unchanging amount
%% of RAM will never be sent to disk. The disk_queue is also managed
%% in the same way. This means that a queue that has gone back to
%% being mixed after being in disk mode now has its messages counted
%% twice as they are counted both in the request made by the queue
-%% (even though they may not yet be in RAM) and also by the
-%% disk_queue. This means that the threshold for going mixed -> disk
-%% is above the threshold for going disk -> mixed. This is actually
-%% fairly sensible as it reduces the risk of any oscillations
-%% occurring.
+%% (even though they may not yet be in RAM (though see the
+%% prefetcher)) and also by the disk_queue. Thus the amount of
+%% available RAM must be higher when going disk -> mixed than when
+%% going mixed -> disk. This is fairly sensible as it reduces the risk
+%% of any oscillations occurring.
%%
%% The queue process deliberately reports 4 times its estimated RAM
%% usage, and the disk_queue 2.5 times. In practise, this seems to
@@ -138,14 +151,15 @@
start_link() ->
gen_server2:start_link({local, ?SERVER}, ?MODULE, [], []).
-register(Pid, Module, Function, Args) ->
- gen_server2:cast(?SERVER, {register, Pid, Module, Function, Args}).
+register(Pid, Unevictable, Module, Function, Args) ->
+ gen_server2:cast(?SERVER, {register, Pid, Unevictable,
+ Module, Function, Args}).
pin_to_disk(Pid) ->
gen_server2:call(?SERVER, {pin_to_disk, Pid}).
-unpin_to_disk(Pid) ->
- gen_server2:call(?SERVER, {unpin_to_disk, Pid}).
+unpin_from_disk(Pid) ->
+ gen_server2:call(?SERVER, {unpin_from_disk, Pid}).
report_memory(Pid, Memory, Hibernating) ->
report_memory(Pid, Memory, undefined, undefined, Hibernating).
@@ -168,7 +182,8 @@ init([]) ->
tokens_per_byte = ?TOTAL_TOKENS / MemAvail,
lowrate = priority_queue:new(),
hibernate = queue:new(),
- disk_mode_pins = sets:new()
+ disk_mode_pins = sets:new(),
+ unevictable = sets:new()
}}.
handle_call({pin_to_disk, Pid}, _From,
@@ -182,23 +197,20 @@ handle_call({pin_to_disk, Pid}, _From,
false ->
case find_queue(Pid, Mixed) of
{mixed, {OAlloc, _OActivity}} ->
- {Module, Function, Args} = dict:fetch(Pid, Callbacks),
- ok = erlang:apply(Module, Function, Args ++ [disk]),
- {ok,
- State #state { mixed_queues = dict:erase(Pid, Mixed),
- available_tokens = Avail + OAlloc,
- disk_mode_pins =
- sets:add_element(Pid, Pins)
- }};
+ Mixed1 = send_to_disk(Callbacks, Mixed, Pid),
+ {ok, State #state { mixed_queues = Mixed1,
+ available_tokens = Avail + OAlloc,
+ disk_mode_pins =
+ sets:add_element(Pid, Pins)
+ }};
disk ->
- {ok,
- State #state { disk_mode_pins =
- sets:add_element(Pid, Pins) }}
+ {ok, State #state { disk_mode_pins =
+ sets:add_element(Pid, Pins) }}
end
end,
{reply, Res, State1};
-handle_call({unpin_to_disk, Pid}, _From,
+handle_call({unpin_from_disk, Pid}, _From,
State = #state { disk_mode_pins = Pins }) ->
{reply, ok, State #state { disk_mode_pins = sets:del_element(Pid, Pins) }};
@@ -207,13 +219,15 @@ handle_call(info, _From, State) ->
mixed_queues = Mixed,
lowrate = Lazy,
hibernate = Sleepy,
- disk_mode_pins = Pins } =
+ disk_mode_pins = Pins,
+ unevictable = Unevictable } =
free_upto(undef, 1 + ?TOTAL_TOKENS, State), %% this'll just do tidying
{reply, [{ available_tokens, Avail },
{ mixed_queues, dict:to_list(Mixed) },
{ lowrate_queues, priority_queue:to_list(Lazy) },
{ hibernated_queues, queue:to_list(Sleepy) },
- { queues_pinned_to_disk, sets:to_list(Pins) }], State1}.
+ { queues_pinned_to_disk, sets:to_list(Pins) },
+ { unevictable_queues, sets:to_list(Unevictable) }], State1}.
handle_cast({report_memory, Pid, Memory, BytesGained, BytesLost, Hibernating},
@@ -229,31 +243,28 @@ handle_cast({report_memory, Pid, Memory, BytesGained, BytesLost, Hibernating},
{G, L} -> G < ?ACTIVITY_THRESHOLD andalso
L < ?ACTIVITY_THRESHOLD
end,
+ MixedActivity = if Hibernating -> hibernate;
+ LowRate -> lowrate;
+ true -> active
+ end,
{StateN = #state { lowrate = Lazy, hibernate = Sleepy }, ActivityNew} =
case find_queue(Pid, Mixed) of
{mixed, {OAlloc, _OActivity}} ->
Avail1 = Avail + OAlloc,
- State1 = #state { available_tokens = Avail2,
- mixed_queues = Mixed1 } =
- free_upto(Pid, Req,
- State #state { available_tokens = Avail1 }),
+ State1 =
+ #state { available_tokens = Avail2, mixed_queues = Mixed1 }
+ = free_upto(Pid, Req,
+ State #state { available_tokens = Avail1 }),
case Req > Avail2 of
true -> %% nowt we can do, send to disk
- {Module, Function, Args} = dict:fetch(Pid, Callbacks),
- ok = erlang:apply(Module, Function, Args ++ [disk]),
- {State1 #state { mixed_queues =
- dict:erase(Pid, Mixed1) },
- disk};
+ Mixed2 = send_to_disk(Callbacks, Mixed1, Pid),
+ {State1 #state { mixed_queues = Mixed2 }, disk};
false -> %% keep mixed
- Activity = if Hibernating -> hibernate;
- LowRate -> lowrate;
- true -> active
- end,
{State1 #state
{ mixed_queues =
- dict:store(Pid, {Req, Activity}, Mixed1),
+ dict:store(Pid, {Req, MixedActivity}, Mixed1),
available_tokens = Avail2 - Req },
- Activity}
+ MixedActivity}
end;
disk ->
case sets:is_element(Pid, Pins) of
@@ -273,15 +284,11 @@ handle_cast({report_memory, Pid, Memory, BytesGained, BytesLost, Hibernating},
dict:fetch(Pid, Callbacks),
ok = erlang:apply(Module, Function,
Args ++ [mixed]),
- Activity = if Hibernating -> hibernate;
- LowRate -> lowrate;
- true -> active
- end,
{State1 #state {
mixed_queues =
- dict:store(Pid, {Req, Activity}, Mixed1),
+ dict:store(Pid, {Req, MixedActivity}, Mixed1),
available_tokens = Avail1 - Req },
- disk}
+ MixedActivity}
end
end
end,
@@ -296,11 +303,18 @@ handle_cast({report_memory, Pid, Memory, BytesGained, BytesLost, Hibernating},
end,
{noreply, StateN1};
-handle_cast({register, Pid, Module, Function, Args},
- State = #state { callbacks = Callbacks }) ->
+handle_cast({register, Pid, IsUnevictable, Module, Function, Args},
+ State = #state { callbacks = Callbacks,
+ unevictable = Unevictable }) ->
_MRef = erlang:monitor(process, Pid),
+ Unevictable1 = case IsUnevictable of
+ true -> sets:add_element(Pid, Unevictable);
+ false -> Unevictable
+ end,
{noreply, State #state { callbacks = dict:store
- (Pid, {Module, Function, Args}, Callbacks) }}.
+ (Pid, {Module, Function, Args}, Callbacks),
+ unevictable = Unevictable1
+ }}.
handle_info({'DOWN', _MRef, process, Pid, _Reason},
State = #state { available_tokens = Avail,
@@ -325,7 +339,7 @@ code_change(_OldVsn, State, _Extra) ->
{ok, State}.
add_to_lowrate(Pid, Alloc, Lazy) ->
- Bucket = trunc(math:log(Alloc)),
+ Bucket = trunc(math:log(Alloc)), %% log base e
priority_queue:in({Pid, Bucket, Alloc}, Bucket, Lazy).
find_queue(Pid, Mixed) ->
@@ -334,97 +348,95 @@ find_queue(Pid, Mixed) ->
error -> disk
end.
-tidy_and_sum_lazy(IgnorePid, Lazy, Mixed) ->
- tidy_and_sum(IgnorePid, Mixed,
+send_to_disk(Callbacks, Mixed, Pid) ->
+ {Module, Function, Args} = dict:fetch(Pid, Callbacks),
+ ok = erlang:apply(Module, Function, Args ++ [disk]),
+ dict:erase(Pid, Mixed).
+
+tidy_and_sum_lazy(IgnorePids, Lazy, Mixed) ->
+ tidy_and_sum(lowrate, Mixed,
fun (Lazy1) ->
case priority_queue:out(Lazy1) of
- {empty, Lazy1} ->
- {empty, Lazy1};
+ {empty, Lazy2} ->
+ {empty, Lazy2};
{{value, {Pid, _Bucket, _Alloc}}, Lazy2} ->
{{value, Pid}, Lazy2}
end
- end, fun add_to_lowrate/3, Lazy, priority_queue:new(),
- lowrate).
+ end, fun add_to_lowrate/3, IgnorePids, Lazy,
+ priority_queue:new(), 0).
-tidy_and_sum_sleepy(IgnorePid, Sleepy, Mixed) ->
- tidy_and_sum(IgnorePid, Mixed, fun queue:out/1,
- fun (Pid, _Alloc, Queue) ->
- queue:in(Pid, Queue)
- end, Sleepy, queue:new(), hibernate).
-
-tidy_and_sum(IgnorePid, Mixed, Catamorphism, Anamorphism, CataInit, AnaInit,
- AtomExpected) ->
- tidy_and_sum(sets:add_element(IgnorePid, sets:new()),
- Mixed, Catamorphism, Anamorphism, CataInit, AnaInit, 0,
- AtomExpected).
-
-tidy_and_sum(DupCheckSet, Mixed, Catamorphism, Anamorphism, CataInit, AnaInit,
- AllocAcc, AtomExpected) ->
+tidy_and_sum_sleepy(IgnorePids, Sleepy, Mixed) ->
+ tidy_and_sum(hibernate, Mixed, fun queue:out/1,
+ fun (Pid, _Alloc, Queue) -> queue:in(Pid, Queue) end,
+ IgnorePids, Sleepy, queue:new(), 0).
+
+tidy_and_sum(AtomExpected, Mixed, Catamorphism, Anamorphism, DupCheckSet,
+ CataInit, AnaInit, AllocAcc) ->
case Catamorphism(CataInit) of
- {empty, CataInit} -> {AnaInit, AllocAcc};
+ {empty, _CataInit} -> {AnaInit, AllocAcc};
{{value, Pid}, CataInit1} ->
- {DupCheckSet2, AnaInit2, AllocAcc2} =
+ {DupCheckSet1, AnaInit1, AllocAcc1} =
case sets:is_element(Pid, DupCheckSet) of
true ->
{DupCheckSet, AnaInit, AllocAcc};
false ->
- {AnaInit1, AllocAcc1} =
- case find_queue(Pid, Mixed) of
- {mixed, {Alloc, AtomExpected}} ->
- {Anamorphism(Pid, Alloc, AnaInit),
- Alloc + AllocAcc};
- _ ->
- {AnaInit, AllocAcc}
- end,
- {sets:add_element(Pid, DupCheckSet), AnaInit1,
- AllocAcc1}
+ case find_queue(Pid, Mixed) of
+ {mixed, {Alloc, AtomExpected}} ->
+ {sets:add_element(Pid, DupCheckSet),
+ Anamorphism(Pid, Alloc, AnaInit),
+ Alloc + AllocAcc};
+ _ ->
+ {DupCheckSet, AnaInit, AllocAcc}
+ end
end,
- tidy_and_sum(DupCheckSet2, Mixed, Catamorphism, Anamorphism,
- CataInit1, AnaInit2, AllocAcc2, AtomExpected)
+ tidy_and_sum(AtomExpected, Mixed, Catamorphism, Anamorphism,
+ DupCheckSet1, CataInit1, AnaInit1, AllocAcc1)
end.
-free_upto_lazy(IgnorePid, Callbacks, Lazy, Mixed, Req) ->
- free_from(Callbacks, Mixed,
- fun(Lazy1, LazyAcc) ->
- case priority_queue:out(Lazy1) of
- {empty, Lazy1} ->
- empty;
- {{value, {IgnorePid, Bucket, Alloc}}, Lazy2} ->
- {skip, Lazy2,
- priority_queue:in({IgnorePid, Bucket, Alloc},
- Bucket, LazyAcc)};
- {{value, {Pid, _Bucket, Alloc}}, Lazy3} ->
- {value, Lazy3, Pid, Alloc}
+free_upto_lazy(IgnorePids, Callbacks, Lazy, Mixed, Req) ->
+ free_from(
+ Callbacks,
+ fun(_Mixed, Lazy1, LazyAcc) ->
+ case priority_queue:out(Lazy1) of
+ {empty, _Lazy2} ->
+ empty;
+ {{value, V = {Pid, Bucket, Alloc}}, Lazy2} ->
+ case sets:is_element(Pid, IgnorePids) of
+ true -> {skip, Lazy2,
+ priority_queue:in(V, Bucket, LazyAcc)};
+ false -> {value, Lazy2, Pid, Alloc}
end
- end, fun priority_queue:join/2, Lazy, priority_queue:new(), Req).
+ end
+ end, fun priority_queue:join/2, Mixed, Lazy, priority_queue:new(), Req).
-free_upto_sleepy(IgnorePid, Callbacks, Sleepy, Mixed, Req) ->
- free_from(Callbacks, Mixed,
- fun(Sleepy1, SleepyAcc) ->
+free_upto_sleepy(IgnorePids, Callbacks, Sleepy, Mixed, Req) ->
+ free_from(Callbacks,
+ fun(Mixed1, Sleepy1, SleepyAcc) ->
case queue:out(Sleepy1) of
- {empty, Sleepy1} ->
+ {empty, _Sleepy2} ->
empty;
- {{value, IgnorePid}, Sleepy2} ->
- {skip, Sleepy2, queue:in(IgnorePid, SleepyAcc)};
- {{value, Pid}, Sleepy3} ->
- {Alloc, hibernate} = dict:fetch(Pid, Mixed),
- {value, Sleepy3, Pid, Alloc}
+ {{value, Pid}, Sleepy2} ->
+ case sets:is_element(Pid, IgnorePids) of
+ true -> {skip, Sleepy2,
+ queue:in(Pid, SleepyAcc)};
+ false -> {Alloc, hibernate} =
+ dict:fetch(Pid, Mixed1),
+ {value, Sleepy2, Pid, Alloc}
+ end
end
- end, fun queue:join/2, Sleepy, queue:new(), Req).
+ end, fun queue:join/2, Mixed, Sleepy, queue:new(), Req).
-free_from(Callbacks, Mixed, Hylomorphism, BaseCase, CataInit, AnaInit, Req) ->
- case Hylomorphism(CataInit, AnaInit) of
+free_from(Callbacks, Hylomorphism, BaseCase, Mixed, CataInit, AnaInit, Req) ->
+ case Hylomorphism(Mixed, CataInit, AnaInit) of
empty ->
- {BaseCase(CataInit, AnaInit), Mixed, Req};
+ {AnaInit, Mixed, Req};
{skip, CataInit1, AnaInit1} ->
- free_from(Callbacks, Mixed, Hylomorphism, BaseCase, CataInit1,
+ free_from(Callbacks, Hylomorphism, BaseCase, Mixed, CataInit1,
AnaInit1, Req);
{value, CataInit1, Pid, Alloc} ->
- {Module, Function, Args} = dict:fetch(Pid, Callbacks),
- ok = erlang:apply(Module, Function, Args ++ [disk]),
- Mixed1 = dict:erase(Pid, Mixed),
+ Mixed1 = send_to_disk(Callbacks, Mixed, Pid),
case Req > Alloc of
- true -> free_from(Callbacks, Mixed1, Hylomorphism, BaseCase,
+ true -> free_from(Callbacks, Hylomorphism, BaseCase, Mixed1,
CataInit1, AnaInit, Req - Alloc);
false -> {BaseCase(CataInit1, AnaInit), Mixed1, Req - Alloc}
end
@@ -434,36 +446,36 @@ free_upto(Pid, Req, State = #state { available_tokens = Avail,
mixed_queues = Mixed,
callbacks = Callbacks,
lowrate = Lazy,
- hibernate = Sleepy }) ->
- case Req > Avail of
- true ->
- {Sleepy1, SleepySum} = tidy_and_sum_sleepy(Pid, Sleepy, Mixed),
- case Req > Avail + SleepySum of
- true -> %% not enough in sleepy, have a look in lazy too
- {Lazy1, LazySum} = tidy_and_sum_lazy(Pid, Lazy, Mixed),
- case Req > Avail + SleepySum + LazySum of
- true -> %% can't free enough, just return tidied state
- State #state { lowrate = Lazy1,
- hibernate = Sleepy1 };
- false -> %% need to free all of sleepy, and some of lazy
- {Sleepy2, Mixed1, ReqRem} =
- free_upto_sleepy
- (Pid, Callbacks, Sleepy1, Mixed, Req),
- {Lazy2, Mixed2, ReqRem1} =
- free_upto_lazy(Pid, Callbacks, Lazy1, Mixed1,
- ReqRem),
- State #state { available_tokens =
- Avail + (Req - ReqRem1),
- mixed_queues = Mixed2,
- lowrate = Lazy2,
- hibernate = Sleepy2 }
- end;
- false -> %% enough available in sleepy, don't touch lazy
+ hibernate = Sleepy,
+ unevictable = Unevictable })
+ when Req > Avail ->
+ Unevictable1 = sets:add_element(Pid, Unevictable),
+ {Sleepy1, SleepySum} = tidy_and_sum_sleepy(Unevictable1, Sleepy, Mixed),
+ case Req > Avail + SleepySum of
+ true -> %% not enough in sleepy, have a look in lazy too
+ {Lazy1, LazySum} = tidy_and_sum_lazy(Unevictable1, Lazy, Mixed),
+ case Req > Avail + SleepySum + LazySum of
+ true -> %% can't free enough, just return tidied state
+ State #state { lowrate = Lazy1, hibernate = Sleepy1 };
+ false -> %% need to free all of sleepy, and some of lazy
{Sleepy2, Mixed1, ReqRem} =
- free_upto_sleepy(Pid, Callbacks, Sleepy1, Mixed, Req),
- State #state { available_tokens = Avail + (Req - ReqRem),
- mixed_queues = Mixed1,
+ free_upto_sleepy(Unevictable1, Callbacks,
+ Sleepy1, Mixed, Req),
+ {Lazy2, Mixed2, ReqRem1} =
+ free_upto_lazy(Unevictable1, Callbacks,
+ Lazy1, Mixed1, ReqRem),
+ %% ReqRem1 will be <= 0 because it's
+ %% likely we'll have freed more than we
+ %% need, thus Req - ReqRem1 is total freed
+ State #state { available_tokens = Avail + (Req - ReqRem1),
+ mixed_queues = Mixed2, lowrate = Lazy2,
hibernate = Sleepy2 }
end;
- false -> State
- end.
+ false -> %% enough available in sleepy, don't touch lazy
+ {Sleepy2, Mixed1, ReqRem} =
+ free_upto_sleepy(Unevictable1, Callbacks, Sleepy1, Mixed, Req),
+ State #state { available_tokens = Avail + (Req - ReqRem),
+ mixed_queues = Mixed1, hibernate = Sleepy2 }
+ end;
+free_upto(_Pid, _Req, State) ->
+ State.