diff options
| -rw-r--r-- | src/rabbit_variable_queue.erl | 138 |
1 files changed, 43 insertions, 95 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index ee38f58e65..b37c8da91c 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -1828,39 +1828,30 @@ reduce_memory_use(State = #vqstate { State1 end. -limit_ram_acks(Quota, State = #vqstate{ram_bytes = CurrRamBytes}) -> - limit_ram_acks(Quota, CurrRamBytes, State). - -limit_ram_acks(0, CurrRamBytes, - State = #vqstate{index_state = IndexState, - target_ram_count = TargetRamCount}) -> +limit_ram_acks(0, State = #vqstate{ index_state = IndexState, + target_ram_count = TargetRamCount }) -> IndexState1 = rabbit_queue_index:flush_pre_publish_cache( TargetRamCount, IndexState), - {0, State#vqstate{index_state = IndexState1, - ram_bytes = CurrRamBytes}}; -limit_ram_acks(Quota, CurrRamBytes, - State = #vqstate { - index_state = IndexState, - target_ram_count = TargetRamCount, - ram_pending_ack = RPA, - disk_pending_ack = DPA }) -> + {0, State#vqstate{ index_state = IndexState1 }}; +limit_ram_acks(Quota, State = #vqstate { index_state = IndexState, + target_ram_count = TargetRamCount, + ram_pending_ack = RPA, + disk_pending_ack = DPA }) -> case gb_trees:is_empty(RPA) of true -> IndexState1 = rabbit_queue_index:flush_pre_publish_cache( TargetRamCount, IndexState), - {Quota, State#vqstate{index_state = IndexState1, - ram_bytes = CurrRamBytes}}; + {Quota, State#vqstate{index_state = IndexState1}}; false -> {SeqId, MsgStatus, RPA1} = gb_trees:take_largest(RPA), {MsgStatus1, State1} = maybe_prepare_write_to_disk(true, false, MsgStatus, State), MsgStatus2 = m(trim_msg_status(MsgStatus1)), DPA1 = gb_trees:insert(SeqId, MsgStatus2, DPA), - DeltaRam = delta_ram(msg_in_ram(MsgStatus), msg_in_ram(MsgStatus2)), limit_ram_acks(Quota - 1, - CurrRamBytes + DeltaRam * msg_size(MsgStatus), - State1 #vqstate { ram_pending_ack = RPA1, - disk_pending_ack = DPA1 }) + stats({0, 0}, {MsgStatus, MsgStatus2}, + State1 #vqstate { ram_pending_ack = RPA1, + disk_pending_ack = DPA1 })) end. permitted_beta_count(#vqstate { len = 0 }) -> @@ -1986,59 +1977,40 @@ push_alphas_to_betas(Quota, State) -> end, Quota1, State1 #vqstate.q4, State1), {Quota2, State2}. -push_alphas_to_betas(Generator, Consumer, Quota, Q, - State = #vqstate{ram_msg_count = CurrRamReadyCount, - ram_bytes = CurrRamBytes}) -> - push_alphas_to_betas1(Generator, Consumer, Quota, Q, - CurrRamReadyCount, CurrRamBytes, - State). - -push_alphas_to_betas1(_Generator, _Consumer, Quota, _Q, - CurrRamReadyCount, CurrRamBytes, - State = #vqstate { index_state = IndexState, +push_alphas_to_betas(_Generator, _Consumer, Quota, _Q, + State = #vqstate { ram_msg_count = RamMsgCount, + index_state = IndexState, target_ram_count = TargetRamCount }) when Quota =:= 0 orelse TargetRamCount =:= infinity orelse - TargetRamCount >= CurrRamReadyCount -> + TargetRamCount >= RamMsgCount -> IndexState1 = rabbit_queue_index:flush_pre_publish_cache( TargetRamCount, IndexState), - {Quota, State#vqstate{index_state = IndexState1, - ram_msg_count = CurrRamReadyCount, - ram_bytes = CurrRamBytes}}; -push_alphas_to_betas1(Generator, Consumer, Quota, Q, - CurrRamReadyCount, CurrRamBytes, + {Quota, State#vqstate{index_state = IndexState1}}; +push_alphas_to_betas(Generator, Consumer, Quota, Q, State = #vqstate{ index_state = IndexState, target_ram_count = TargetRamCount}) -> case credit_flow:blocked() of - true -> - IndexState1 = rabbit_queue_index:flush_pre_publish_cache( - TargetRamCount, IndexState), - {Quota, State#vqstate{index_state = IndexState1, - ram_msg_count = CurrRamReadyCount, - ram_bytes = CurrRamBytes}}; - false -> - case Generator(Q) of - {empty, _Q} -> - IndexState1 = rabbit_queue_index:flush_pre_publish_cache( - TargetRamCount, IndexState), - {Quota, State#vqstate{index_state = IndexState1, - ram_msg_count = CurrRamReadyCount, - ram_bytes = CurrRamBytes}}; - {{value, MsgStatus}, Qa} -> - {MsgStatus1, State1} = - maybe_prepare_write_to_disk(true, false, MsgStatus, - State), - MsgStatus2 = m(trim_msg_status(MsgStatus1)), - DeltaRam = delta_ram(msg_in_ram(MsgStatus), - msg_in_ram(MsgStatus2)), - DeltaRamReady = DeltaRam, - State2 = Consumer(MsgStatus2, Qa, State1), - push_alphas_to_betas1(Generator, Consumer, Quota - 1, Qa, - CurrRamReadyCount + DeltaRamReady, - CurrRamBytes + DeltaRam * msg_size(MsgStatus), - State2) - end + true -> IndexState1 = rabbit_queue_index:flush_pre_publish_cache( + TargetRamCount, IndexState), + {Quota, State#vqstate{index_state = IndexState1}}; + false -> case Generator(Q) of + {empty, _Q} -> + IndexState1 = rabbit_queue_index:flush_pre_publish_cache( + TargetRamCount, IndexState), + {Quota, State#vqstate{index_state = IndexState1}}; + {{value, MsgStatus}, Qa} -> + {MsgStatus1, State1} = + maybe_prepare_write_to_disk(true, false, MsgStatus, + State), + MsgStatus2 = m(trim_msg_status(MsgStatus1)), + State2 = stats( + ready0, {MsgStatus, MsgStatus2}, State1), + State3 = Consumer(MsgStatus2, Qa, State2), + push_alphas_to_betas(Generator, Consumer, Quota - 1, + Qa, State3) + end end. push_betas_to_deltas(Quota, State = #vqstate { q2 = Q2, @@ -2058,11 +2030,7 @@ push_betas_to_deltas(Quota, State = #vqstate { q2 = Q2, delta = Delta1, q3 = Q3a }. -push_betas_to_deltas(Generator, LimitFun, Q, - {_Quota, _Delta, - #vqstate{ - ram_msg_count = CurrRamReadyCount, - ram_bytes = CurrRamBytes}} = PushState) -> +push_betas_to_deltas(Generator, LimitFun, Q, PushState) -> case ?QUEUE:is_empty(Q) of true -> {Q, PushState}; @@ -2072,24 +2040,18 @@ push_betas_to_deltas(Generator, LimitFun, Q, Limit = LimitFun(MinSeqId), case MaxSeqId < Limit of true -> {Q, PushState}; - false -> push_betas_to_deltas1(Generator, Limit, Q, - CurrRamReadyCount, CurrRamBytes, - PushState) + false -> push_betas_to_deltas1(Generator, Limit, Q, PushState) end end. push_betas_to_deltas1(_Generator, _Limit, Q, - CurrRamReadyCount, CurrRamBytes, {0, Delta, State = #vqstate{index_state = IndexState, target_ram_count = TargetRamCount}}) -> IndexState1 = rabbit_queue_index:flush_pre_publish_cache( TargetRamCount, IndexState), - {Q, {0, Delta, State#vqstate{index_state = IndexState1, - ram_msg_count = CurrRamReadyCount, - ram_bytes = CurrRamBytes}}}; + {Q, {0, Delta, State#vqstate{index_state = IndexState1}}}; push_betas_to_deltas1(Generator, Limit, Q, - CurrRamReadyCount, CurrRamBytes, {Quota, Delta, State = #vqstate{index_state = IndexState, target_ram_count = TargetRamCount}}) -> @@ -2097,35 +2059,21 @@ push_betas_to_deltas1(Generator, Limit, Q, {empty, _Q} -> IndexState1 = rabbit_queue_index:flush_pre_publish_cache( TargetRamCount, IndexState), - {Q, {Quota, Delta, State#vqstate{index_state = IndexState1, - ram_msg_count = CurrRamReadyCount, - ram_bytes = CurrRamBytes}}}; + {Q, {Quota, Delta, State#vqstate{index_state = IndexState1}}}; {{value, #msg_status { seq_id = SeqId }}, _Qa} when SeqId < Limit -> IndexState1 = rabbit_queue_index:flush_pre_publish_cache( TargetRamCount, IndexState), - {Q, {Quota, Delta, State#vqstate{index_state = IndexState1, - ram_msg_count = CurrRamReadyCount, - ram_bytes = CurrRamBytes}}}; + {Q, {Quota, Delta, State#vqstate{index_state = IndexState1}}}; {{value, MsgStatus = #msg_status { seq_id = SeqId }}, Qa} -> {#msg_status { index_on_disk = true }, State1} = maybe_batch_write_index_to_disk(true, MsgStatus, State), - {Size, DeltaRam} = size_and_delta_ram(MsgStatus), + State2 = stats(ready0, {MsgStatus, none}, State1), Delta1 = expand_delta(SeqId, Delta), push_betas_to_deltas1(Generator, Limit, Qa, - CurrRamReadyCount + DeltaRam, - CurrRamBytes + DeltaRam * Size, - {Quota - 1, Delta1, State1}) + {Quota - 1, Delta1, State2}) end. -%% Optimised version for paging only, based on stats/3 being called -%% like this: stats(ready0, {MsgStatus, none}, State1). -size_and_delta_ram(#msg_status{msg_props = #message_properties{size = Size}, - msg = undefined}) -> - {Size, 0}; -size_and_delta_ram(#msg_status{msg_props = #message_properties{size = Size}}) -> - {Size, -1}. - %%---------------------------------------------------------------------------- %% Upgrading %%---------------------------------------------------------------------------- |
