summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlvaro Videla <videlalvaro@gmail.com>2015-08-27 18:27:04 +0200
committerAlvaro Videla <videlalvaro@gmail.com>2015-08-27 18:27:04 +0200
commit38d58c4c1f4e8090aa6ca42b101f6312ca16e467 (patch)
treef8e63de59783c318a899bc78223b8ca1c43c83fc
parentd76d9bd3c8a247acd1ded282ff0b80c4cc049348 (diff)
downloadrabbitmq-server-git-38d58c4c1f4e8090aa6ca42b101f6312ca16e467.tar.gz
improves limit_ram_acks batching performance
-rw-r--r--src/rabbit_variable_queue.erl26
1 files changed, 20 insertions, 6 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index fae847d408..ddd75c7ed9 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -1432,6 +1432,10 @@ maybe_write_to_disk(ForceMsg, ForceIndex, MsgStatus, State) ->
{MsgStatus1, State1} = maybe_write_msg_to_disk(ForceMsg, MsgStatus, State),
maybe_write_index_to_disk(ForceIndex, MsgStatus1, State1).
+maybe_prepare_write_to_disk(ForceMsg, ForceIndex, MsgStatus, State) ->
+ {MsgStatus1, State1} = maybe_write_msg_to_disk(ForceMsg, MsgStatus, State),
+ maybe_write_index_to_disk_paging(ForceIndex, MsgStatus1, State1).
+
determine_persist_to(#basic_message{
content = #content{properties = Props,
properties_bin = PropsBin}},
@@ -1818,17 +1822,26 @@ reduce_memory_use(State = #vqstate {
State1
end.
-limit_ram_acks(0, State) ->
- {0, State};
-limit_ram_acks(Quota, State = #vqstate { ram_pending_ack = RPA,
- disk_pending_ack = DPA }) ->
+limit_ram_acks(0, State =
+ #vqstate{index_state = IndexState,
+ target_ram_count = TargetRamCount}) ->
+ IndexState1 = rabbit_queue_index:flush_pre_publish_cache(
+ TargetRamCount, IndexState),
+ {0, State#vqstate{index_state = IndexState1}};
+limit_ram_acks(Quota, State = #vqstate {
+ index_state = IndexState,
+ target_ram_count = TargetRamCount,
+ ram_pending_ack = RPA,
+ disk_pending_ack = DPA }) ->
case gb_trees:is_empty(RPA) of
true ->
- {Quota, State};
+ IndexState1 = rabbit_queue_index:flush_pre_publish_cache(
+ TargetRamCount, IndexState),
+ {Quota, State#vqstate{index_state = IndexState1}};
false ->
{SeqId, MsgStatus, RPA1} = gb_trees:take_largest(RPA),
{MsgStatus1, State1} =
- maybe_write_to_disk(true, false, MsgStatus, State),
+ maybe_prepare_write_to_disk(true, false, MsgStatus, State),
MsgStatus2 = m(trim_msg_status(MsgStatus1)),
DPA1 = gb_trees:insert(SeqId, MsgStatus2, DPA),
limit_ram_acks(Quota - 1,
@@ -1974,6 +1987,7 @@ push_alphas_to_betas(Generator, Consumer, Quota, Q, State) ->
{empty, _Q} ->
{Quota, State};
{{value, MsgStatus}, Qa} ->
+ %% TODO add QI batching here
{MsgStatus1, State1} =
maybe_write_to_disk(true, false, MsgStatus, State),
MsgStatus2 = m(trim_msg_status(MsgStatus1)),