diff options
| -rw-r--r-- | src/rabbit_variable_queue.erl | 61 |
1 files changed, 46 insertions, 15 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 28055af3e0..90fbcf817f 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -232,6 +232,7 @@ duration_target, target_ram_msg_count, + target_ram_ack_count, ram_msg_count, ram_msg_count_prev, ram_index_count, @@ -319,6 +320,7 @@ transient_threshold :: non_neg_integer(), duration_target :: number() | 'infinity', target_ram_msg_count :: non_neg_integer() | 'infinity', + target_ram_ack_count :: non_neg_integer() | 'infinity', ram_msg_count :: non_neg_integer(), ram_msg_count_prev :: non_neg_integer(), ram_index_count :: non_neg_integer(), @@ -422,6 +424,7 @@ init(QueueName, IsDurable, Recover) -> duration_target = infinity, target_ram_msg_count = infinity, + target_ram_ack_count = infinity, ram_msg_count = 0, ram_msg_count_prev = 0, ram_index_count = 0, @@ -651,10 +654,16 @@ len(#vqstate { len = Len }) -> Len. is_empty(State) -> 0 == len(State). +within_limits(Previous, Current) -> + Current == infinity + orelse (Previous =/= infinity andalso Current >= Previous). + set_ram_duration_target(DurationTarget, State = #vqstate { + len = Len, pending_ack = PA, rates = #rates { avg_egress = AvgEgressRate, avg_ingress = AvgIngressRate }, + target_ram_ack_count = TargetRamAckCount, target_ram_msg_count = TargetRamMsgCount }) -> Rate = AvgEgressRate + AvgIngressRate, TargetRamMsgCount1 = @@ -662,13 +671,24 @@ set_ram_duration_target(DurationTarget, infinity -> infinity; _ -> trunc(DurationTarget * Rate) %% msgs = sec * msgs/sec end, - State1 = State #vqstate { target_ram_msg_count = TargetRamMsgCount1, + + io:format("MSG: ~p[~p]~n", [TargetRamMsgCount1, Len]), + + TargetRamAckCount1 = case TargetRamMsgCount1 == infinity orelse Len == 0 of + true -> TargetRamMsgCount1; + _ -> (TargetRamMsgCount1 / Len) * dict:size(PA) + end, + + io:format("ACK: ~p~n", [TargetRamAckCount1]), + + State1 = State #vqstate { target_ram_ack_count = TargetRamAckCount1, + target_ram_msg_count = TargetRamMsgCount1, duration_target = DurationTarget }, - a(case TargetRamMsgCount1 == infinity orelse - (TargetRamMsgCount =/= infinity andalso - TargetRamMsgCount1 >= TargetRamMsgCount) of + + a(case within_limits(TargetRamMsgCount, TargetRamMsgCount1) andalso + within_limits(TargetRamAckCount, TargetRamAckCount1) of true -> State1; - false -> reduce_memory_use(State1) + false -> io:format("Reducing~n"), reduce_memory_use(State1) end). ram_duration(State = #vqstate { @@ -1188,21 +1208,32 @@ find_persistent_count(LensByStore) -> %% perpetually reporting the need for a conversion when no such %% conversion is needed. That in turn could cause an infinite loop. reduce_memory_use(AlphaBetaFun, BetaGammaFun, BetaDeltaFun, State) -> - {Reduce, State1} = case chunk_size(State #vqstate.ram_msg_count, + Size = gb_trees:size(State #vqstate.ram_ack_index), + State1 = case chunk_size(Size, State #vqstate.target_ram_ack_count) of + 0 -> State; + S -> limit_ram_acks(S, State) + end, + {Reduce, State2} = case chunk_size(State #vqstate.ram_msg_count, State #vqstate.target_ram_msg_count) of - 0 -> {false, State}; - S1 -> {true, AlphaBetaFun(S1, State)} + 0 -> {false, State1}; + S1 -> {true, AlphaBetaFun(S1, State1)} end, - case State1 #vqstate.target_ram_msg_count of - infinity -> {Reduce, State1}; - 0 -> {Reduce, BetaDeltaFun(State1)}; - _ -> case chunk_size(State1 #vqstate.ram_index_count, - permitted_ram_index_count(State1)) of - ?IO_BATCH_SIZE = S2 -> {true, BetaGammaFun(S2, State1)}; - _ -> {Reduce, State1} + case State2 #vqstate.target_ram_msg_count of + infinity -> {Reduce, State2}; + 0 -> {Reduce, BetaDeltaFun(State2)}; + _ -> case chunk_size(State2 #vqstate.ram_index_count, + permitted_ram_index_count(State2)) of + ?IO_BATCH_SIZE = S2 -> {true, BetaGammaFun(S2, State2)}; + _ -> {Reduce, State2} end end. +limit_ram_acks(0, State) -> + State; +limit_ram_acks(Quota, State) -> + + + reduce_memory_use(State) -> {_, State1} = reduce_memory_use(fun push_alphas_to_betas/2, fun limit_ram_index/2, |
