summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_variable_queue.erl41
1 files changed, 29 insertions, 12 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index e4b8fd5260..708f00e0b3 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -1981,17 +1981,27 @@ push_alphas_to_betas(Quota, State) ->
end, Quota1, State1 #vqstate.q4, State1),
{Quota2, State2}.
-push_alphas_to_betas(_Generator, _Consumer, Quota, _Q,
+push_alphas_to_betas(Generator, Consumer, Quota, Q,
+ State = #vqstate{ram_msg_count = CurrRamReadyCount,
+ ram_bytes = CurrRamBytes}) ->
+ push_alphas_to_betas1(Generator, Consumer, Quota, Q,
+ CurrRamReadyCount, CurrRamBytes,
+ State).
+
+push_alphas_to_betas1(_Generator, _Consumer, Quota, _Q,
+ CurrRamReadyCount, CurrRamBytes,
State = #vqstate { index_state = IndexState,
- ram_msg_count = RamMsgCount,
target_ram_count = TargetRamCount })
when Quota =:= 0 orelse
TargetRamCount =:= infinity orelse
- TargetRamCount >= RamMsgCount ->
+ TargetRamCount >= CurrRamReadyCount ->
IndexState1 = rabbit_queue_index:flush_pre_publish_cache(
TargetRamCount, IndexState),
- {Quota, State#vqstate{index_state = IndexState1}};
-push_alphas_to_betas(Generator, Consumer, Quota, Q,
+ {Quota, State#vqstate{index_state = IndexState1,
+ ram_msg_count = CurrRamReadyCount,
+ ram_bytes = CurrRamBytes}};
+push_alphas_to_betas1(Generator, Consumer, Quota, Q,
+ CurrRamReadyCount, CurrRamBytes,
State = #vqstate{
index_state = IndexState,
target_ram_count = TargetRamCount}) ->
@@ -1999,23 +2009,30 @@ push_alphas_to_betas(Generator, Consumer, Quota, Q,
true ->
IndexState1 = rabbit_queue_index:flush_pre_publish_cache(
TargetRamCount, IndexState),
- {Quota, State#vqstate{index_state = IndexState1}};
+ {Quota, State#vqstate{index_state = IndexState1,
+ ram_msg_count = CurrRamReadyCount,
+ ram_bytes = CurrRamBytes}};
false ->
case Generator(Q) of
{empty, _Q} ->
IndexState1 = rabbit_queue_index:flush_pre_publish_cache(
TargetRamCount, IndexState),
- {Quota, State#vqstate{index_state = IndexState1}};
+ {Quota, State#vqstate{index_state = IndexState1,
+ ram_msg_count = CurrRamReadyCount,
+ ram_bytes = CurrRamBytes}};
{{value, MsgStatus}, Qa} ->
{MsgStatus1, State1} =
maybe_prepare_write_to_disk(true, false, MsgStatus,
State),
MsgStatus2 = m(trim_msg_status(MsgStatus1)),
- State2 = stats(
- ready0, {MsgStatus, MsgStatus2}, State1),
- State3 = Consumer(MsgStatus2, Qa, State2),
- push_alphas_to_betas(Generator, Consumer, Quota - 1,
- Qa, State3)
+ DeltaRam = delta_ram(msg_in_ram(MsgStatus),
+ msg_in_ram(MsgStatus2)),
+ DeltaRamReady = DeltaRam,
+ State2 = Consumer(MsgStatus2, Qa, State1),
+ push_alphas_to_betas1(Generator, Consumer, Quota - 1,
+ CurrRamReadyCount + DeltaRamReady,
+ CurrRamBytes + DeltaRam * msg_size(MsgStatus),
+ Qa, State2)
end
end.