diff options
| author | Alvaro Videla <videlalvaro@gmail.com> | 2015-08-27 18:27:04 +0200 |
|---|---|---|
| committer | Alvaro Videla <videlalvaro@gmail.com> | 2015-08-27 18:27:04 +0200 |
| commit | 38d58c4c1f4e8090aa6ca42b101f6312ca16e467 (patch) | |
| tree | f8e63de59783c318a899bc78223b8ca1c43c83fc /src | |
| parent | d76d9bd3c8a247acd1ded282ff0b80c4cc049348 (diff) | |
| download | rabbitmq-server-git-38d58c4c1f4e8090aa6ca42b101f6312ca16e467.tar.gz | |
improves limit_ram_acks batching performance
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_variable_queue.erl | 26 |
1 files changed, 20 insertions, 6 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index fae847d408..ddd75c7ed9 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -1432,6 +1432,10 @@ maybe_write_to_disk(ForceMsg, ForceIndex, MsgStatus, State) -> {MsgStatus1, State1} = maybe_write_msg_to_disk(ForceMsg, MsgStatus, State), maybe_write_index_to_disk(ForceIndex, MsgStatus1, State1). +maybe_prepare_write_to_disk(ForceMsg, ForceIndex, MsgStatus, State) -> + {MsgStatus1, State1} = maybe_write_msg_to_disk(ForceMsg, MsgStatus, State), + maybe_write_index_to_disk_paging(ForceIndex, MsgStatus1, State1). + determine_persist_to(#basic_message{ content = #content{properties = Props, properties_bin = PropsBin}}, @@ -1818,17 +1822,26 @@ reduce_memory_use(State = #vqstate { State1 end. -limit_ram_acks(0, State) -> - {0, State}; -limit_ram_acks(Quota, State = #vqstate { ram_pending_ack = RPA, - disk_pending_ack = DPA }) -> +limit_ram_acks(0, State = + #vqstate{index_state = IndexState, + target_ram_count = TargetRamCount}) -> + IndexState1 = rabbit_queue_index:flush_pre_publish_cache( + TargetRamCount, IndexState), + {0, State#vqstate{index_state = IndexState1}}; +limit_ram_acks(Quota, State = #vqstate { + index_state = IndexState, + target_ram_count = TargetRamCount, + ram_pending_ack = RPA, + disk_pending_ack = DPA }) -> case gb_trees:is_empty(RPA) of true -> - {Quota, State}; + IndexState1 = rabbit_queue_index:flush_pre_publish_cache( + TargetRamCount, IndexState), + {Quota, State#vqstate{index_state = IndexState1}}; false -> {SeqId, MsgStatus, RPA1} = gb_trees:take_largest(RPA), {MsgStatus1, State1} = - maybe_write_to_disk(true, false, MsgStatus, State), + maybe_prepare_write_to_disk(true, false, MsgStatus, State), MsgStatus2 = m(trim_msg_status(MsgStatus1)), DPA1 = gb_trees:insert(SeqId, MsgStatus2, DPA), limit_ram_acks(Quota - 1, @@ -1974,6 +1987,7 @@ push_alphas_to_betas(Generator, Consumer, Quota, Q, State) -> {empty, _Q} -> {Quota, State}; {{value, MsgStatus}, Qa} -> + %% TODO add QI batching here {MsgStatus1, State1} = maybe_write_to_disk(true, false, MsgStatus, State), MsgStatus2 = m(trim_msg_status(MsgStatus1)), |
