diff options
-rw-r--r-- | deps/rabbit/src/rabbit_variable_queue.erl | 24 |
1 files changed, 22 insertions, 2 deletions
diff --git a/deps/rabbit/src/rabbit_variable_queue.erl b/deps/rabbit/src/rabbit_variable_queue.erl index d4bd907e78..bc0e5169aa 100644 --- a/deps/rabbit/src/rabbit_variable_queue.erl +++ b/deps/rabbit/src/rabbit_variable_queue.erl @@ -2605,6 +2605,7 @@ maybe_deltas_to_betas(_DelsAndAcksFun, State; maybe_deltas_to_betas(DelsAndAcksFun, State = #vqstate { + mode = Mode, q2 = Q2, delta = Delta, q3 = Q3, @@ -2613,14 +2614,33 @@ maybe_deltas_to_betas(DelsAndAcksFun, ram_bytes = RamBytes, disk_read_count = DiskReadCount, delta_transient_bytes = DeltaTransientBytes, - transient_threshold = TransientThreshold }) -> + transient_threshold = TransientThreshold, + rates = #rates{out = RatesOut }}) -> #delta { start_seq_id = DeltaSeqId, count = DeltaCount, transient = Transient, end_seq_id = DeltaSeqIdEnd } = Delta, + %% We limit the number of messages we read from the index. We want a large + %% enough value but not one that will use too much memory because when there + %% are many queues we may end up exploding the memory. We can use the current + %% rate to determine the upper value unless configured otherwise. If the rate + %% is too low, the min value will be used instead, and it defaults to 100 + %% as that's neither too low nor too high. + %% + %% Note that the max ever fetched will be ?SEGMENT_ENTRY_COUNT which currently + %% defaults to 16k. So configuring the max to a higher value than that will + %% not have an effect (effectively disabling the limit). + %% + %% We currently only do this for lazy queues because it has not been tested + %% with default queues. + DeltaConfigEndList = case Mode of + lazy -> [DeltaSeqId + max(application:get_env(rabbit, lazy_queue_min_deltas_to_betas, 100), + application:get_env(rabbit, lazy_queue_max_deltas_to_betas, floor(RatesOut * 1.5)))]; + default -> [] + end, DeltaSeqId1 = lists:min([rabbit_queue_index:next_segment_boundary(DeltaSeqId), - DeltaSeqIdEnd]), + DeltaSeqIdEnd|DeltaConfigEndList]), {List, IndexState1} = rabbit_queue_index:read(DeltaSeqId, DeltaSeqId1, IndexState), {Q3a, RamCountsInc, RamBytesInc, State1, TransientCount, TransientBytes} = |