diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_variable_queue.erl | 41 |
1 files changed, 29 insertions, 12 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index e4b8fd5260..708f00e0b3 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -1981,17 +1981,27 @@ push_alphas_to_betas(Quota, State) -> end, Quota1, State1 #vqstate.q4, State1), {Quota2, State2}. -push_alphas_to_betas(_Generator, _Consumer, Quota, _Q, +push_alphas_to_betas(Generator, Consumer, Quota, Q, + State = #vqstate{ram_msg_count = CurrRamReadyCount, + ram_bytes = CurrRamBytes}) -> + push_alphas_to_betas1(Generator, Consumer, Quota, Q, + CurrRamReadyCount, CurrRamBytes, + State). + +push_alphas_to_betas1(_Generator, _Consumer, Quota, _Q, + CurrRamReadyCount, CurrRamBytes, State = #vqstate { index_state = IndexState, - ram_msg_count = RamMsgCount, target_ram_count = TargetRamCount }) when Quota =:= 0 orelse TargetRamCount =:= infinity orelse - TargetRamCount >= RamMsgCount -> + TargetRamCount >= CurrRamReadyCount -> IndexState1 = rabbit_queue_index:flush_pre_publish_cache( TargetRamCount, IndexState), - {Quota, State#vqstate{index_state = IndexState1}}; -push_alphas_to_betas(Generator, Consumer, Quota, Q, + {Quota, State#vqstate{index_state = IndexState1, + ram_msg_count = CurrRamReadyCount, + ram_bytes = CurrRamBytes}}; +push_alphas_to_betas1(Generator, Consumer, Quota, Q, + CurrRamReadyCount, CurrRamBytes, State = #vqstate{ index_state = IndexState, target_ram_count = TargetRamCount}) -> @@ -1999,23 +2009,30 @@ push_alphas_to_betas(Generator, Consumer, Quota, Q, true -> IndexState1 = rabbit_queue_index:flush_pre_publish_cache( TargetRamCount, IndexState), - {Quota, State#vqstate{index_state = IndexState1}}; + {Quota, State#vqstate{index_state = IndexState1, + ram_msg_count = CurrRamReadyCount, + ram_bytes = CurrRamBytes}}; false -> case Generator(Q) of {empty, _Q} -> IndexState1 = rabbit_queue_index:flush_pre_publish_cache( TargetRamCount, IndexState), - {Quota, State#vqstate{index_state = IndexState1}}; + {Quota, State#vqstate{index_state = IndexState1, + ram_msg_count = CurrRamReadyCount, + ram_bytes = CurrRamBytes}}; {{value, MsgStatus}, Qa} -> {MsgStatus1, State1} = maybe_prepare_write_to_disk(true, false, MsgStatus, State), MsgStatus2 = m(trim_msg_status(MsgStatus1)), - State2 = stats( - ready0, {MsgStatus, MsgStatus2}, State1), - State3 = Consumer(MsgStatus2, Qa, State2), - push_alphas_to_betas(Generator, Consumer, Quota - 1, - Qa, State3) + DeltaRam = delta_ram(msg_in_ram(MsgStatus), + msg_in_ram(MsgStatus2)), + DeltaRamReady = DeltaRam, + State2 = Consumer(MsgStatus2, Qa, State1), + push_alphas_to_betas1(Generator, Consumer, Quota - 1, + CurrRamReadyCount + DeltaRamReady, + CurrRamBytes + DeltaRam * msg_size(MsgStatus), + Qa, State2) end end. |
