diff options
| -rw-r--r-- | src/rabbit_amqqueue.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_disk_queue.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_queue_mode_manager.erl | 326 |
4 files changed, 172 insertions, 160 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 9b77949dea..f0e8d4c2f4 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -232,7 +232,7 @@ set_mode_pin(VHostPath, Queue, DiskBin) fun(Q) -> case Disk of true -> rabbit_queue_mode_manager:pin_to_disk (Q #amqqueue.pid); - false -> rabbit_queue_mode_manager:unpin_to_disk + false -> rabbit_queue_mode_manager:unpin_from_disk (Q #amqqueue.pid) end end). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index dba7ec2406..8ee576f7c0 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -102,7 +102,7 @@ start_link(Q) -> init(Q = #amqqueue { name = QName, durable = Durable }) -> ?LOGDEBUG("Queue starting - ~p~n", [Q]), ok = rabbit_queue_mode_manager:register - (self(), rabbit_amqqueue, set_mode, [self()]), + (self(), false, rabbit_amqqueue, set_mode, [self()]), {ok, MS} = rabbit_mixed_queue:init(QName, Durable), State = #q{q = Q, owner = none, diff --git a/src/rabbit_disk_queue.erl b/src/rabbit_disk_queue.erl index 2f8310581c..76399022c5 100644 --- a/src/rabbit_disk_queue.erl +++ b/src/rabbit_disk_queue.erl @@ -379,7 +379,7 @@ init([FileSizeLimit, ReadFileHandlesLimit]) -> %% Otherwise, the gen_server will be immediately terminated. process_flag(trap_exit, true), ok = rabbit_queue_mode_manager:register - (self(), rabbit_disk_queue, set_mode, []), + (self(), true, rabbit_disk_queue, set_mode, []), Node = node(), ok = case mnesia:change_table_copy_type(rabbit_disk_queue, Node, diff --git a/src/rabbit_queue_mode_manager.erl b/src/rabbit_queue_mode_manager.erl index 194ddf9570..37afdc6c82 100644 --- a/src/rabbit_queue_mode_manager.erl +++ b/src/rabbit_queue_mode_manager.erl @@ -38,8 +38,8 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --export([register/4, report_memory/3, report_memory/5, info/0, - pin_to_disk/1, unpin_to_disk/1]). +-export([register/5, report_memory/3, report_memory/5, info/0, + pin_to_disk/1, unpin_from_disk/1]). -define(TOTAL_TOKENS, 10000000). -define(ACTIVITY_THRESHOLD, 25). @@ -50,14 +50,14 @@ -spec(start_link/0 :: () -> ({'ok', pid()} | 'ignore' | {'error', any()})). --spec(register/4 :: (pid(), atom(), atom(), list()) -> 'ok'). +-spec(register/5 :: (pid(), boolean(), atom(), atom(), list()) -> 'ok'). -spec(report_memory/3 :: (pid(), non_neg_integer(), bool()) -> 'ok'). -spec(report_memory/5 :: (pid(), non_neg_integer(), (non_neg_integer() | 'undefined'), (non_neg_integer() | 'undefined'), bool()) -> 'ok'). -spec(pin_to_disk/1 :: (pid()) -> 'ok'). --spec(unpin_to_disk/1 :: (pid()) -> 'ok'). +-spec(unpin_from_disk/1 :: (pid()) -> 'ok'). -endif. @@ -67,7 +67,8 @@ tokens_per_byte, lowrate, hibernate, - disk_mode_pins + disk_mode_pins, + unevictable }). %% Token-credit based memory management @@ -91,24 +92,36 @@ %% queue, and so is implicitly sorted by the order in which processes %% were added to the queue. This means that when removing from the %% queue, we hibernate the sleepiest pid first. The lowrate group is a -%% priority queue, where the priority is the amount of memory -%% allocated. Thus when we remove from the queue, we first remove the -%% queue with the most amount of memory. +%% priority queue, where the priority is the truncated log (base e) of +%% the amount of memory allocated. Thus when we remove from the queue, +%% we first remove the queue from the highest bucket. %% %% If the request still can't be satisfied after evicting to disk %% everyone from those two groups (and note that we check first %% whether or not freeing them would make available enough tokens to %% satisfy the request rather than just sending all those queues to -%% disk and then going "whoops, didn't help afterall"), then we send -%% the requesting process to disk. +%% disk and then going "whoops, didn't help after all"), then we send +%% the requesting process to disk. When a queue registers, it can +%% declare itself "unevictable". If a queue is unevictable then it +%% will not be sent to disk as a result of other processes requesting +%% more memory. However, if it itself is requesting more memory and +%% that request can't be satisfied then it is still sent to disk as +%% before. This feature is only used by the disk_queue, because if the +%% disk queue is not being used, and hibernates, and then memory +%% pressure gets tight, the disk_queue would typically be one of the +%% first processes to get sent to disk, which cripples +%% performance. Thus by setting it unevictable, it is only possible +%% for the disk_queue to be sent to disk when it is active and +%% attempting to increase its memory allocation. %% %% If a process has been sent to disk, it continues making %% requests. As soon as a request can be satisfied (and this can %% include sending other processes to disk in the way described -%% above), it will be told to come back into mixed mode. +%% above), it will be told to come back into mixed mode. We do not +%% keep any information about queues in disk mode. %% %% Note that the lowrate and hibernate groups can get very out of -%% date. This is fine, and kinda unavoidable given the absence of +%% date. This is fine, and somewhat unavoidable given the absence of %% useful APIs for queues. Thus we allow them to get out of date %% (processes will be left in there when they change groups, %% duplicates can appear, dead processes are not pruned etc etc etc), @@ -116,17 +129,17 @@ %% memory, we tidy up at that point. %% %% A process which is not evicted to disk, and is requesting a smaller -%% amount of ram than its last request will always be satisfied. A +%% amount of RAM than its last request will always be satisfied. A %% mixed-mode process that is busy but consuming an unchanging amount %% of RAM will never be sent to disk. The disk_queue is also managed %% in the same way. This means that a queue that has gone back to %% being mixed after being in disk mode now has its messages counted %% twice as they are counted both in the request made by the queue -%% (even though they may not yet be in RAM) and also by the -%% disk_queue. This means that the threshold for going mixed -> disk -%% is above the threshold for going disk -> mixed. This is actually -%% fairly sensible as it reduces the risk of any oscillations -%% occurring. +%% (even though they may not yet be in RAM (though see the +%% prefetcher)) and also by the disk_queue. Thus the amount of +%% available RAM must be higher when going disk -> mixed than when +%% going mixed -> disk. This is fairly sensible as it reduces the risk +%% of any oscillations occurring. %% %% The queue process deliberately reports 4 times its estimated RAM %% usage, and the disk_queue 2.5 times. In practise, this seems to @@ -138,14 +151,15 @@ start_link() -> gen_server2:start_link({local, ?SERVER}, ?MODULE, [], []). -register(Pid, Module, Function, Args) -> - gen_server2:cast(?SERVER, {register, Pid, Module, Function, Args}). +register(Pid, Unevictable, Module, Function, Args) -> + gen_server2:cast(?SERVER, {register, Pid, Unevictable, + Module, Function, Args}). pin_to_disk(Pid) -> gen_server2:call(?SERVER, {pin_to_disk, Pid}). -unpin_to_disk(Pid) -> - gen_server2:call(?SERVER, {unpin_to_disk, Pid}). +unpin_from_disk(Pid) -> + gen_server2:call(?SERVER, {unpin_from_disk, Pid}). report_memory(Pid, Memory, Hibernating) -> report_memory(Pid, Memory, undefined, undefined, Hibernating). @@ -168,7 +182,8 @@ init([]) -> tokens_per_byte = ?TOTAL_TOKENS / MemAvail, lowrate = priority_queue:new(), hibernate = queue:new(), - disk_mode_pins = sets:new() + disk_mode_pins = sets:new(), + unevictable = sets:new() }}. handle_call({pin_to_disk, Pid}, _From, @@ -182,23 +197,20 @@ handle_call({pin_to_disk, Pid}, _From, false -> case find_queue(Pid, Mixed) of {mixed, {OAlloc, _OActivity}} -> - {Module, Function, Args} = dict:fetch(Pid, Callbacks), - ok = erlang:apply(Module, Function, Args ++ [disk]), - {ok, - State #state { mixed_queues = dict:erase(Pid, Mixed), - available_tokens = Avail + OAlloc, - disk_mode_pins = - sets:add_element(Pid, Pins) - }}; + Mixed1 = send_to_disk(Callbacks, Mixed, Pid), + {ok, State #state { mixed_queues = Mixed1, + available_tokens = Avail + OAlloc, + disk_mode_pins = + sets:add_element(Pid, Pins) + }}; disk -> - {ok, - State #state { disk_mode_pins = - sets:add_element(Pid, Pins) }} + {ok, State #state { disk_mode_pins = + sets:add_element(Pid, Pins) }} end end, {reply, Res, State1}; -handle_call({unpin_to_disk, Pid}, _From, +handle_call({unpin_from_disk, Pid}, _From, State = #state { disk_mode_pins = Pins }) -> {reply, ok, State #state { disk_mode_pins = sets:del_element(Pid, Pins) }}; @@ -207,13 +219,15 @@ handle_call(info, _From, State) -> mixed_queues = Mixed, lowrate = Lazy, hibernate = Sleepy, - disk_mode_pins = Pins } = + disk_mode_pins = Pins, + unevictable = Unevictable } = free_upto(undef, 1 + ?TOTAL_TOKENS, State), %% this'll just do tidying {reply, [{ available_tokens, Avail }, { mixed_queues, dict:to_list(Mixed) }, { lowrate_queues, priority_queue:to_list(Lazy) }, { hibernated_queues, queue:to_list(Sleepy) }, - { queues_pinned_to_disk, sets:to_list(Pins) }], State1}. + { queues_pinned_to_disk, sets:to_list(Pins) }, + { unevictable_queues, sets:to_list(Unevictable) }], State1}. handle_cast({report_memory, Pid, Memory, BytesGained, BytesLost, Hibernating}, @@ -229,31 +243,28 @@ handle_cast({report_memory, Pid, Memory, BytesGained, BytesLost, Hibernating}, {G, L} -> G < ?ACTIVITY_THRESHOLD andalso L < ?ACTIVITY_THRESHOLD end, + MixedActivity = if Hibernating -> hibernate; + LowRate -> lowrate; + true -> active + end, {StateN = #state { lowrate = Lazy, hibernate = Sleepy }, ActivityNew} = case find_queue(Pid, Mixed) of {mixed, {OAlloc, _OActivity}} -> Avail1 = Avail + OAlloc, - State1 = #state { available_tokens = Avail2, - mixed_queues = Mixed1 } = - free_upto(Pid, Req, - State #state { available_tokens = Avail1 }), + State1 = + #state { available_tokens = Avail2, mixed_queues = Mixed1 } + = free_upto(Pid, Req, + State #state { available_tokens = Avail1 }), case Req > Avail2 of true -> %% nowt we can do, send to disk - {Module, Function, Args} = dict:fetch(Pid, Callbacks), - ok = erlang:apply(Module, Function, Args ++ [disk]), - {State1 #state { mixed_queues = - dict:erase(Pid, Mixed1) }, - disk}; + Mixed2 = send_to_disk(Callbacks, Mixed1, Pid), + {State1 #state { mixed_queues = Mixed2 }, disk}; false -> %% keep mixed - Activity = if Hibernating -> hibernate; - LowRate -> lowrate; - true -> active - end, {State1 #state { mixed_queues = - dict:store(Pid, {Req, Activity}, Mixed1), + dict:store(Pid, {Req, MixedActivity}, Mixed1), available_tokens = Avail2 - Req }, - Activity} + MixedActivity} end; disk -> case sets:is_element(Pid, Pins) of @@ -273,15 +284,11 @@ handle_cast({report_memory, Pid, Memory, BytesGained, BytesLost, Hibernating}, dict:fetch(Pid, Callbacks), ok = erlang:apply(Module, Function, Args ++ [mixed]), - Activity = if Hibernating -> hibernate; - LowRate -> lowrate; - true -> active - end, {State1 #state { mixed_queues = - dict:store(Pid, {Req, Activity}, Mixed1), + dict:store(Pid, {Req, MixedActivity}, Mixed1), available_tokens = Avail1 - Req }, - disk} + MixedActivity} end end end, @@ -296,11 +303,18 @@ handle_cast({report_memory, Pid, Memory, BytesGained, BytesLost, Hibernating}, end, {noreply, StateN1}; -handle_cast({register, Pid, Module, Function, Args}, - State = #state { callbacks = Callbacks }) -> +handle_cast({register, Pid, IsUnevictable, Module, Function, Args}, + State = #state { callbacks = Callbacks, + unevictable = Unevictable }) -> _MRef = erlang:monitor(process, Pid), + Unevictable1 = case IsUnevictable of + true -> sets:add_element(Pid, Unevictable); + false -> Unevictable + end, {noreply, State #state { callbacks = dict:store - (Pid, {Module, Function, Args}, Callbacks) }}. + (Pid, {Module, Function, Args}, Callbacks), + unevictable = Unevictable1 + }}. handle_info({'DOWN', _MRef, process, Pid, _Reason}, State = #state { available_tokens = Avail, @@ -325,7 +339,7 @@ code_change(_OldVsn, State, _Extra) -> {ok, State}. add_to_lowrate(Pid, Alloc, Lazy) -> - Bucket = trunc(math:log(Alloc)), + Bucket = trunc(math:log(Alloc)), %% log base e priority_queue:in({Pid, Bucket, Alloc}, Bucket, Lazy). find_queue(Pid, Mixed) -> @@ -334,97 +348,95 @@ find_queue(Pid, Mixed) -> error -> disk end. -tidy_and_sum_lazy(IgnorePid, Lazy, Mixed) -> - tidy_and_sum(IgnorePid, Mixed, +send_to_disk(Callbacks, Mixed, Pid) -> + {Module, Function, Args} = dict:fetch(Pid, Callbacks), + ok = erlang:apply(Module, Function, Args ++ [disk]), + dict:erase(Pid, Mixed). + +tidy_and_sum_lazy(IgnorePids, Lazy, Mixed) -> + tidy_and_sum(lowrate, Mixed, fun (Lazy1) -> case priority_queue:out(Lazy1) of - {empty, Lazy1} -> - {empty, Lazy1}; + {empty, Lazy2} -> + {empty, Lazy2}; {{value, {Pid, _Bucket, _Alloc}}, Lazy2} -> {{value, Pid}, Lazy2} end - end, fun add_to_lowrate/3, Lazy, priority_queue:new(), - lowrate). + end, fun add_to_lowrate/3, IgnorePids, Lazy, + priority_queue:new(), 0). -tidy_and_sum_sleepy(IgnorePid, Sleepy, Mixed) -> - tidy_and_sum(IgnorePid, Mixed, fun queue:out/1, - fun (Pid, _Alloc, Queue) -> - queue:in(Pid, Queue) - end, Sleepy, queue:new(), hibernate). - -tidy_and_sum(IgnorePid, Mixed, Catamorphism, Anamorphism, CataInit, AnaInit, - AtomExpected) -> - tidy_and_sum(sets:add_element(IgnorePid, sets:new()), - Mixed, Catamorphism, Anamorphism, CataInit, AnaInit, 0, - AtomExpected). - -tidy_and_sum(DupCheckSet, Mixed, Catamorphism, Anamorphism, CataInit, AnaInit, - AllocAcc, AtomExpected) -> +tidy_and_sum_sleepy(IgnorePids, Sleepy, Mixed) -> + tidy_and_sum(hibernate, Mixed, fun queue:out/1, + fun (Pid, _Alloc, Queue) -> queue:in(Pid, Queue) end, + IgnorePids, Sleepy, queue:new(), 0). + +tidy_and_sum(AtomExpected, Mixed, Catamorphism, Anamorphism, DupCheckSet, + CataInit, AnaInit, AllocAcc) -> case Catamorphism(CataInit) of - {empty, CataInit} -> {AnaInit, AllocAcc}; + {empty, _CataInit} -> {AnaInit, AllocAcc}; {{value, Pid}, CataInit1} -> - {DupCheckSet2, AnaInit2, AllocAcc2} = + {DupCheckSet1, AnaInit1, AllocAcc1} = case sets:is_element(Pid, DupCheckSet) of true -> {DupCheckSet, AnaInit, AllocAcc}; false -> - {AnaInit1, AllocAcc1} = - case find_queue(Pid, Mixed) of - {mixed, {Alloc, AtomExpected}} -> - {Anamorphism(Pid, Alloc, AnaInit), - Alloc + AllocAcc}; - _ -> - {AnaInit, AllocAcc} - end, - {sets:add_element(Pid, DupCheckSet), AnaInit1, - AllocAcc1} + case find_queue(Pid, Mixed) of + {mixed, {Alloc, AtomExpected}} -> + {sets:add_element(Pid, DupCheckSet), + Anamorphism(Pid, Alloc, AnaInit), + Alloc + AllocAcc}; + _ -> + {DupCheckSet, AnaInit, AllocAcc} + end end, - tidy_and_sum(DupCheckSet2, Mixed, Catamorphism, Anamorphism, - CataInit1, AnaInit2, AllocAcc2, AtomExpected) + tidy_and_sum(AtomExpected, Mixed, Catamorphism, Anamorphism, + DupCheckSet1, CataInit1, AnaInit1, AllocAcc1) end. -free_upto_lazy(IgnorePid, Callbacks, Lazy, Mixed, Req) -> - free_from(Callbacks, Mixed, - fun(Lazy1, LazyAcc) -> - case priority_queue:out(Lazy1) of - {empty, Lazy1} -> - empty; - {{value, {IgnorePid, Bucket, Alloc}}, Lazy2} -> - {skip, Lazy2, - priority_queue:in({IgnorePid, Bucket, Alloc}, - Bucket, LazyAcc)}; - {{value, {Pid, _Bucket, Alloc}}, Lazy3} -> - {value, Lazy3, Pid, Alloc} +free_upto_lazy(IgnorePids, Callbacks, Lazy, Mixed, Req) -> + free_from( + Callbacks, + fun(_Mixed, Lazy1, LazyAcc) -> + case priority_queue:out(Lazy1) of + {empty, _Lazy2} -> + empty; + {{value, V = {Pid, Bucket, Alloc}}, Lazy2} -> + case sets:is_element(Pid, IgnorePids) of + true -> {skip, Lazy2, + priority_queue:in(V, Bucket, LazyAcc)}; + false -> {value, Lazy2, Pid, Alloc} end - end, fun priority_queue:join/2, Lazy, priority_queue:new(), Req). + end + end, fun priority_queue:join/2, Mixed, Lazy, priority_queue:new(), Req). -free_upto_sleepy(IgnorePid, Callbacks, Sleepy, Mixed, Req) -> - free_from(Callbacks, Mixed, - fun(Sleepy1, SleepyAcc) -> +free_upto_sleepy(IgnorePids, Callbacks, Sleepy, Mixed, Req) -> + free_from(Callbacks, + fun(Mixed1, Sleepy1, SleepyAcc) -> case queue:out(Sleepy1) of - {empty, Sleepy1} -> + {empty, _Sleepy2} -> empty; - {{value, IgnorePid}, Sleepy2} -> - {skip, Sleepy2, queue:in(IgnorePid, SleepyAcc)}; - {{value, Pid}, Sleepy3} -> - {Alloc, hibernate} = dict:fetch(Pid, Mixed), - {value, Sleepy3, Pid, Alloc} + {{value, Pid}, Sleepy2} -> + case sets:is_element(Pid, IgnorePids) of + true -> {skip, Sleepy2, + queue:in(Pid, SleepyAcc)}; + false -> {Alloc, hibernate} = + dict:fetch(Pid, Mixed1), + {value, Sleepy2, Pid, Alloc} + end end - end, fun queue:join/2, Sleepy, queue:new(), Req). + end, fun queue:join/2, Mixed, Sleepy, queue:new(), Req). -free_from(Callbacks, Mixed, Hylomorphism, BaseCase, CataInit, AnaInit, Req) -> - case Hylomorphism(CataInit, AnaInit) of +free_from(Callbacks, Hylomorphism, BaseCase, Mixed, CataInit, AnaInit, Req) -> + case Hylomorphism(Mixed, CataInit, AnaInit) of empty -> - {BaseCase(CataInit, AnaInit), Mixed, Req}; + {AnaInit, Mixed, Req}; {skip, CataInit1, AnaInit1} -> - free_from(Callbacks, Mixed, Hylomorphism, BaseCase, CataInit1, + free_from(Callbacks, Hylomorphism, BaseCase, Mixed, CataInit1, AnaInit1, Req); {value, CataInit1, Pid, Alloc} -> - {Module, Function, Args} = dict:fetch(Pid, Callbacks), - ok = erlang:apply(Module, Function, Args ++ [disk]), - Mixed1 = dict:erase(Pid, Mixed), + Mixed1 = send_to_disk(Callbacks, Mixed, Pid), case Req > Alloc of - true -> free_from(Callbacks, Mixed1, Hylomorphism, BaseCase, + true -> free_from(Callbacks, Hylomorphism, BaseCase, Mixed1, CataInit1, AnaInit, Req - Alloc); false -> {BaseCase(CataInit1, AnaInit), Mixed1, Req - Alloc} end @@ -434,36 +446,36 @@ free_upto(Pid, Req, State = #state { available_tokens = Avail, mixed_queues = Mixed, callbacks = Callbacks, lowrate = Lazy, - hibernate = Sleepy }) -> - case Req > Avail of - true -> - {Sleepy1, SleepySum} = tidy_and_sum_sleepy(Pid, Sleepy, Mixed), - case Req > Avail + SleepySum of - true -> %% not enough in sleepy, have a look in lazy too - {Lazy1, LazySum} = tidy_and_sum_lazy(Pid, Lazy, Mixed), - case Req > Avail + SleepySum + LazySum of - true -> %% can't free enough, just return tidied state - State #state { lowrate = Lazy1, - hibernate = Sleepy1 }; - false -> %% need to free all of sleepy, and some of lazy - {Sleepy2, Mixed1, ReqRem} = - free_upto_sleepy - (Pid, Callbacks, Sleepy1, Mixed, Req), - {Lazy2, Mixed2, ReqRem1} = - free_upto_lazy(Pid, Callbacks, Lazy1, Mixed1, - ReqRem), - State #state { available_tokens = - Avail + (Req - ReqRem1), - mixed_queues = Mixed2, - lowrate = Lazy2, - hibernate = Sleepy2 } - end; - false -> %% enough available in sleepy, don't touch lazy + hibernate = Sleepy, + unevictable = Unevictable }) + when Req > Avail -> + Unevictable1 = sets:add_element(Pid, Unevictable), + {Sleepy1, SleepySum} = tidy_and_sum_sleepy(Unevictable1, Sleepy, Mixed), + case Req > Avail + SleepySum of + true -> %% not enough in sleepy, have a look in lazy too + {Lazy1, LazySum} = tidy_and_sum_lazy(Unevictable1, Lazy, Mixed), + case Req > Avail + SleepySum + LazySum of + true -> %% can't free enough, just return tidied state + State #state { lowrate = Lazy1, hibernate = Sleepy1 }; + false -> %% need to free all of sleepy, and some of lazy {Sleepy2, Mixed1, ReqRem} = - free_upto_sleepy(Pid, Callbacks, Sleepy1, Mixed, Req), - State #state { available_tokens = Avail + (Req - ReqRem), - mixed_queues = Mixed1, + free_upto_sleepy(Unevictable1, Callbacks, + Sleepy1, Mixed, Req), + {Lazy2, Mixed2, ReqRem1} = + free_upto_lazy(Unevictable1, Callbacks, + Lazy1, Mixed1, ReqRem), + %% ReqRem1 will be <= 0 because it's + %% likely we'll have freed more than we + %% need, thus Req - ReqRem1 is total freed + State #state { available_tokens = Avail + (Req - ReqRem1), + mixed_queues = Mixed2, lowrate = Lazy2, hibernate = Sleepy2 } end; - false -> State - end. + false -> %% enough available in sleepy, don't touch lazy + {Sleepy2, Mixed1, ReqRem} = + free_upto_sleepy(Unevictable1, Callbacks, Sleepy1, Mixed, Req), + State #state { available_tokens = Avail + (Req - ReqRem), + mixed_queues = Mixed1, hibernate = Sleepy2 } + end; +free_upto(_Pid, _Req, State) -> + State. |
