summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlvaro Videla <videlalvaro@gmail.com>2015-08-27 18:39:28 +0200
committerAlvaro Videla <videlalvaro@gmail.com>2015-08-27 18:39:28 +0200
commit7b841803f39a005671a7f603b95ec99570d086de (patch)
treede1c2e5c6a9eecd9fee1bc6afa88630ed6791127
parent38d58c4c1f4e8090aa6ca42b101f6312ca16e467 (diff)
downloadrabbitmq-server-git-7b841803f39a005671a7f603b95ec99570d086de.tar.gz
improves push_alphas_to_betas paging performance
-rw-r--r--src/rabbit_variable_queue.erl48
1 files changed, 30 insertions, 18 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index ddd75c7ed9..682b9e0eb2 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -1974,29 +1974,41 @@ push_alphas_to_betas(Quota, State) ->
{Quota2, State2}.
push_alphas_to_betas(_Generator, _Consumer, Quota, _Q,
- State = #vqstate { ram_msg_count = RamMsgCount,
+ State = #vqstate { index_state = IndexState,
+ ram_msg_count = RamMsgCount,
target_ram_count = TargetRamCount })
when Quota =:= 0 orelse
TargetRamCount =:= infinity orelse
TargetRamCount >= RamMsgCount ->
- {Quota, State};
-push_alphas_to_betas(Generator, Consumer, Quota, Q, State) ->
+ IndexState1 = rabbit_queue_index:flush_pre_publish_cache(
+ TargetRamCount, IndexState),
+ {Quota, State#vqstate{index_state = IndexState1}};
+push_alphas_to_betas(Generator, Consumer, Quota, Q,
+ State = #vqstate{
+ index_state = IndexState,
+ target_ram_count = TargetRamCount}) ->
case credit_flow:blocked() of
- true -> {Quota, State};
- false -> case Generator(Q) of
- {empty, _Q} ->
- {Quota, State};
- {{value, MsgStatus}, Qa} ->
- %% TODO add QI batching here
- {MsgStatus1, State1} =
- maybe_write_to_disk(true, false, MsgStatus, State),
- MsgStatus2 = m(trim_msg_status(MsgStatus1)),
- State2 = stats(
- ready0, {MsgStatus, MsgStatus2}, State1),
- State3 = Consumer(MsgStatus2, Qa, State2),
- push_alphas_to_betas(Generator, Consumer, Quota - 1,
- Qa, State3)
- end
+ true ->
+ IndexState1 = rabbit_queue_index:flush_pre_publish_cache(
+ TargetRamCount, IndexState),
+ {Quota, State#vqstate{index_state = IndexState1}};
+ false ->
+ case Generator(Q) of
+ {empty, _Q} ->
+ IndexState1 = rabbit_queue_index:flush_pre_publish_cache(
+ TargetRamCount, IndexState),
+ {Quota, State#vqstate{index_state = IndexState1}};
+ {{value, MsgStatus}, Qa} ->
+ {MsgStatus1, State1} =
+ maybe_prepare_write_to_disk(true, false, MsgStatus,
+ State),
+ MsgStatus2 = m(trim_msg_status(MsgStatus1)),
+ State2 = stats(
+ ready0, {MsgStatus, MsgStatus2}, State1),
+ State3 = Consumer(MsgStatus2, Qa, State2),
+ push_alphas_to_betas(Generator, Consumer, Quota - 1,
+ Qa, State3)
+ end
end.
push_betas_to_deltas(Quota, State = #vqstate { q2 = Q2,