summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLoïc Hoguin <lhoguin@vmware.com>2021-03-18 12:58:27 +0100
committerLoïc Hoguin <lhoguin@vmware.com>2021-03-18 12:58:27 +0100
commitffd255cc53258b6b43ad19012c86cc657511bc39 (patch)
tree6cd57979cc9eeeb1e97c0eab9786e8cc660d09f8
parentfa2830edcd6dd7983b36214a703c8d3ee4754009 (diff)
downloadrabbitmq-server-git-limit-lazy-queues-q3.tar.gz
Limit the number of messages in q3 for lazy queueslimit-lazy-queues-q3
This commit introduces min and max options used when converting messages from deltas (full on-disk) to q3 (index or full in memory). Before this commit there could be up to 16k messages read at once and sitting in memory (the full index segment). This has been causing issues depending on the number of queues and the rate of consumption. With this commit the new default depends on the consumption rate. The formula applied is 1.5*RateOut so we read from disk less than once per second. If there is capacity then we may still read up to 16k messages. If the rate is very low, then we have another "minimum" default value of 100. So we will always read at least 100 (less if there are less than 100 messages in the queue) and at most 16k depending on the consumption rate. The values can be configured so that if there is a large number of queues we can limit the amount of messages in memory to a fixed value (for example, set min to 50 and max to 500). This allows greater control over the memory usage of lazy queues.
-rw-r--r--deps/rabbit/src/rabbit_variable_queue.erl24
1 files changed, 22 insertions, 2 deletions
diff --git a/deps/rabbit/src/rabbit_variable_queue.erl b/deps/rabbit/src/rabbit_variable_queue.erl
index d4bd907e78..bc0e5169aa 100644
--- a/deps/rabbit/src/rabbit_variable_queue.erl
+++ b/deps/rabbit/src/rabbit_variable_queue.erl
@@ -2605,6 +2605,7 @@ maybe_deltas_to_betas(_DelsAndAcksFun,
State;
maybe_deltas_to_betas(DelsAndAcksFun,
State = #vqstate {
+ mode = Mode,
q2 = Q2,
delta = Delta,
q3 = Q3,
@@ -2613,14 +2614,33 @@ maybe_deltas_to_betas(DelsAndAcksFun,
ram_bytes = RamBytes,
disk_read_count = DiskReadCount,
delta_transient_bytes = DeltaTransientBytes,
- transient_threshold = TransientThreshold }) ->
+ transient_threshold = TransientThreshold,
+ rates = #rates{out = RatesOut }}) ->
#delta { start_seq_id = DeltaSeqId,
count = DeltaCount,
transient = Transient,
end_seq_id = DeltaSeqIdEnd } = Delta,
+ %% We limit the number of messages we read from the index. We want a large
+ %% enough value but not one that will use too much memory because when there
+ %% are many queues we may end up exploding the memory. We can use the current
+ %% rate to determine the upper value unless configured otherwise. If the rate
+ %% is too low, the min value will be used instead, and it defaults to 100
+ %% as that's neither too low nor too high.
+ %%
+ %% Note that the max ever fetched will be ?SEGMENT_ENTRY_COUNT which currently
+ %% defaults to 16k. So configuring the max to a higher value than that will
+ %% not have an effect (effectively disabling the limit).
+ %%
+ %% We currently only do this for lazy queues because it has not been tested
+ %% with default queues.
+ DeltaConfigEndList = case Mode of
+ lazy -> [DeltaSeqId + max(application:get_env(rabbit, lazy_queue_min_deltas_to_betas, 100),
+ application:get_env(rabbit, lazy_queue_max_deltas_to_betas, floor(RatesOut * 1.5)))];
+ default -> []
+ end,
DeltaSeqId1 =
lists:min([rabbit_queue_index:next_segment_boundary(DeltaSeqId),
- DeltaSeqIdEnd]),
+ DeltaSeqIdEnd|DeltaConfigEndList]),
{List, IndexState1} = rabbit_queue_index:read(DeltaSeqId, DeltaSeqId1,
IndexState),
{Q3a, RamCountsInc, RamBytesInc, State1, TransientCount, TransientBytes} =