diff options
| -rw-r--r-- | src/rabbit_variable_queue.erl | 21 |
1 files changed, 7 insertions, 14 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index cfea41f0b9..c0c7c69e9a 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -772,15 +772,6 @@ update_rate(Now, Then, Count, {OThen, OCount}) -> persistent_guids(Pubs) -> [Guid || #basic_message { guid = Guid, is_persistent = true } <- Pubs]. -beta_bounds(Q) -> - case bpqueue:out(Q) of - {empty, Q} -> empty; - {{value, _IndexOnDisk1, #msg_status { seq_id = SeqIdMin }}, _Qa} -> - {{value, _IndexOnDisk2, #msg_status { seq_id = SeqIdMax }}, _Qb} = - bpqueue:out_r(Q), - {SeqIdMin, SeqIdMax} - end. - betas_from_index_entries(List, TransientThreshold, IndexState) -> {Filtered, Delivers, Acks} = lists:foldr( @@ -1375,20 +1366,22 @@ push_betas_to_deltas(State = #vqstate { q2 = Q2, ram_index_count = RamIndexCount3 }. push_betas_to_deltas(LimitFun, Generator, Q, RamIndexCount, IndexState) -> - case beta_bounds(Q) of - empty -> + case bpqueue:out(Q) of + {empty, Q} -> {?BLANK_DELTA, Q, RamIndexCount, IndexState}; - {MinSeqId, MaxSeqId} -> + {{value, _IndexOnDisk1, #msg_status { seq_id = MinSeqId }}, _Qa} -> + {{value, _IndexOnDisk2, #msg_status { seq_id = MaxSeqId }}, _Qb} = + bpqueue:out_r(Q), Limit = LimitFun(MinSeqId), case MaxSeqId < Limit of true -> {?BLANK_DELTA, Q, RamIndexCount, IndexState}; - false -> {Len, Qb, RamIndexCount1, IndexState1} = + false -> {Len, Qc, RamIndexCount1, IndexState1} = push_betas_to_deltas(Generator, Limit, Q, 0, RamIndexCount, IndexState), {#delta { start_seq_id = Limit, count = Len, end_seq_id = MaxSeqId + 1 }, - Qb, RamIndexCount1, IndexState1} + Qc, RamIndexCount1, IndexState1} end end. |
