summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_variable_queue.erl48
1 files changed, 30 insertions, 18 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index ddd75c7ed9..682b9e0eb2 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -1974,29 +1974,41 @@ push_alphas_to_betas(Quota, State) ->
{Quota2, State2}.
push_alphas_to_betas(_Generator, _Consumer, Quota, _Q,
- State = #vqstate { ram_msg_count = RamMsgCount,
+ State = #vqstate { index_state = IndexState,
+ ram_msg_count = RamMsgCount,
target_ram_count = TargetRamCount })
when Quota =:= 0 orelse
TargetRamCount =:= infinity orelse
TargetRamCount >= RamMsgCount ->
- {Quota, State};
-push_alphas_to_betas(Generator, Consumer, Quota, Q, State) ->
+ IndexState1 = rabbit_queue_index:flush_pre_publish_cache(
+ TargetRamCount, IndexState),
+ {Quota, State#vqstate{index_state = IndexState1}};
+push_alphas_to_betas(Generator, Consumer, Quota, Q,
+ State = #vqstate{
+ index_state = IndexState,
+ target_ram_count = TargetRamCount}) ->
case credit_flow:blocked() of
- true -> {Quota, State};
- false -> case Generator(Q) of
- {empty, _Q} ->
- {Quota, State};
- {{value, MsgStatus}, Qa} ->
- %% TODO add QI batching here
- {MsgStatus1, State1} =
- maybe_write_to_disk(true, false, MsgStatus, State),
- MsgStatus2 = m(trim_msg_status(MsgStatus1)),
- State2 = stats(
- ready0, {MsgStatus, MsgStatus2}, State1),
- State3 = Consumer(MsgStatus2, Qa, State2),
- push_alphas_to_betas(Generator, Consumer, Quota - 1,
- Qa, State3)
- end
+ true ->
+ IndexState1 = rabbit_queue_index:flush_pre_publish_cache(
+ TargetRamCount, IndexState),
+ {Quota, State#vqstate{index_state = IndexState1}};
+ false ->
+ case Generator(Q) of
+ {empty, _Q} ->
+ IndexState1 = rabbit_queue_index:flush_pre_publish_cache(
+ TargetRamCount, IndexState),
+ {Quota, State#vqstate{index_state = IndexState1}};
+ {{value, MsgStatus}, Qa} ->
+ {MsgStatus1, State1} =
+ maybe_prepare_write_to_disk(true, false, MsgStatus,
+ State),
+ MsgStatus2 = m(trim_msg_status(MsgStatus1)),
+ State2 = stats(
+ ready0, {MsgStatus, MsgStatus2}, State1),
+ State3 = Consumer(MsgStatus2, Qa, State2),
+ push_alphas_to_betas(Generator, Consumer, Quota - 1,
+ Qa, State3)
+ end
end.
push_betas_to_deltas(Quota, State = #vqstate { q2 = Q2,