diff options
| -rw-r--r-- | src/rabbit_variable_queue.erl | 45 |
1 files changed, 44 insertions, 1 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index a13d55b690..66167394c4 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -2123,6 +2123,7 @@ ifold(Fun, Acc, Its, State) -> reduce_memory_use(State = #vqstate { target_ram_count = infinity }) -> State; reduce_memory_use(State = #vqstate { + mode = default, ram_pending_ack = RPA, ram_msg_count = RamMsgCount, target_ram_count = TargetRamCount, @@ -2168,6 +2169,30 @@ reduce_memory_use(State = #vqstate { end, %% See rabbitmq-server-290 for the reasons behind this GC call. garbage_collect(), + State3; +%% When using lazy queues, there are no alphas, so we don't need to +%% call push_alphas_to_betas/2. +reduce_memory_use(State = #vqstate { + mode = lazy, + ram_pending_ack = RPA, + ram_msg_count = RamMsgCount, + target_ram_count = TargetRamCount }) -> + State1 = #vqstate { q3 = Q3 } = + case chunk_size(RamMsgCount + gb_trees:size(RPA), TargetRamCount) of + 0 -> State; + S1 -> {_, State2} = limit_ram_acks(S1, State), + State2 + end, + + State3 = + case chunk_size(?QUEUE:len(Q3), + permitted_beta_count(State1)) of + 0 -> + State1; + S2 -> + push_betas_to_deltas(S2, State1) + end, + garbage_collect(), State3. limit_ram_acks(0, State) -> @@ -2191,6 +2216,9 @@ limit_ram_acks(Quota, State = #vqstate { ram_pending_ack = RPA, permitted_beta_count(#vqstate { len = 0 }) -> infinity; +permitted_beta_count(#vqstate { mode = lazy, + target_ram_count = TargetRamCount}) -> + TargetRamCount; permitted_beta_count(#vqstate { target_ram_count = 0, q3 = Q3 }) -> lists:min([?QUEUE:len(Q3), rabbit_queue_index:next_segment_boundary(0)]); permitted_beta_count(#vqstate { q1 = Q1, @@ -2346,7 +2374,8 @@ push_alphas_to_betas(Generator, Consumer, Quota, Q, State) -> end end. -push_betas_to_deltas(Quota, State = #vqstate { q2 = Q2, +push_betas_to_deltas(Quota, State = #vqstate { mode = default, + q2 = Q2, delta = Delta, q3 = Q3}) -> PushState = {Quota, Delta, State}, @@ -2361,8 +2390,22 @@ push_betas_to_deltas(Quota, State = #vqstate { q2 = Q2, {_, Delta1, State1} = PushState2, State1 #vqstate { q2 = Q2a, delta = Delta1, + q3 = Q3a }; +%% In the case of lazy queues we want to page as many messages as +%% possible from q3. +push_betas_to_deltas(Quota, State = #vqstate { mode = lazy, + delta = Delta, + q3 = Q3}) -> + PushState = {Quota, Delta, State}, + {Q3a, PushState1} = push_betas_to_deltas( + fun ?QUEUE:out_r/1, + fun (Q2MinSeqId) -> Q2MinSeqId end, + Q3, PushState), + {_, Delta1, State1} = PushState1, + State1 #vqstate { delta = Delta1, q3 = Q3a }. + push_betas_to_deltas(Generator, LimitFun, Q, PushState) -> case ?QUEUE:is_empty(Q) of true -> |
