summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-11-11 17:56:38 +0000
committerMatthew Sackman <matthew@lshift.net>2009-11-11 17:56:38 +0000
commit52d6b72b14a50577dc7bceb5c846ed525c294e03 (patch)
tree4d9e4978291d001220796078712111e05b3b2690
parent560c3ba48999ef174386ca91c2381f6af94f6f6e (diff)
downloadrabbitmq-server-git-52d6b72b14a50577dc7bceb5c846ed525c294e03.tar.gz
Wired it all together. It does seem to work, but there seems to be an off by one bug somewhere in vq, wrt γ length counting. Which pops up sometimes... Joy.
-rw-r--r--src/rabbit_amqqueue.erl9
-rw-r--r--src/rabbit_amqqueue_process.erl28
-rw-r--r--src/rabbit_memory_manager.erl404
-rw-r--r--src/rabbit_memory_monitor.erl18
-rw-r--r--src/rabbit_variable_queue.erl33
5 files changed, 63 insertions, 429 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 82a0f5b4fa..f7b39c7782 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -33,7 +33,8 @@
-export([start/0, recover/1, find_durable_queues/0, declare/4, delete/3,
purge/1]).
--export([internal_declare/2, internal_delete/1, remeasure_egress_rate/1]).
+-export([internal_declare/2, internal_delete/1, remeasure_egress_rate/1,
+ set_queue_duration/2]).
-export([pseudo_queue/2]).
-export([lookup/1, with/2, with_or_die/2,
stat/1, stat_all/0, deliver/2, redeliver/2, requeue/3, ack/4]).
@@ -114,6 +115,7 @@
-spec(internal_declare/2 :: (amqqueue(), boolean()) -> amqqueue()).
-spec(internal_delete/1 :: (queue_name()) -> 'ok' | not_found()).
-spec(remeasure_egress_rate/1 :: (pid()) -> 'ok').
+-spec(set_queue_duration/2 :: (pid(), number()) -> 'ok').
-spec(on_node_down/1 :: (erlang_node()) -> 'ok').
-spec(pseudo_queue/2 :: (binary(), pid()) -> amqqueue()).
@@ -373,7 +375,10 @@ internal_delete(QueueName) ->
end).
remeasure_egress_rate(QPid) ->
- gen_server2:pcast(QPid, 8, remeasure_egress_rate).
+ gen_server2:pcast(QPid, 9, remeasure_egress_rate).
+
+set_queue_duration(QPid, Duration) ->
+ gen_server2:pcast(QPid, 9, {set_queue_duration, Duration}).
prune_queue_childspecs() ->
lists:foreach(
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index cd70979a1f..f247c0d1d4 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -103,8 +103,8 @@ start_link(Q) ->
init(Q = #amqqueue { name = QName }) ->
?LOGDEBUG("Queue starting - ~p~n", [Q]),
process_flag(trap_exit, true),
- ok = rabbit_memory_manager:register
- (self(), false, rabbit_amqqueue, set_storage_mode, [self()]),
+ ok = rabbit_memory_monitor:register
+ (self(), {rabbit_amqqueue, set_queue_duration, [self()]}),
VQS = rabbit_variable_queue:init(QName),
State = #q{q = Q,
owner = none,
@@ -872,9 +872,23 @@ handle_cast({limit, ChPid, LimiterPid}, State) ->
end));
handle_cast(remeasure_egress_rate, State = #q{variable_queue_state = VQS}) ->
+ VQS1 = rabbit_variable_queue:remeasure_egress_rate(VQS),
+ RamDuration = rabbit_variable_queue:ram_duration(VQS1),
+ DesiredDuration =
+ rabbit_memory_monitor:report_queue_duration(self(), RamDuration),
+ VQS2 = rabbit_variable_queue:set_queue_ram_duration_target(
+ DesiredDuration, VQS1),
+ io:format("~p Reported ~p and got back ~p~n", [self(), RamDuration, DesiredDuration]),
+ io:format("~p~n", [rabbit_variable_queue:status(VQS2)]),
noreply(State#q{egress_rate_timer_ref = just_measured,
- variable_queue_state =
- rabbit_variable_queue:remeasure_egress_rate(VQS)}).
+ variable_queue_state = VQS2});
+
+handle_cast({set_queue_duration, Duration},
+ State = #q{variable_queue_state = VQS}) ->
+ VQS1 = rabbit_variable_queue:set_queue_ram_duration_target(
+ Duration, VQS),
+ io:format("~p was told to make duration ~p~n", [self(), Duration]),
+ noreply(State#q{variable_queue_state = VQS1}).
handle_info({'DOWN', MonitorRef, process, DownPid, _Reason},
State = #q{owner = {DownPid, MonitorRef}}) ->
@@ -894,6 +908,12 @@ handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) ->
{ok, NewState} -> noreply(NewState);
{stop, NewState} -> {stop, normal, NewState}
end;
+handle_info({'EXIT', _DownPid, normal}, State) ->
+ %% because we have trap_exit on, we'll pick up here the prefetcher
+ %% going down. We probably need to make sure that we really are
+ %% just picking up the prefetcher here. It's safe to ignore it
+ %% though, provided 'normal'
+ noreply(State);
handle_info(timeout, State = #q{variable_queue_state = VQS,
sync_timer_ref = undefined}) ->
diff --git a/src/rabbit_memory_manager.erl b/src/rabbit_memory_manager.erl
deleted file mode 100644
index a73f03e27d..0000000000
--- a/src/rabbit_memory_manager.erl
+++ /dev/null
@@ -1,404 +0,0 @@
-%% The contents of this file are subject to the Mozilla Public License
-%% Version 1.1 (the "License"); you may not use this file except in
-%% compliance with the License. You may obtain a copy of the License at
-%% http://www.mozilla.org/MPL/
-%%
-%% Software distributed under the License is distributed on an "AS IS"
-%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
-%% License for the specific language governing rights and limitations
-%% under the License.
-%%
-%% The Original Code is RabbitMQ.
-%%
-%% The Initial Developers of the Original Code are LShift Ltd,
-%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd.
-%%
-%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd,
-%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd
-%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial
-%% Technologies LLC, and Rabbit Technologies Ltd.
-%%
-%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift
-%% Ltd. Portions created by Cohesive Financial Technologies LLC are
-%% Copyright (C) 2007-2009 Cohesive Financial Technologies
-%% LLC. Portions created by Rabbit Technologies Ltd are Copyright
-%% (C) 2007-2009 Rabbit Technologies Ltd.
-%%
-%% All Rights Reserved.
-%%
-%% Contributor(s): ______________________________________.
-%%
-
--module(rabbit_memory_manager).
-
--behaviour(gen_server2).
-
--export([start_link/0]).
-
--export([init/1, handle_call/3, handle_cast/2, handle_info/2,
- terminate/2, code_change/3]).
-
--export([register/5, report_memory/3, info/0, conserve_memory/2]).
-
--define(TOTAL_TOKENS, 10000000).
--define(THRESHOLD_MULTIPLIER, 0.05).
--define(THRESHOLD_OFFSET, ?TOTAL_TOKENS * ?THRESHOLD_MULTIPLIER).
-
--define(SERVER, ?MODULE).
-
-%%----------------------------------------------------------------------------
-
--ifdef(use_specs).
-
--spec(start_link/0 :: () ->
- ({'ok', pid()} | 'ignore' | {'error', any()})).
--spec(register/5 :: (pid(), boolean(), atom(), atom(), list()) -> 'ok').
--spec(report_memory/3 :: (pid(), non_neg_integer(), boolean()) -> 'ok').
--spec(info/0 :: () -> [{atom(), any()}]).
--spec(conserve_memory/2 :: (pid(), boolean()) -> 'ok').
-
--endif.
-
-%%----------------------------------------------------------------------------
-
--record(state, { available_tokens,
- processes,
- callbacks,
- tokens_per_byte,
- hibernate,
- unoppressable,
- alarmed
- }).
-
-%% Token-credit based memory management
-
-%% Start off by working out the amount of memory available in the
-%% system (RAM). Then, work out how many tokens each byte corresponds
-%% to. This is the tokens_per_byte field. When a process registers, it
-%% must provide an M-F-A triple to a function that needs one further
-%% argument, which is the new mode. This will either be 'liberated' or
-%% 'oppressed'.
-%%
-%% Processes then report their own memory usage, in bytes, and the
-%% manager takes care of the rest.
-%%
-%% There are a finite number of tokens in the system. These are
-%% allocated to processes as the processes report their memory
-%% usage. We keep track of processes which have hibernated. When a
-%% process reports memory use which can't be satisfied by the
-%% available tokens, we try and oppress processes first from the
-%% hibernated group. The hibernated group is a simple queue, and so is
-%% implicitly sorted by the order in which processes were added to the
-%% queue. This means that when removing from the queue, we evict the
-%% sleepiest (and most passive) pid first.
-%%
-%% If the reported memory use still can't be satisfied after
-%% oppressing everyone from those two groups (and note that we check
-%% first whether or not oppressing them would make available enough
-%% tokens to satisfy the reported use rather than just oppressing all
-%% those processes and then going "whoops, didn't help after all"),
-%% then we oppress the reporting process. When a process registers, it
-%% can declare itself "unoppressable". If a process is unoppressable
-%% then it will not be oppressed as a result of other processes
-%% needing more tokens. However, if it itself needs additional tokens
-%% which aren't available then it is still oppressed as before. This
-%% feature is only used by the disk_queue, because if the disk queue
-%% is not being used, and hibernates, and then memory pressure gets
-%% tight, the disk_queue would typically be one of the first processes
-%% to be oppressed (sent to disk_only mode), which cripples
-%% performance. Thus by setting it unoppressable, it is only possible
-%% for the disk_queue to be oppressed when it is active and attempting
-%% to increase its memory allocation.
-%%
-%% If a process has been oppressed, it continues making memory
-%% reports, as if it was liberated. As soon as a reported amount of
-%% memory can be satisfied (and this can include oppressing other
-%% processes in the way described above), *and* the number of
-%% available tokens has changed by ?THRESHOLD_MULTIPLIER since the
-%% processes was oppressed, it will be liberated. This later condition
-%% prevents processes from continually oppressing each other if they
-%% themselves can be liberated by oppressing other processes.
-%%
-%% Note that the hibernate group can get very out of date. This is
-%% fine, and somewhat unavoidable given the absence of useful APIs for
-%% queues. Thus we allow them to get out of date (processes will be
-%% left in there when they change groups, duplicates can appear, dead
-%% processes are not pruned etc etc etc), and when we go through the
-%% groups, summing up their allocated tokens, we tidy up at that
-%% point.
-%%
-%% A liberated process, which is reporting a smaller amount of RAM
-%% than its last report will remain liberated. A liberated process
-%% that is busy but consuming an unchanging amount of RAM will never
-%% be oppressed.
-
-%% Specific notes as applied to queues and the disk_queue:
-%%
-%% The disk_queue is managed in the same way as queues. This means
-%% that a queue that has gone back to mixed mode after being in disk
-%% mode now has its messages counted twice as they are counted both in
-%% the report made by the queue (even though they may not yet be in
-%% RAM (though see the prefetcher)) and also by the disk_queue. Thus
-%% the amount of available RAM must be higher when going disk -> mixed
-%% than when going mixed -> disk. This is fairly sensible as it
-%% reduces the risk of any oscillations occurring.
-%%
-%% The queue process deliberately reports 4 times its estimated RAM
-%% usage, and the disk_queue 2.5 times. In practise, this seems to
-%% work well. Note that we are deliberately running out of tokens a
-%% little early because of the fact that the mixed -> disk transition
-%% can transiently eat a lot of memory and take some time (flushing a
-%% few million messages to disk is never going to be instantaneous).
-
-start_link() ->
- gen_server2:start_link({local, ?SERVER}, ?MODULE, [], []).
-
-register(Pid, Unoppressable, Module, Function, Args) ->
- gen_server2:cast(?SERVER, {register, Pid, Unoppressable,
- Module, Function, Args}).
-
-report_memory(Pid, Memory, Hibernating) ->
- gen_server2:cast(?SERVER, {report_memory, Pid, Memory, Hibernating}).
-
-info() ->
- gen_server2:call(?SERVER, info).
-
-conserve_memory(_Pid, Conserve) ->
- gen_server2:pcast(?SERVER, 9, {conserve_memory, Conserve}).
-
-%%----------------------------------------------------------------------------
-
-init([]) ->
- process_flag(trap_exit, true),
- rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}),
- {MemTotal, MemUsed, _BigProc} = memsup:get_memory_data(),
- MemAvail = MemTotal - MemUsed,
- TPB = if MemAvail == 0 -> 0;
- true -> ?TOTAL_TOKENS / MemAvail
- end,
- {ok, #state { available_tokens = ?TOTAL_TOKENS,
- processes = dict:new(),
- callbacks = dict:new(),
- tokens_per_byte = TPB,
- hibernate = queue:new(),
- unoppressable = sets:new(),
- alarmed = false
- }}.
-
-handle_call(info, _From, State) ->
- State1 = #state { available_tokens = Avail,
- processes = Procs,
- hibernate = Sleepy,
- unoppressable = Unoppressable } =
- free_upto(undefined, 1 + ?TOTAL_TOKENS, State), %% just tidy
- {reply, [{ available_tokens, Avail },
- { processes, dict:to_list(Procs) },
- { hibernated_processes, queue:to_list(Sleepy) },
- { unoppressable_processes, sets:to_list(Unoppressable) }], State1}.
-
-handle_cast({report_memory, Pid, Memory, Hibernating},
- State = #state { processes = Procs,
- available_tokens = Avail,
- callbacks = Callbacks,
- tokens_per_byte = TPB,
- alarmed = Alarmed }) ->
- Req = rabbit_misc:ceil(TPB * Memory),
- LibreActivity = if Hibernating -> hibernate;
- true -> active
- end,
- {StateN = #state { hibernate = Sleepy }, ActivityNew} =
- case find_process(Pid, Procs) of
- {libre, OAlloc, _OActivity} ->
- Avail1 = Avail + OAlloc,
- State1 = #state { available_tokens = Avail2,
- processes = Procs1 }
- = free_upto(Pid, Req,
- State #state { available_tokens = Avail1 }),
- case Req > Avail2 of
- true -> %% nowt we can do, oppress the process
- Procs2 =
- set_process_mode(Procs1, Callbacks, Pid, oppressed,
- {oppressed, Avail2}),
- {State1 #state { processes = Procs2 }, oppressed};
- false -> %% keep liberated
- {State1 #state
- { processes =
- dict:store(Pid, {libre, Req, LibreActivity}, Procs1),
- available_tokens = Avail2 - Req },
- LibreActivity}
- end;
- {oppressed, OrigAvail} ->
- case Req > 0 andalso
- ( Alarmed orelse Hibernating orelse
- (Avail > (OrigAvail - ?THRESHOLD_OFFSET) andalso
- Avail < (OrigAvail + ?THRESHOLD_OFFSET)) ) of
- true ->
- {State, oppressed};
- false ->
- State1 = #state { available_tokens = Avail1,
- processes = Procs1 } =
- free_upto(Pid, Req, State),
- case Req > Avail1 of
- true ->
- %% not enough space, so stay oppressed
- {State1, oppressed};
- false -> %% can liberate the process
- Procs2 = set_process_mode(
- Procs1, Callbacks, Pid, liberated,
- {libre, Req, LibreActivity}),
- {State1 #state {
- processes = Procs2,
- available_tokens = Avail1 - Req },
- LibreActivity}
- end
- end
- end,
- StateN1 =
- case ActivityNew of
- active -> StateN;
- oppressed -> StateN;
- hibernate ->
- StateN #state { hibernate = queue:in(Pid, Sleepy) }
- end,
- {noreply, StateN1};
-
-handle_cast({register, Pid, IsUnoppressable, Module, Function, Args},
- State = #state { callbacks = Callbacks,
- unoppressable = Unoppressable }) ->
- _MRef = erlang:monitor(process, Pid),
- Unoppressable1 = case IsUnoppressable of
- true -> sets:add_element(Pid, Unoppressable);
- false -> Unoppressable
- end,
- {noreply, State #state { callbacks = dict:store
- (Pid, {Module, Function, Args}, Callbacks),
- unoppressable = Unoppressable1
- }};
-
-handle_cast({conserve_memory, Conserve}, State) ->
- {noreply, State #state { alarmed = Conserve }}.
-
-handle_info({'DOWN', _MRef, process, Pid, _Reason},
- State = #state { available_tokens = Avail,
- processes = Procs,
- callbacks = Callbacks }) ->
- State1 = State #state { processes = dict:erase(Pid, Procs),
- callbacks = dict:erase(Pid, Callbacks) },
- {noreply, case find_process(Pid, Procs) of
- {oppressed, _OrigReq} ->
- State1;
- {libre, Alloc, _Activity} ->
- State1 #state { available_tokens = Avail + Alloc }
- end};
-handle_info({'EXIT', _Pid, Reason}, State) ->
- {stop, Reason, State};
-handle_info(_Info, State) ->
- {noreply, State}.
-
-terminate(_Reason, State) ->
- State.
-
-code_change(_OldVsn, State, _Extra) ->
- {ok, State}.
-
-%%----------------------------------------------------------------------------
-
-find_process(Pid, Procs) ->
- case dict:find(Pid, Procs) of
- {ok, Value} -> Value;
- error -> {oppressed, 0}
- end.
-
-set_process_mode(Procs, Callbacks, Pid, Mode, Record) ->
- {Module, Function, Args} = dict:fetch(Pid, Callbacks),
- ok = erlang:apply(Module, Function, Args ++ [Mode]),
- dict:store(Pid, Record, Procs).
-
-tidy_and_sum_sleepy(IgnorePids, Sleepy, Procs) ->
- tidy_and_sum(hibernate, Procs, fun queue:out/1,
- fun (Pid, _Alloc, Queue) -> queue:in(Pid, Queue) end,
- IgnorePids, Sleepy, queue:new(), 0).
-
-tidy_and_sum(AtomExpected, Procs, Generator, Consumer, DupCheckSet,
- GenInit, ConInit, AllocAcc) ->
- case Generator(GenInit) of
- {empty, _GetInit} -> {ConInit, AllocAcc};
- {{value, Pid}, GenInit1} ->
- {DupCheckSet1, ConInit1, AllocAcc1} =
- case sets:is_element(Pid, DupCheckSet) of
- true ->
- {DupCheckSet, ConInit, AllocAcc};
- false ->
- case find_process(Pid, Procs) of
- {libre, Alloc, AtomExpected} ->
- {sets:add_element(Pid, DupCheckSet),
- Consumer(Pid, Alloc, ConInit),
- Alloc + AllocAcc};
- _ ->
- {DupCheckSet, ConInit, AllocAcc}
- end
- end,
- tidy_and_sum(AtomExpected, Procs, Generator, Consumer,
- DupCheckSet1, GenInit1, ConInit1, AllocAcc1)
- end.
-
-free_upto_sleepy(IgnorePids, Callbacks, Sleepy, Procs, Req, Avail) ->
- free_from(Callbacks,
- fun(Procs1, Sleepy1, SleepyAcc) ->
- case queue:out(Sleepy1) of
- {empty, _Sleepy2} ->
- empty;
- {{value, Pid}, Sleepy2} ->
- case sets:is_element(Pid, IgnorePids) of
- true -> {skip, Sleepy2,
- queue:in(Pid, SleepyAcc)};
- false -> {libre, Alloc, hibernate} =
- dict:fetch(Pid, Procs1),
- {value, Sleepy2, Pid, Alloc}
- end
- end
- end, fun queue:join/2, Procs, Sleepy, queue:new(), Req, Avail).
-
-free_from(
- Callbacks, Transformer, BaseCase, Procs, DestroyMe, CreateMe, Req, Avail) ->
- case Transformer(Procs, DestroyMe, CreateMe) of
- empty ->
- {CreateMe, Procs, Req};
- {skip, DestroyMe1, CreateMe1} ->
- free_from(Callbacks, Transformer, BaseCase, Procs, DestroyMe1,
- CreateMe1, Req, Avail);
- {value, DestroyMe1, Pid, Alloc} ->
- Procs1 = set_process_mode(
- Procs, Callbacks, Pid, oppressed, {oppressed, Avail}),
- Req1 = Req - Alloc,
- case Req1 > 0 of
- true -> free_from(Callbacks, Transformer, BaseCase, Procs1,
- DestroyMe1, CreateMe, Req1, Avail);
- false -> {BaseCase(DestroyMe1, CreateMe), Procs1, Req1}
- end
- end.
-
-free_upto(Pid, Req, State = #state { available_tokens = Avail,
- processes = Procs,
- callbacks = Callbacks,
- hibernate = Sleepy,
- unoppressable = Unoppressable })
- when Req > Avail ->
- Unoppressable1 = sets:add_element(Pid, Unoppressable),
- {Sleepy1, SleepySum} = tidy_and_sum_sleepy(Unoppressable1, Sleepy, Procs),
- case Req > Avail + SleepySum of
- true -> %% not enough in sleepy, just return tidied state
- State #state { hibernate = Sleepy1 };
- false ->
- %% ReqRem will be <= 0 because it's likely we'll have
- %% freed more than we need, thus Req - ReqRem is total
- %% freed
- {Sleepy2, Procs1, ReqRem} =
- free_upto_sleepy(Unoppressable1, Callbacks,
- Sleepy1, Procs, Req, Avail),
- State #state { available_tokens = Avail + (Req - ReqRem),
- processes = Procs1,
- hibernate = Sleepy2 }
- end;
-free_upto(_Pid, _Req, State) ->
- State.
diff --git a/src/rabbit_memory_monitor.erl b/src/rabbit_memory_monitor.erl
index cf184f3f43..649aec4943 100644
--- a/src/rabbit_memory_monitor.erl
+++ b/src/rabbit_memory_monitor.erl
@@ -104,7 +104,7 @@
-spec(start_link/0 :: () -> 'ignore' | {'error', _} | {'ok', pid()}).
-spec(update/0 :: () -> 'ok').
-spec(register/2 :: (pid(), {atom(),atom(),[any()]}) -> 'ok').
--spec(report_queue_duration/2 :: (pid(), float() | 'infinity') -> 'ok').
+-spec(report_queue_duration/2 :: (pid(), float() | 'infinity') -> number()).
-endif.
@@ -119,11 +119,11 @@ update() ->
gen_server2:cast(?SERVER, update).
register(Pid, MFA = {_M, _F, _A}) ->
- gen_server2:cast(?SERVER, {register, Pid, MFA}).
+ gen_server2:call(?SERVER, {register, Pid, MFA}, infinity).
report_queue_duration(Pid, QueueDuration) ->
gen_server2:call(rabbit_memory_monitor,
- {report_queue_duration, Pid, QueueDuration}).
+ {report_queue_duration, Pid, QueueDuration}, infinity).
%%----------------------------------------------------------------------------
@@ -171,18 +171,18 @@ handle_call({report_queue_duration, Pid, QueueDuration}, From,
{noreply, State#state{queue_duration_sum = Sum1,
queue_duration_count = Count1}};
+handle_call({register, Pid, MFA}, _From, State =
+ #state{queue_durations = Durations, callbacks = Callbacks}) ->
+ _MRef = erlang:monitor(process, Pid),
+ true = ets:insert(Durations, {Pid, infinity, infinity}),
+ {reply, ok, State#state{callbacks = dict:store(Pid, MFA, Callbacks)}};
+
handle_call(_Request, _From, State) ->
{noreply, State}.
handle_cast(update, State) ->
{noreply, internal_update(State)};
-handle_cast({register, Pid, MFA}, State = #state{queue_durations = Durations,
- callbacks = Callbacks}) ->
- _MRef = erlang:monitor(process, Pid),
- true = ets:insert(Durations, {Pid, infinity, infinity}),
- {noreply, State#state{callbacks = dict:store(Pid, MFA, Callbacks)}};
-
handle_cast(_Request, State) ->
{noreply, State}.
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 15caf81bb3..79fd24b8d3 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -32,11 +32,12 @@
-module(rabbit_variable_queue).
-export([init/1, terminate/1, publish/2, publish_delivered/2,
- set_queue_ram_duration_target/2, remeasure_egress_rate/1, fetch/1,
- ack/2, len/1, is_empty/1, maybe_start_prefetcher/1, purge/1, delete/1,
- requeue/2, tx_publish/2, tx_rollback/2, tx_commit/4,
- tx_commit_from_msg_store/4, tx_commit_from_vq/1, needs_sync/1,
- can_flush_journal/1, flush_journal/1, status/1]).
+ set_queue_ram_duration_target/2, remeasure_egress_rate/1,
+ ram_duration/1, fetch/1, ack/2, len/1, is_empty/1,
+ maybe_start_prefetcher/1, purge/1, delete/1, requeue/2, tx_publish/2,
+ tx_rollback/2, tx_commit/4, tx_commit_from_msg_store/4,
+ tx_commit_from_vq/1, needs_sync/1, can_flush_journal/1,
+ flush_journal/1, status/1]).
%%----------------------------------------------------------------------------
@@ -130,6 +131,7 @@
-spec(set_queue_ram_duration_target/2 ::
(('undefined' | number()), vqstate()) -> vqstate()).
-spec(remeasure_egress_rate/1 :: (vqstate()) -> vqstate()).
+-spec(ram_duration/1 :: (vqstate()) -> number()).
-spec(fetch/1 :: (vqstate()) ->
{('empty'|{basic_message(), boolean(), ack(), non_neg_integer()}),
vqstate()}).
@@ -209,20 +211,23 @@ publish_delivered(Msg = #basic_message { guid = MsgId,
{ack_not_on_disk, State}
end.
-set_queue_ram_duration_target(undefined, State) ->
- State;
set_queue_ram_duration_target(
DurationTarget, State = #vqstate { avg_egress_rate = EgressRate,
target_ram_msg_count = TargetRamMsgCount
}) ->
- TargetRamMsgCount1 = trunc(DurationTarget * EgressRate), %% msgs = sec * msgs/sec
+ TargetRamMsgCount1 =
+ case DurationTarget of
+ infinity -> undefined;
+ undefined -> undefined;
+ _ -> trunc(DurationTarget * EgressRate) %% msgs = sec * msgs/sec
+ end,
State1 = State #vqstate { target_ram_msg_count = TargetRamMsgCount1,
duration_target = DurationTarget },
if TargetRamMsgCount == TargetRamMsgCount1 ->
State1;
- TargetRamMsgCount == undefined orelse
+ TargetRamMsgCount1 == undefined orelse
TargetRamMsgCount < TargetRamMsgCount1 ->
- maybe_start_prefetcher(State1);
+ State1;
true ->
reduce_memory_use(State1)
end.
@@ -246,6 +251,14 @@ remeasure_egress_rate(State = #vqstate { egress_rate = OldEgressRate,
egress_rate_timestamp = Now,
out_counter = 0 }).
+ram_duration(#vqstate { avg_egress_rate = AvgEgressRate,
+ ram_msg_count = RamMsgCount }) ->
+ %% msgs / (msgs/sec) == sec
+ case AvgEgressRate == 0 of
+ true -> infinity;
+ false -> RamMsgCount / AvgEgressRate
+ end.
+
fetch(State =
#vqstate { q4 = Q4, ram_msg_count = RamMsgCount,
out_counter = OutCount, prefetcher = Prefetcher,