diff options
| -rw-r--r-- | src/rabbit_variable_queue.erl | 22 |
1 files changed, 19 insertions, 3 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 90fbcf817f..f09fb4d47b 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -1211,7 +1211,7 @@ reduce_memory_use(AlphaBetaFun, BetaGammaFun, BetaDeltaFun, State) -> Size = gb_trees:size(State #vqstate.ram_ack_index), State1 = case chunk_size(Size, State #vqstate.target_ram_ack_count) of 0 -> State; - S -> limit_ram_acks(S, State) + S -> io:format("Limiting~n"), limit_ram_acks(S, State) end, {Reduce, State2} = case chunk_size(State #vqstate.ram_msg_count, State #vqstate.target_ram_msg_count) of @@ -1230,8 +1230,24 @@ reduce_memory_use(AlphaBetaFun, BetaGammaFun, BetaDeltaFun, State) -> limit_ram_acks(0, State) -> State; -limit_ram_acks(Quota, State) -> - +limit_ram_acks(Quota, State = #vqstate { pending_ack = PA, + ram_ack_index = RAI }) -> + io:format("Limiting acks~p~n", [Quota]), + case gb_trees:is_empty(RAI) of + true -> + State; + false -> + {SeqId, Guid, RAI1} = gb_trees:take_largest(RAI), + io:format("Largest~p~n", [SeqId]), + MsgStatus = dict:fetch(SeqId, PA), + State1 = maybe_write_to_disk(true, false, MsgStatus, State), + io:format("Wrote~n"), + limit_ram_acks(Quota - 1, + State1 #vqstate { + pending_ack = + dict:update(SeqId, {false, Guid}, PA), + ram_ack_index = RAI1 }) + end. reduce_memory_use(State) -> |
