summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--deps/rabbit/src/rabbit_variable_queue.erl24
1 files changed, 22 insertions, 2 deletions
diff --git a/deps/rabbit/src/rabbit_variable_queue.erl b/deps/rabbit/src/rabbit_variable_queue.erl
index d4bd907e78..bc0e5169aa 100644
--- a/deps/rabbit/src/rabbit_variable_queue.erl
+++ b/deps/rabbit/src/rabbit_variable_queue.erl
@@ -2605,6 +2605,7 @@ maybe_deltas_to_betas(_DelsAndAcksFun,
State;
maybe_deltas_to_betas(DelsAndAcksFun,
State = #vqstate {
+ mode = Mode,
q2 = Q2,
delta = Delta,
q3 = Q3,
@@ -2613,14 +2614,33 @@ maybe_deltas_to_betas(DelsAndAcksFun,
ram_bytes = RamBytes,
disk_read_count = DiskReadCount,
delta_transient_bytes = DeltaTransientBytes,
- transient_threshold = TransientThreshold }) ->
+ transient_threshold = TransientThreshold,
+ rates = #rates{out = RatesOut }}) ->
#delta { start_seq_id = DeltaSeqId,
count = DeltaCount,
transient = Transient,
end_seq_id = DeltaSeqIdEnd } = Delta,
+ %% We limit the number of messages we read from the index. We want a large
+ %% enough value but not one that will use too much memory because when there
+ %% are many queues we may end up exploding the memory. We can use the current
+ %% rate to determine the upper value unless configured otherwise. If the rate
+ %% is too low, the min value will be used instead, and it defaults to 100
+ %% as that's neither too low nor too high.
+ %%
+ %% Note that the max ever fetched will be ?SEGMENT_ENTRY_COUNT which currently
+ %% defaults to 16k. So configuring the max to a higher value than that will
+ %% not have an effect (effectively disabling the limit).
+ %%
+ %% We currently only do this for lazy queues because it has not been tested
+ %% with default queues.
+ DeltaConfigEndList = case Mode of
+ lazy -> [DeltaSeqId + max(application:get_env(rabbit, lazy_queue_min_deltas_to_betas, 100),
+ application:get_env(rabbit, lazy_queue_max_deltas_to_betas, floor(RatesOut * 1.5)))];
+ default -> []
+ end,
DeltaSeqId1 =
lists:min([rabbit_queue_index:next_segment_boundary(DeltaSeqId),
- DeltaSeqIdEnd]),
+ DeltaSeqIdEnd|DeltaConfigEndList]),
{List, IndexState1} = rabbit_queue_index:read(DeltaSeqId, DeltaSeqId1,
IndexState),
{Q3a, RamCountsInc, RamBytesInc, State1, TransientCount, TransientBytes} =