summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_variable_queue.erl61
1 files changed, 46 insertions, 15 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 28055af3e0..90fbcf817f 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -232,6 +232,7 @@
duration_target,
target_ram_msg_count,
+ target_ram_ack_count,
ram_msg_count,
ram_msg_count_prev,
ram_index_count,
@@ -319,6 +320,7 @@
transient_threshold :: non_neg_integer(),
duration_target :: number() | 'infinity',
target_ram_msg_count :: non_neg_integer() | 'infinity',
+ target_ram_ack_count :: non_neg_integer() | 'infinity',
ram_msg_count :: non_neg_integer(),
ram_msg_count_prev :: non_neg_integer(),
ram_index_count :: non_neg_integer(),
@@ -422,6 +424,7 @@ init(QueueName, IsDurable, Recover) ->
duration_target = infinity,
target_ram_msg_count = infinity,
+ target_ram_ack_count = infinity,
ram_msg_count = 0,
ram_msg_count_prev = 0,
ram_index_count = 0,
@@ -651,10 +654,16 @@ len(#vqstate { len = Len }) -> Len.
is_empty(State) -> 0 == len(State).
+within_limits(Previous, Current) ->
+ Current == infinity
+ orelse (Previous =/= infinity andalso Current >= Previous).
+
set_ram_duration_target(DurationTarget,
State = #vqstate {
+ len = Len, pending_ack = PA,
rates = #rates { avg_egress = AvgEgressRate,
avg_ingress = AvgIngressRate },
+ target_ram_ack_count = TargetRamAckCount,
target_ram_msg_count = TargetRamMsgCount }) ->
Rate = AvgEgressRate + AvgIngressRate,
TargetRamMsgCount1 =
@@ -662,13 +671,24 @@ set_ram_duration_target(DurationTarget,
infinity -> infinity;
_ -> trunc(DurationTarget * Rate) %% msgs = sec * msgs/sec
end,
- State1 = State #vqstate { target_ram_msg_count = TargetRamMsgCount1,
+
+ io:format("MSG: ~p[~p]~n", [TargetRamMsgCount1, Len]),
+
+ TargetRamAckCount1 = case TargetRamMsgCount1 == infinity orelse Len == 0 of
+ true -> TargetRamMsgCount1;
+ _ -> (TargetRamMsgCount1 / Len) * dict:size(PA)
+ end,
+
+ io:format("ACK: ~p~n", [TargetRamAckCount1]),
+
+ State1 = State #vqstate { target_ram_ack_count = TargetRamAckCount1,
+ target_ram_msg_count = TargetRamMsgCount1,
duration_target = DurationTarget },
- a(case TargetRamMsgCount1 == infinity orelse
- (TargetRamMsgCount =/= infinity andalso
- TargetRamMsgCount1 >= TargetRamMsgCount) of
+
+ a(case within_limits(TargetRamMsgCount, TargetRamMsgCount1) andalso
+ within_limits(TargetRamAckCount, TargetRamAckCount1) of
true -> State1;
- false -> reduce_memory_use(State1)
+ false -> io:format("Reducing~n"), reduce_memory_use(State1)
end).
ram_duration(State = #vqstate {
@@ -1188,21 +1208,32 @@ find_persistent_count(LensByStore) ->
%% perpetually reporting the need for a conversion when no such
%% conversion is needed. That in turn could cause an infinite loop.
reduce_memory_use(AlphaBetaFun, BetaGammaFun, BetaDeltaFun, State) ->
- {Reduce, State1} = case chunk_size(State #vqstate.ram_msg_count,
+ Size = gb_trees:size(State #vqstate.ram_ack_index),
+ State1 = case chunk_size(Size, State #vqstate.target_ram_ack_count) of
+ 0 -> State;
+ S -> limit_ram_acks(S, State)
+ end,
+ {Reduce, State2} = case chunk_size(State #vqstate.ram_msg_count,
State #vqstate.target_ram_msg_count) of
- 0 -> {false, State};
- S1 -> {true, AlphaBetaFun(S1, State)}
+ 0 -> {false, State1};
+ S1 -> {true, AlphaBetaFun(S1, State1)}
end,
- case State1 #vqstate.target_ram_msg_count of
- infinity -> {Reduce, State1};
- 0 -> {Reduce, BetaDeltaFun(State1)};
- _ -> case chunk_size(State1 #vqstate.ram_index_count,
- permitted_ram_index_count(State1)) of
- ?IO_BATCH_SIZE = S2 -> {true, BetaGammaFun(S2, State1)};
- _ -> {Reduce, State1}
+ case State2 #vqstate.target_ram_msg_count of
+ infinity -> {Reduce, State2};
+ 0 -> {Reduce, BetaDeltaFun(State2)};
+ _ -> case chunk_size(State2 #vqstate.ram_index_count,
+ permitted_ram_index_count(State2)) of
+ ?IO_BATCH_SIZE = S2 -> {true, BetaGammaFun(S2, State2)};
+ _ -> {Reduce, State2}
end
end.
+limit_ram_acks(0, State) ->
+ State;
+limit_ram_acks(Quota, State) ->
+
+
+
reduce_memory_use(State) ->
{_, State1} = reduce_memory_use(fun push_alphas_to_betas/2,
fun limit_ram_index/2,