diff options
author | Loïc Hoguin <lhoguin@vmware.com> | 2021-03-04 13:19:37 +0100 |
---|---|---|
committer | Loïc Hoguin <lhoguin@vmware.com> | 2021-03-05 13:33:48 +0100 |
commit | 6de21fe717547a1e225aac4191f0b012b554a7c7 (patch) | |
tree | ec650f41ea2ae13fdedfbaf0d22a6d46b068a3a8 | |
parent | fd86959429eb42dae40ec3dc51aab2fa51125aec (diff) | |
download | rabbitmq-server-git-6de21fe717547a1e225aac4191f0b012b554a7c7.tar.gz |
Aggressively reduce memory usage of lazy queues
In its current form this probably does not fit busier scenarios.
-rw-r--r-- | deps/rabbit/src/rabbit_variable_queue.erl | 35 |
1 files changed, 17 insertions, 18 deletions
diff --git a/deps/rabbit/src/rabbit_variable_queue.erl b/deps/rabbit/src/rabbit_variable_queue.erl index b23592ecf0..02c0128234 100644 --- a/deps/rabbit/src/rabbit_variable_queue.erl +++ b/deps/rabbit/src/rabbit_variable_queue.erl @@ -2389,6 +2389,8 @@ ifold(Fun, Acc, Its0, State0) -> %% Phase changes %%---------------------------------------------------------------------------- +maybe_reduce_memory_use(State = #vqstate {mode = lazy}) -> + reduce_memory_use(State); maybe_reduce_memory_use(State = #vqstate {memory_reduction_run_count = MRedRunCount, mode = Mode}) -> case MRedRunCount >= ?EXPLICIT_GC_RUN_OP_THRESHOLD(Mode) of @@ -2397,7 +2399,7 @@ maybe_reduce_memory_use(State = #vqstate {memory_reduction_run_count = MRedRunCo false -> State#vqstate{memory_reduction_run_count = MRedRunCount + 1} end. -reduce_memory_use(State = #vqstate { target_ram_count = infinity }) -> +reduce_memory_use(State = #vqstate { mode = default, target_ram_count = infinity }) -> State; reduce_memory_use(State = #vqstate { mode = default, @@ -2481,27 +2483,22 @@ reduce_memory_use(State = #vqstate { %% When using lazy queues, there are no alphas, so we don't need to %% call push_alphas_to_betas/2. reduce_memory_use(State = #vqstate { - mode = lazy, + q3 = Q3, + mode = lazy, ram_pending_ack = RPA, ram_msg_count = RamMsgCount, - target_ram_count = TargetRamCount }) -> - State1 = #vqstate { q3 = Q3 } = + target_ram_count = TargetRamCount, + index_state = IndexState }) -> + State3 = case chunk_size(RamMsgCount + gb_trees:size(RPA), TargetRamCount) of 0 -> State; S1 -> {_, State2} = limit_ram_acks(S1, State), State2 end, - - State3 = - case chunk_size(?QUEUE:len(Q3), - permitted_beta_count(State1)) of - 0 -> - State1; - S2 -> - push_betas_to_deltas(S2, State1) - end, + State4 = push_betas_to_deltas(?QUEUE:len(Q3), State3), + State5 = State4 #vqstate { index_state = rabbit_queue_index:flush(IndexState) }, garbage_collect(), - State3. + State5. maybe_bump_reduce_memory_use(State = #vqstate{ waiting_bump = true }) -> State; @@ -2530,9 +2527,6 @@ limit_ram_acks(Quota, State = #vqstate { ram_pending_ack = RPA, permitted_beta_count(#vqstate { len = 0 }) -> infinity; -permitted_beta_count(#vqstate { mode = lazy, - target_ram_count = TargetRamCount}) -> - TargetRamCount; permitted_beta_count(#vqstate { target_ram_count = 0, q3 = Q3 }) -> lists:min([?QUEUE:len(Q3), rabbit_queue_index:next_segment_boundary(0)]); permitted_beta_count(#vqstate { q1 = Q1, @@ -2605,6 +2599,7 @@ maybe_deltas_to_betas(_DelsAndAcksFun, State; maybe_deltas_to_betas(DelsAndAcksFun, State = #vqstate { + mode = Mode, q2 = Q2, delta = Delta, q3 = Q3, @@ -2618,9 +2613,13 @@ maybe_deltas_to_betas(DelsAndAcksFun, count = DeltaCount, transient = Transient, end_seq_id = DeltaSeqIdEnd } = Delta, + DeltaConfigEndList = case Mode of + lazy -> [DeltaSeqId + application:get_env(rabbit, lazy_queue_max_deltas_to_betas, 10)]; + default -> [] + end, DeltaSeqId1 = lists:min([rabbit_queue_index:next_segment_boundary(DeltaSeqId), - DeltaSeqIdEnd]), + DeltaSeqIdEnd|DeltaConfigEndList]), {List, IndexState1} = rabbit_queue_index:read(DeltaSeqId, DeltaSeqId1, IndexState), {Q3a, RamCountsInc, RamBytesInc, State1, TransientCount, TransientBytes} = |