diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-11-11 17:56:38 +0000 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-11-11 17:56:38 +0000 |
| commit | 52d6b72b14a50577dc7bceb5c846ed525c294e03 (patch) | |
| tree | 4d9e4978291d001220796078712111e05b3b2690 | |
| parent | 560c3ba48999ef174386ca91c2381f6af94f6f6e (diff) | |
| download | rabbitmq-server-git-52d6b72b14a50577dc7bceb5c846ed525c294e03.tar.gz | |
Wired it all together. It does seem to work, but there seems to be an off by one bug somewhere in vq, wrt γ length counting. Which pops up sometimes... Joy.
| -rw-r--r-- | src/rabbit_amqqueue.erl | 9 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 28 | ||||
| -rw-r--r-- | src/rabbit_memory_manager.erl | 404 | ||||
| -rw-r--r-- | src/rabbit_memory_monitor.erl | 18 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 33 |
5 files changed, 63 insertions, 429 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 82a0f5b4fa..f7b39c7782 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -33,7 +33,8 @@ -export([start/0, recover/1, find_durable_queues/0, declare/4, delete/3, purge/1]). --export([internal_declare/2, internal_delete/1, remeasure_egress_rate/1]). +-export([internal_declare/2, internal_delete/1, remeasure_egress_rate/1, + set_queue_duration/2]). -export([pseudo_queue/2]). -export([lookup/1, with/2, with_or_die/2, stat/1, stat_all/0, deliver/2, redeliver/2, requeue/3, ack/4]). @@ -114,6 +115,7 @@ -spec(internal_declare/2 :: (amqqueue(), boolean()) -> amqqueue()). -spec(internal_delete/1 :: (queue_name()) -> 'ok' | not_found()). -spec(remeasure_egress_rate/1 :: (pid()) -> 'ok'). +-spec(set_queue_duration/2 :: (pid(), number()) -> 'ok'). -spec(on_node_down/1 :: (erlang_node()) -> 'ok'). -spec(pseudo_queue/2 :: (binary(), pid()) -> amqqueue()). @@ -373,7 +375,10 @@ internal_delete(QueueName) -> end). remeasure_egress_rate(QPid) -> - gen_server2:pcast(QPid, 8, remeasure_egress_rate). + gen_server2:pcast(QPid, 9, remeasure_egress_rate). + +set_queue_duration(QPid, Duration) -> + gen_server2:pcast(QPid, 9, {set_queue_duration, Duration}). prune_queue_childspecs() -> lists:foreach( diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index cd70979a1f..f247c0d1d4 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -103,8 +103,8 @@ start_link(Q) -> init(Q = #amqqueue { name = QName }) -> ?LOGDEBUG("Queue starting - ~p~n", [Q]), process_flag(trap_exit, true), - ok = rabbit_memory_manager:register - (self(), false, rabbit_amqqueue, set_storage_mode, [self()]), + ok = rabbit_memory_monitor:register + (self(), {rabbit_amqqueue, set_queue_duration, [self()]}), VQS = rabbit_variable_queue:init(QName), State = #q{q = Q, owner = none, @@ -872,9 +872,23 @@ handle_cast({limit, ChPid, LimiterPid}, State) -> end)); handle_cast(remeasure_egress_rate, State = #q{variable_queue_state = VQS}) -> + VQS1 = rabbit_variable_queue:remeasure_egress_rate(VQS), + RamDuration = rabbit_variable_queue:ram_duration(VQS1), + DesiredDuration = + rabbit_memory_monitor:report_queue_duration(self(), RamDuration), + VQS2 = rabbit_variable_queue:set_queue_ram_duration_target( + DesiredDuration, VQS1), + io:format("~p Reported ~p and got back ~p~n", [self(), RamDuration, DesiredDuration]), + io:format("~p~n", [rabbit_variable_queue:status(VQS2)]), noreply(State#q{egress_rate_timer_ref = just_measured, - variable_queue_state = - rabbit_variable_queue:remeasure_egress_rate(VQS)}). + variable_queue_state = VQS2}); + +handle_cast({set_queue_duration, Duration}, + State = #q{variable_queue_state = VQS}) -> + VQS1 = rabbit_variable_queue:set_queue_ram_duration_target( + Duration, VQS), + io:format("~p was told to make duration ~p~n", [self(), Duration]), + noreply(State#q{variable_queue_state = VQS1}). handle_info({'DOWN', MonitorRef, process, DownPid, _Reason}, State = #q{owner = {DownPid, MonitorRef}}) -> @@ -894,6 +908,12 @@ handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) -> {ok, NewState} -> noreply(NewState); {stop, NewState} -> {stop, normal, NewState} end; +handle_info({'EXIT', _DownPid, normal}, State) -> + %% because we have trap_exit on, we'll pick up here the prefetcher + %% going down. We probably need to make sure that we really are + %% just picking up the prefetcher here. It's safe to ignore it + %% though, provided 'normal' + noreply(State); handle_info(timeout, State = #q{variable_queue_state = VQS, sync_timer_ref = undefined}) -> diff --git a/src/rabbit_memory_manager.erl b/src/rabbit_memory_manager.erl deleted file mode 100644 index a73f03e27d..0000000000 --- a/src/rabbit_memory_manager.erl +++ /dev/null @@ -1,404 +0,0 @@ -%% The contents of this file are subject to the Mozilla Public License -%% Version 1.1 (the "License"); you may not use this file except in -%% compliance with the License. You may obtain a copy of the License at -%% http://www.mozilla.org/MPL/ -%% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the -%% License for the specific language governing rights and limitations -%% under the License. -%% -%% The Original Code is RabbitMQ. -%% -%% The Initial Developers of the Original Code are LShift Ltd, -%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. -%% -%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, -%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd -%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial -%% Technologies LLC, and Rabbit Technologies Ltd. -%% -%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift -%% Ltd. Portions created by Cohesive Financial Technologies LLC are -%% Copyright (C) 2007-2009 Cohesive Financial Technologies -%% LLC. Portions created by Rabbit Technologies Ltd are Copyright -%% (C) 2007-2009 Rabbit Technologies Ltd. -%% -%% All Rights Reserved. -%% -%% Contributor(s): ______________________________________. -%% - --module(rabbit_memory_manager). - --behaviour(gen_server2). - --export([start_link/0]). - --export([init/1, handle_call/3, handle_cast/2, handle_info/2, - terminate/2, code_change/3]). - --export([register/5, report_memory/3, info/0, conserve_memory/2]). - --define(TOTAL_TOKENS, 10000000). --define(THRESHOLD_MULTIPLIER, 0.05). --define(THRESHOLD_OFFSET, ?TOTAL_TOKENS * ?THRESHOLD_MULTIPLIER). - --define(SERVER, ?MODULE). - -%%---------------------------------------------------------------------------- - --ifdef(use_specs). - --spec(start_link/0 :: () -> - ({'ok', pid()} | 'ignore' | {'error', any()})). --spec(register/5 :: (pid(), boolean(), atom(), atom(), list()) -> 'ok'). --spec(report_memory/3 :: (pid(), non_neg_integer(), boolean()) -> 'ok'). --spec(info/0 :: () -> [{atom(), any()}]). --spec(conserve_memory/2 :: (pid(), boolean()) -> 'ok'). - --endif. - -%%---------------------------------------------------------------------------- - --record(state, { available_tokens, - processes, - callbacks, - tokens_per_byte, - hibernate, - unoppressable, - alarmed - }). - -%% Token-credit based memory management - -%% Start off by working out the amount of memory available in the -%% system (RAM). Then, work out how many tokens each byte corresponds -%% to. This is the tokens_per_byte field. When a process registers, it -%% must provide an M-F-A triple to a function that needs one further -%% argument, which is the new mode. This will either be 'liberated' or -%% 'oppressed'. -%% -%% Processes then report their own memory usage, in bytes, and the -%% manager takes care of the rest. -%% -%% There are a finite number of tokens in the system. These are -%% allocated to processes as the processes report their memory -%% usage. We keep track of processes which have hibernated. When a -%% process reports memory use which can't be satisfied by the -%% available tokens, we try and oppress processes first from the -%% hibernated group. The hibernated group is a simple queue, and so is -%% implicitly sorted by the order in which processes were added to the -%% queue. This means that when removing from the queue, we evict the -%% sleepiest (and most passive) pid first. -%% -%% If the reported memory use still can't be satisfied after -%% oppressing everyone from those two groups (and note that we check -%% first whether or not oppressing them would make available enough -%% tokens to satisfy the reported use rather than just oppressing all -%% those processes and then going "whoops, didn't help after all"), -%% then we oppress the reporting process. When a process registers, it -%% can declare itself "unoppressable". If a process is unoppressable -%% then it will not be oppressed as a result of other processes -%% needing more tokens. However, if it itself needs additional tokens -%% which aren't available then it is still oppressed as before. This -%% feature is only used by the disk_queue, because if the disk queue -%% is not being used, and hibernates, and then memory pressure gets -%% tight, the disk_queue would typically be one of the first processes -%% to be oppressed (sent to disk_only mode), which cripples -%% performance. Thus by setting it unoppressable, it is only possible -%% for the disk_queue to be oppressed when it is active and attempting -%% to increase its memory allocation. -%% -%% If a process has been oppressed, it continues making memory -%% reports, as if it was liberated. As soon as a reported amount of -%% memory can be satisfied (and this can include oppressing other -%% processes in the way described above), *and* the number of -%% available tokens has changed by ?THRESHOLD_MULTIPLIER since the -%% processes was oppressed, it will be liberated. This later condition -%% prevents processes from continually oppressing each other if they -%% themselves can be liberated by oppressing other processes. -%% -%% Note that the hibernate group can get very out of date. This is -%% fine, and somewhat unavoidable given the absence of useful APIs for -%% queues. Thus we allow them to get out of date (processes will be -%% left in there when they change groups, duplicates can appear, dead -%% processes are not pruned etc etc etc), and when we go through the -%% groups, summing up their allocated tokens, we tidy up at that -%% point. -%% -%% A liberated process, which is reporting a smaller amount of RAM -%% than its last report will remain liberated. A liberated process -%% that is busy but consuming an unchanging amount of RAM will never -%% be oppressed. - -%% Specific notes as applied to queues and the disk_queue: -%% -%% The disk_queue is managed in the same way as queues. This means -%% that a queue that has gone back to mixed mode after being in disk -%% mode now has its messages counted twice as they are counted both in -%% the report made by the queue (even though they may not yet be in -%% RAM (though see the prefetcher)) and also by the disk_queue. Thus -%% the amount of available RAM must be higher when going disk -> mixed -%% than when going mixed -> disk. This is fairly sensible as it -%% reduces the risk of any oscillations occurring. -%% -%% The queue process deliberately reports 4 times its estimated RAM -%% usage, and the disk_queue 2.5 times. In practise, this seems to -%% work well. Note that we are deliberately running out of tokes a -%% little early because of the fact that the mixed -> disk transition -%% can transiently eat a lot of memory and take some time (flushing a -%% few million messages to disk is never going to be instantaneous). - -start_link() -> - gen_server2:start_link({local, ?SERVER}, ?MODULE, [], []). - -register(Pid, Unoppressable, Module, Function, Args) -> - gen_server2:cast(?SERVER, {register, Pid, Unoppressable, - Module, Function, Args}). - -report_memory(Pid, Memory, Hibernating) -> - gen_server2:cast(?SERVER, {report_memory, Pid, Memory, Hibernating}). - -info() -> - gen_server2:call(?SERVER, info). - -conserve_memory(_Pid, Conserve) -> - gen_server2:pcast(?SERVER, 9, {conserve_memory, Conserve}). - -%%---------------------------------------------------------------------------- - -init([]) -> - process_flag(trap_exit, true), - rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}), - {MemTotal, MemUsed, _BigProc} = memsup:get_memory_data(), - MemAvail = MemTotal - MemUsed, - TPB = if MemAvail == 0 -> 0; - true -> ?TOTAL_TOKENS / MemAvail - end, - {ok, #state { available_tokens = ?TOTAL_TOKENS, - processes = dict:new(), - callbacks = dict:new(), - tokens_per_byte = TPB, - hibernate = queue:new(), - unoppressable = sets:new(), - alarmed = false - }}. - -handle_call(info, _From, State) -> - State1 = #state { available_tokens = Avail, - processes = Procs, - hibernate = Sleepy, - unoppressable = Unoppressable } = - free_upto(undefined, 1 + ?TOTAL_TOKENS, State), %% just tidy - {reply, [{ available_tokens, Avail }, - { processes, dict:to_list(Procs) }, - { hibernated_processes, queue:to_list(Sleepy) }, - { unoppressable_processes, sets:to_list(Unoppressable) }], State1}. - -handle_cast({report_memory, Pid, Memory, Hibernating}, - State = #state { processes = Procs, - available_tokens = Avail, - callbacks = Callbacks, - tokens_per_byte = TPB, - alarmed = Alarmed }) -> - Req = rabbit_misc:ceil(TPB * Memory), - LibreActivity = if Hibernating -> hibernate; - true -> active - end, - {StateN = #state { hibernate = Sleepy }, ActivityNew} = - case find_process(Pid, Procs) of - {libre, OAlloc, _OActivity} -> - Avail1 = Avail + OAlloc, - State1 = #state { available_tokens = Avail2, - processes = Procs1 } - = free_upto(Pid, Req, - State #state { available_tokens = Avail1 }), - case Req > Avail2 of - true -> %% nowt we can do, oppress the process - Procs2 = - set_process_mode(Procs1, Callbacks, Pid, oppressed, - {oppressed, Avail2}), - {State1 #state { processes = Procs2 }, oppressed}; - false -> %% keep liberated - {State1 #state - { processes = - dict:store(Pid, {libre, Req, LibreActivity}, Procs1), - available_tokens = Avail2 - Req }, - LibreActivity} - end; - {oppressed, OrigAvail} -> - case Req > 0 andalso - ( Alarmed orelse Hibernating orelse - (Avail > (OrigAvail - ?THRESHOLD_OFFSET) andalso - Avail < (OrigAvail + ?THRESHOLD_OFFSET)) ) of - true -> - {State, oppressed}; - false -> - State1 = #state { available_tokens = Avail1, - processes = Procs1 } = - free_upto(Pid, Req, State), - case Req > Avail1 of - true -> - %% not enough space, so stay oppressed - {State1, oppressed}; - false -> %% can liberate the process - Procs2 = set_process_mode( - Procs1, Callbacks, Pid, liberated, - {libre, Req, LibreActivity}), - {State1 #state { - processes = Procs2, - available_tokens = Avail1 - Req }, - LibreActivity} - end - end - end, - StateN1 = - case ActivityNew of - active -> StateN; - oppressed -> StateN; - hibernate -> - StateN #state { hibernate = queue:in(Pid, Sleepy) } - end, - {noreply, StateN1}; - -handle_cast({register, Pid, IsUnoppressable, Module, Function, Args}, - State = #state { callbacks = Callbacks, - unoppressable = Unoppressable }) -> - _MRef = erlang:monitor(process, Pid), - Unoppressable1 = case IsUnoppressable of - true -> sets:add_element(Pid, Unoppressable); - false -> Unoppressable - end, - {noreply, State #state { callbacks = dict:store - (Pid, {Module, Function, Args}, Callbacks), - unoppressable = Unoppressable1 - }}; - -handle_cast({conserve_memory, Conserve}, State) -> - {noreply, State #state { alarmed = Conserve }}. - -handle_info({'DOWN', _MRef, process, Pid, _Reason}, - State = #state { available_tokens = Avail, - processes = Procs, - callbacks = Callbacks }) -> - State1 = State #state { processes = dict:erase(Pid, Procs), - callbacks = dict:erase(Pid, Callbacks) }, - {noreply, case find_process(Pid, Procs) of - {oppressed, _OrigReq} -> - State1; - {libre, Alloc, _Activity} -> - State1 #state { available_tokens = Avail + Alloc } - end}; -handle_info({'EXIT', _Pid, Reason}, State) -> - {stop, Reason, State}; -handle_info(_Info, State) -> - {noreply, State}. - -terminate(_Reason, State) -> - State. - -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - -%%---------------------------------------------------------------------------- - -find_process(Pid, Procs) -> - case dict:find(Pid, Procs) of - {ok, Value} -> Value; - error -> {oppressed, 0} - end. - -set_process_mode(Procs, Callbacks, Pid, Mode, Record) -> - {Module, Function, Args} = dict:fetch(Pid, Callbacks), - ok = erlang:apply(Module, Function, Args ++ [Mode]), - dict:store(Pid, Record, Procs). - -tidy_and_sum_sleepy(IgnorePids, Sleepy, Procs) -> - tidy_and_sum(hibernate, Procs, fun queue:out/1, - fun (Pid, _Alloc, Queue) -> queue:in(Pid, Queue) end, - IgnorePids, Sleepy, queue:new(), 0). - -tidy_and_sum(AtomExpected, Procs, Generator, Consumer, DupCheckSet, - GenInit, ConInit, AllocAcc) -> - case Generator(GenInit) of - {empty, _GetInit} -> {ConInit, AllocAcc}; - {{value, Pid}, GenInit1} -> - {DupCheckSet1, ConInit1, AllocAcc1} = - case sets:is_element(Pid, DupCheckSet) of - true -> - {DupCheckSet, ConInit, AllocAcc}; - false -> - case find_process(Pid, Procs) of - {libre, Alloc, AtomExpected} -> - {sets:add_element(Pid, DupCheckSet), - Consumer(Pid, Alloc, ConInit), - Alloc + AllocAcc}; - _ -> - {DupCheckSet, ConInit, AllocAcc} - end - end, - tidy_and_sum(AtomExpected, Procs, Generator, Consumer, - DupCheckSet1, GenInit1, ConInit1, AllocAcc1) - end. - -free_upto_sleepy(IgnorePids, Callbacks, Sleepy, Procs, Req, Avail) -> - free_from(Callbacks, - fun(Procs1, Sleepy1, SleepyAcc) -> - case queue:out(Sleepy1) of - {empty, _Sleepy2} -> - empty; - {{value, Pid}, Sleepy2} -> - case sets:is_element(Pid, IgnorePids) of - true -> {skip, Sleepy2, - queue:in(Pid, SleepyAcc)}; - false -> {libre, Alloc, hibernate} = - dict:fetch(Pid, Procs1), - {value, Sleepy2, Pid, Alloc} - end - end - end, fun queue:join/2, Procs, Sleepy, queue:new(), Req, Avail). - -free_from( - Callbacks, Transformer, BaseCase, Procs, DestroyMe, CreateMe, Req, Avail) -> - case Transformer(Procs, DestroyMe, CreateMe) of - empty -> - {CreateMe, Procs, Req}; - {skip, DestroyMe1, CreateMe1} -> - free_from(Callbacks, Transformer, BaseCase, Procs, DestroyMe1, - CreateMe1, Req, Avail); - {value, DestroyMe1, Pid, Alloc} -> - Procs1 = set_process_mode( - Procs, Callbacks, Pid, oppressed, {oppressed, Avail}), - Req1 = Req - Alloc, - case Req1 > 0 of - true -> free_from(Callbacks, Transformer, BaseCase, Procs1, - DestroyMe1, CreateMe, Req1, Avail); - false -> {BaseCase(DestroyMe1, CreateMe), Procs1, Req1} - end - end. - -free_upto(Pid, Req, State = #state { available_tokens = Avail, - processes = Procs, - callbacks = Callbacks, - hibernate = Sleepy, - unoppressable = Unoppressable }) - when Req > Avail -> - Unoppressable1 = sets:add_element(Pid, Unoppressable), - {Sleepy1, SleepySum} = tidy_and_sum_sleepy(Unoppressable1, Sleepy, Procs), - case Req > Avail + SleepySum of - true -> %% not enough in sleepy, just return tidied state - State #state { hibernate = Sleepy1 }; - false -> - %% ReqRem will be <= 0 because it's likely we'll have - %% freed more than we need, thus Req - ReqRem is total - %% freed - {Sleepy2, Procs1, ReqRem} = - free_upto_sleepy(Unoppressable1, Callbacks, - Sleepy1, Procs, Req, Avail), - State #state { available_tokens = Avail + (Req - ReqRem), - processes = Procs1, - hibernate = Sleepy2 } - end; -free_upto(_Pid, _Req, State) -> - State. diff --git a/src/rabbit_memory_monitor.erl b/src/rabbit_memory_monitor.erl index cf184f3f43..649aec4943 100644 --- a/src/rabbit_memory_monitor.erl +++ b/src/rabbit_memory_monitor.erl @@ -104,7 +104,7 @@ -spec(start_link/0 :: () -> 'ignore' | {'error', _} | {'ok', pid()}). -spec(update/0 :: () -> 'ok'). -spec(register/2 :: (pid(), {atom(),atom(),[any()]}) -> 'ok'). --spec(report_queue_duration/2 :: (pid(), float() | 'infinity') -> 'ok'). +-spec(report_queue_duration/2 :: (pid(), float() | 'infinity') -> number()). -endif. @@ -119,11 +119,11 @@ update() -> gen_server2:cast(?SERVER, update). register(Pid, MFA = {_M, _F, _A}) -> - gen_server2:cast(?SERVER, {register, Pid, MFA}). + gen_server2:call(?SERVER, {register, Pid, MFA}, infinity). report_queue_duration(Pid, QueueDuration) -> gen_server2:call(rabbit_memory_monitor, - {report_queue_duration, Pid, QueueDuration}). + {report_queue_duration, Pid, QueueDuration}, infinity). %%---------------------------------------------------------------------------- @@ -171,18 +171,18 @@ handle_call({report_queue_duration, Pid, QueueDuration}, From, {noreply, State#state{queue_duration_sum = Sum1, queue_duration_count = Count1}}; +handle_call({register, Pid, MFA}, _From, State = + #state{queue_durations = Durations, callbacks = Callbacks}) -> + _MRef = erlang:monitor(process, Pid), + true = ets:insert(Durations, {Pid, infinity, infinity}), + {reply, ok, State#state{callbacks = dict:store(Pid, MFA, Callbacks)}}; + handle_call(_Request, _From, State) -> {noreply, State}. handle_cast(update, State) -> {noreply, internal_update(State)}; -handle_cast({register, Pid, MFA}, State = #state{queue_durations = Durations, - callbacks = Callbacks}) -> - _MRef = erlang:monitor(process, Pid), - true = ets:insert(Durations, {Pid, infinity, infinity}), - {noreply, State#state{callbacks = dict:store(Pid, MFA, Callbacks)}}; - handle_cast(_Request, State) -> {noreply, State}. diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 15caf81bb3..79fd24b8d3 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -32,11 +32,12 @@ -module(rabbit_variable_queue). -export([init/1, terminate/1, publish/2, publish_delivered/2, - set_queue_ram_duration_target/2, remeasure_egress_rate/1, fetch/1, - ack/2, len/1, is_empty/1, maybe_start_prefetcher/1, purge/1, delete/1, - requeue/2, tx_publish/2, tx_rollback/2, tx_commit/4, - tx_commit_from_msg_store/4, tx_commit_from_vq/1, needs_sync/1, - can_flush_journal/1, flush_journal/1, status/1]). + set_queue_ram_duration_target/2, remeasure_egress_rate/1, + ram_duration/1, fetch/1, ack/2, len/1, is_empty/1, + maybe_start_prefetcher/1, purge/1, delete/1, requeue/2, tx_publish/2, + tx_rollback/2, tx_commit/4, tx_commit_from_msg_store/4, + tx_commit_from_vq/1, needs_sync/1, can_flush_journal/1, + flush_journal/1, status/1]). %%---------------------------------------------------------------------------- @@ -130,6 +131,7 @@ -spec(set_queue_ram_duration_target/2 :: (('undefined' | number()), vqstate()) -> vqstate()). -spec(remeasure_egress_rate/1 :: (vqstate()) -> vqstate()). +-spec(ram_duration/1 :: (vqstate()) -> number()). -spec(fetch/1 :: (vqstate()) -> {('empty'|{basic_message(), boolean(), ack(), non_neg_integer()}), vqstate()}). @@ -209,20 +211,23 @@ publish_delivered(Msg = #basic_message { guid = MsgId, {ack_not_on_disk, State} end. -set_queue_ram_duration_target(undefined, State) -> - State; set_queue_ram_duration_target( DurationTarget, State = #vqstate { avg_egress_rate = EgressRate, target_ram_msg_count = TargetRamMsgCount }) -> - TargetRamMsgCount1 = trunc(DurationTarget * EgressRate), %% msgs = sec * msgs/sec + TargetRamMsgCount1 = + case DurationTarget of + infinity -> undefined; + undefined -> undefined; + _ -> trunc(DurationTarget * EgressRate) %% msgs = sec * msgs/sec + end, State1 = State #vqstate { target_ram_msg_count = TargetRamMsgCount1, duration_target = DurationTarget }, if TargetRamMsgCount == TargetRamMsgCount1 -> State1; - TargetRamMsgCount == undefined orelse + TargetRamMsgCount1 == undefined orelse TargetRamMsgCount < TargetRamMsgCount1 -> - maybe_start_prefetcher(State1); + State1; true -> reduce_memory_use(State1) end. @@ -246,6 +251,14 @@ remeasure_egress_rate(State = #vqstate { egress_rate = OldEgressRate, egress_rate_timestamp = Now, out_counter = 0 }). +ram_duration(#vqstate { avg_egress_rate = AvgEgressRate, + ram_msg_count = RamMsgCount }) -> + %% msgs / (msgs/sec) == sec + case AvgEgressRate == 0 of + true -> infinity; + false -> RamMsgCount / AvgEgressRate + end. + fetch(State = #vqstate { q4 = Q4, ram_msg_count = RamMsgCount, out_counter = OutCount, prefetcher = Prefetcher, |
