diff options
| -rw-r--r-- | src/rabbit_variable_queue.erl | 26 |
1 files changed, 20 insertions, 6 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index fae847d408..ddd75c7ed9 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -1432,6 +1432,10 @@ maybe_write_to_disk(ForceMsg, ForceIndex, MsgStatus, State) -> {MsgStatus1, State1} = maybe_write_msg_to_disk(ForceMsg, MsgStatus, State), maybe_write_index_to_disk(ForceIndex, MsgStatus1, State1). +maybe_prepare_write_to_disk(ForceMsg, ForceIndex, MsgStatus, State) -> + {MsgStatus1, State1} = maybe_write_msg_to_disk(ForceMsg, MsgStatus, State), + maybe_write_index_to_disk_paging(ForceIndex, MsgStatus1, State1). + determine_persist_to(#basic_message{ content = #content{properties = Props, properties_bin = PropsBin}}, @@ -1818,17 +1822,26 @@ reduce_memory_use(State = #vqstate { State1 end. -limit_ram_acks(0, State) -> - {0, State}; -limit_ram_acks(Quota, State = #vqstate { ram_pending_ack = RPA, - disk_pending_ack = DPA }) -> +limit_ram_acks(0, State = + #vqstate{index_state = IndexState, + target_ram_count = TargetRamCount}) -> + IndexState1 = rabbit_queue_index:flush_pre_publish_cache( + TargetRamCount, IndexState), + {0, State#vqstate{index_state = IndexState1}}; +limit_ram_acks(Quota, State = #vqstate { + index_state = IndexState, + target_ram_count = TargetRamCount, + ram_pending_ack = RPA, + disk_pending_ack = DPA }) -> case gb_trees:is_empty(RPA) of true -> - {Quota, State}; + IndexState1 = rabbit_queue_index:flush_pre_publish_cache( + TargetRamCount, IndexState), + {Quota, State#vqstate{index_state = IndexState1}}; false -> {SeqId, MsgStatus, RPA1} = gb_trees:take_largest(RPA), {MsgStatus1, State1} = - maybe_write_to_disk(true, false, MsgStatus, State), + maybe_prepare_write_to_disk(true, false, MsgStatus, State), MsgStatus2 = m(trim_msg_status(MsgStatus1)), DPA1 = gb_trees:insert(SeqId, MsgStatus2, DPA), limit_ram_acks(Quota - 1, @@ -1974,6 +1987,7 @@ push_alphas_to_betas(Generator, Consumer, Quota, Q, State) -> {empty, _Q} -> {Quota, State}; {{value, MsgStatus}, Qa} -> + %% TODO add QI batching here {MsgStatus1, State1} = maybe_write_to_disk(true, false, MsgStatus, State), MsgStatus2 = m(trim_msg_status(MsgStatus1)), |
