summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_variable_queue.erl45
1 files changed, 44 insertions, 1 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index a13d55b690..66167394c4 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -2123,6 +2123,7 @@ ifold(Fun, Acc, Its, State) ->
reduce_memory_use(State = #vqstate { target_ram_count = infinity }) ->
State;
reduce_memory_use(State = #vqstate {
+ mode = default,
ram_pending_ack = RPA,
ram_msg_count = RamMsgCount,
target_ram_count = TargetRamCount,
@@ -2168,6 +2169,30 @@ reduce_memory_use(State = #vqstate {
end,
%% See rabbitmq-server-290 for the reasons behind this GC call.
garbage_collect(),
+ State3;
+%% When using lazy queues, there are no alphas, so we don't need to
+%% call push_alphas_to_betas/2.
+reduce_memory_use(State = #vqstate {
+ mode = lazy,
+ ram_pending_ack = RPA,
+ ram_msg_count = RamMsgCount,
+ target_ram_count = TargetRamCount }) ->
+ State1 = #vqstate { q3 = Q3 } =
+ case chunk_size(RamMsgCount + gb_trees:size(RPA), TargetRamCount) of
+ 0 -> State;
+ S1 -> {_, State2} = limit_ram_acks(S1, State),
+ State2
+ end,
+
+ State3 =
+ case chunk_size(?QUEUE:len(Q3),
+ permitted_beta_count(State1)) of
+ 0 ->
+ State1;
+ S2 ->
+ push_betas_to_deltas(S2, State1)
+ end,
+ garbage_collect(),
State3.
limit_ram_acks(0, State) ->
@@ -2191,6 +2216,9 @@ limit_ram_acks(Quota, State = #vqstate { ram_pending_ack = RPA,
permitted_beta_count(#vqstate { len = 0 }) ->
infinity;
+permitted_beta_count(#vqstate { mode = lazy,
+ target_ram_count = TargetRamCount}) ->
+ TargetRamCount;
permitted_beta_count(#vqstate { target_ram_count = 0, q3 = Q3 }) ->
lists:min([?QUEUE:len(Q3), rabbit_queue_index:next_segment_boundary(0)]);
permitted_beta_count(#vqstate { q1 = Q1,
@@ -2346,7 +2374,8 @@ push_alphas_to_betas(Generator, Consumer, Quota, Q, State) ->
end
end.
-push_betas_to_deltas(Quota, State = #vqstate { q2 = Q2,
+push_betas_to_deltas(Quota, State = #vqstate { mode = default,
+ q2 = Q2,
delta = Delta,
q3 = Q3}) ->
PushState = {Quota, Delta, State},
@@ -2361,8 +2390,22 @@ push_betas_to_deltas(Quota, State = #vqstate { q2 = Q2,
{_, Delta1, State1} = PushState2,
State1 #vqstate { q2 = Q2a,
delta = Delta1,
+ q3 = Q3a };
+%% In the case of lazy queues we want to page as many messages as
+%% possible from q3.
+push_betas_to_deltas(Quota, State = #vqstate { mode = lazy,
+ delta = Delta,
+ q3 = Q3}) ->
+ PushState = {Quota, Delta, State},
+ {Q3a, PushState1} = push_betas_to_deltas(
+ fun ?QUEUE:out_r/1,
+ fun (Q2MinSeqId) -> Q2MinSeqId end,
+ Q3, PushState),
+ {_, Delta1, State1} = PushState1,
+ State1 #vqstate { delta = Delta1,
q3 = Q3a }.
+
push_betas_to_deltas(Generator, LimitFun, Q, PushState) ->
case ?QUEUE:is_empty(Q) of
true ->