summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLoïc Hoguin <lhoguin@vmware.com>2021-03-04 13:19:37 +0100
committerLoïc Hoguin <lhoguin@vmware.com>2021-03-05 13:33:48 +0100
commit6de21fe717547a1e225aac4191f0b012b554a7c7 (patch)
treeec650f41ea2ae13fdedfbaf0d22a6d46b068a3a8
parentfd86959429eb42dae40ec3dc51aab2fa51125aec (diff)
downloadrabbitmq-server-git-6de21fe717547a1e225aac4191f0b012b554a7c7.tar.gz
Aggressively reduce memory usage of lazy queues
In its current form this probably does not fit busier scenarios.
-rw-r--r--deps/rabbit/src/rabbit_variable_queue.erl35
1 files changed, 17 insertions, 18 deletions
diff --git a/deps/rabbit/src/rabbit_variable_queue.erl b/deps/rabbit/src/rabbit_variable_queue.erl
index b23592ecf0..02c0128234 100644
--- a/deps/rabbit/src/rabbit_variable_queue.erl
+++ b/deps/rabbit/src/rabbit_variable_queue.erl
@@ -2389,6 +2389,8 @@ ifold(Fun, Acc, Its0, State0) ->
%% Phase changes
%%----------------------------------------------------------------------------
+maybe_reduce_memory_use(State = #vqstate {mode = lazy}) ->
+ reduce_memory_use(State);
maybe_reduce_memory_use(State = #vqstate {memory_reduction_run_count = MRedRunCount,
mode = Mode}) ->
case MRedRunCount >= ?EXPLICIT_GC_RUN_OP_THRESHOLD(Mode) of
@@ -2397,7 +2399,7 @@ maybe_reduce_memory_use(State = #vqstate {memory_reduction_run_count = MRedRunCo
false -> State#vqstate{memory_reduction_run_count = MRedRunCount + 1}
end.
-reduce_memory_use(State = #vqstate { target_ram_count = infinity }) ->
+reduce_memory_use(State = #vqstate { mode = default, target_ram_count = infinity }) ->
State;
reduce_memory_use(State = #vqstate {
mode = default,
@@ -2481,27 +2483,22 @@ reduce_memory_use(State = #vqstate {
%% When using lazy queues, there are no alphas, so we don't need to
%% call push_alphas_to_betas/2.
reduce_memory_use(State = #vqstate {
- mode = lazy,
+ q3 = Q3,
+ mode = lazy,
ram_pending_ack = RPA,
ram_msg_count = RamMsgCount,
- target_ram_count = TargetRamCount }) ->
- State1 = #vqstate { q3 = Q3 } =
+ target_ram_count = TargetRamCount,
+ index_state = IndexState }) ->
+ State3 =
case chunk_size(RamMsgCount + gb_trees:size(RPA), TargetRamCount) of
0 -> State;
S1 -> {_, State2} = limit_ram_acks(S1, State),
State2
end,
-
- State3 =
- case chunk_size(?QUEUE:len(Q3),
- permitted_beta_count(State1)) of
- 0 ->
- State1;
- S2 ->
- push_betas_to_deltas(S2, State1)
- end,
+ State4 = push_betas_to_deltas(?QUEUE:len(Q3), State3),
+ State5 = State4 #vqstate { index_state = rabbit_queue_index:flush(IndexState) },
garbage_collect(),
- State3.
+ State5.
maybe_bump_reduce_memory_use(State = #vqstate{ waiting_bump = true }) ->
State;
@@ -2530,9 +2527,6 @@ limit_ram_acks(Quota, State = #vqstate { ram_pending_ack = RPA,
permitted_beta_count(#vqstate { len = 0 }) ->
infinity;
-permitted_beta_count(#vqstate { mode = lazy,
- target_ram_count = TargetRamCount}) ->
- TargetRamCount;
permitted_beta_count(#vqstate { target_ram_count = 0, q3 = Q3 }) ->
lists:min([?QUEUE:len(Q3), rabbit_queue_index:next_segment_boundary(0)]);
permitted_beta_count(#vqstate { q1 = Q1,
@@ -2605,6 +2599,7 @@ maybe_deltas_to_betas(_DelsAndAcksFun,
State;
maybe_deltas_to_betas(DelsAndAcksFun,
State = #vqstate {
+ mode = Mode,
q2 = Q2,
delta = Delta,
q3 = Q3,
@@ -2618,9 +2613,13 @@ maybe_deltas_to_betas(DelsAndAcksFun,
count = DeltaCount,
transient = Transient,
end_seq_id = DeltaSeqIdEnd } = Delta,
+ DeltaConfigEndList = case Mode of
+ lazy -> [DeltaSeqId + application:get_env(rabbit, lazy_queue_max_deltas_to_betas, 10)];
+ default -> []
+ end,
DeltaSeqId1 =
lists:min([rabbit_queue_index:next_segment_boundary(DeltaSeqId),
- DeltaSeqIdEnd]),
+ DeltaSeqIdEnd|DeltaConfigEndList]),
{List, IndexState1} = rabbit_queue_index:read(DeltaSeqId, DeltaSeqId1,
IndexState),
{Q3a, RamCountsInc, RamBytesInc, State1, TransientCount, TransientBytes} =