diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_variable_queue.erl | 33 |
1 files changed, 20 insertions, 13 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 1a0521d184..8accd876ec 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -1823,32 +1823,39 @@ reduce_memory_use(State = #vqstate { State1 end. -limit_ram_acks(0, State = - #vqstate{index_state = IndexState, - target_ram_count = TargetRamCount}) -> +limit_ram_acks(Quota, State = #vqstate{ram_bytes = CurrRamBytes}) -> + limit_ram_acks(Quota, CurrRamBytes, State). + +limit_ram_acks(0, CurrRamBytes, + State = #vqstate{index_state = IndexState, + target_ram_count = TargetRamCount}) -> IndexState1 = rabbit_queue_index:flush_pre_publish_cache( TargetRamCount, IndexState), - {0, State#vqstate{index_state = IndexState1}}; -limit_ram_acks(Quota, State = #vqstate { - index_state = IndexState, - target_ram_count = TargetRamCount, - ram_pending_ack = RPA, - disk_pending_ack = DPA }) -> + {0, State#vqstate{index_state = IndexState1, + ram_bytes = CurrRamBytes}}; +limit_ram_acks(Quota, CurrRamBytes, + State = #vqstate { + index_state = IndexState, + target_ram_count = TargetRamCount, + ram_pending_ack = RPA, + disk_pending_ack = DPA }) -> case gb_trees:is_empty(RPA) of true -> IndexState1 = rabbit_queue_index:flush_pre_publish_cache( TargetRamCount, IndexState), - {Quota, State#vqstate{index_state = IndexState1}}; + {Quota, State#vqstate{index_state = IndexState1, + ram_bytes = CurrRamBytes}}; false -> {SeqId, MsgStatus, RPA1} = gb_trees:take_largest(RPA), {MsgStatus1, State1} = maybe_prepare_write_to_disk(true, false, MsgStatus, State), MsgStatus2 = m(trim_msg_status(MsgStatus1)), DPA1 = gb_trees:insert(SeqId, MsgStatus2, DPA), + DeltaRam = delta_ram(msg_in_ram(MsgStatus), msg_in_ram(MsgStatus2)), limit_ram_acks(Quota - 1, - stats({0, 0}, {MsgStatus, MsgStatus2}, - State1 #vqstate { ram_pending_ack = RPA1, - disk_pending_ack = DPA1 })) + CurrRamBytes + DeltaRam * msg_size(MsgStatus), + State1 #vqstate { ram_pending_ack = RPA1, + disk_pending_ack = DPA1 }) end. permitted_beta_count(#vqstate { len = 0 }) -> |
