diff options
| author | Matthias Radestock <matthias@rabbitmq.com> | 2010-06-18 19:30:38 +0100 |
|---|---|---|
| committer | Matthias Radestock <matthias@rabbitmq.com> | 2010-06-18 19:30:38 +0100 |
| commit | f05592fc3f4821f57f505de0582b8b27625f639a (patch) | |
| tree | 471d9fd1864405a6e64d519d3b08e48f686f50fb | |
| parent | 82e0ad06425e2ed16bcc8b2e0c7c472626faef2f (diff) | |
| download | rabbitmq-server-git-f05592fc3f4821f57f505de0582b8b27625f639a.tar.gz | |
yet more refactoring of push_betas_to_deltas
finally make the q2 and q3 processing use the same code
| -rw-r--r-- | src/rabbit_variable_queue.erl | 99 |
1 files changed, 36 insertions, 63 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 02325351fc..cfea41f0b9 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -1360,81 +1360,54 @@ push_betas_to_deltas(State = #vqstate { q2 = Q2, ram_index_count = RamIndexCount, index_state = IndexState }) -> {Delta1, Q2a, RamIndexCount1, IndexState1} = - case beta_bounds(Q2) of - empty -> - {?BLANK_DELTA, Q2, RamIndexCount, IndexState}; - {Q2MinSeqId, Q2MaxSeqId} -> - {Len1, Q2b, RamIndexCount2, IndexState2} = - push_betas_to_deltas( - fun bpqueue:out/1, undefined, Q2, 0, RamIndexCount, - IndexState), - %% Q2MinSeqId is high in the sense that it must be - %% higher than the seq_id in Delta, but it's also the - %% lowest of the betas that we transfer from q2 to - %% delta. - {#delta { start_seq_id = Q2MinSeqId, - count = Len1, - end_seq_id = Q2MaxSeqId + 1 }, - Q2b, RamIndexCount2, IndexState2} - end, - true = bpqueue:is_empty(Q2a), %% ASSERTION - Delta2 = #delta { start_seq_id = Delta2SeqId } = - combine_deltas(Delta, Delta1), - {Delta3, Q3a, RamIndexCount3, IndexState3} = - case beta_bounds(Q3) of - empty -> - {?BLANK_DELTA, Q3, RamIndexCount1, IndexState1}; - {Q3MinSeqId, Q3MaxSeqId} -> - Limit = rabbit_queue_index:next_segment_boundary(Q3MinSeqId), - %% ASSERTION - true = Delta2SeqId == undefined orelse Delta2SeqId > Q3MaxSeqId, - case Q3MaxSeqId < Limit of - true -> - %% already only holding LTE one segment indices in q3 - {?BLANK_DELTA, Q3, RamIndexCount1, IndexState1}; - false -> - %% Q3MaxSeqId is low in the sense that it must - %% be lower than the seq_id in delta2, in fact - %% either delta2 has undefined as its seq_id - %% or there does not exist a seq_id X s.t. X > - %% Q3MaxSeqId and X < delta2's seq_id (would - %% be +1 if it wasn't for the possibility of - %% gaps in the seq_ids). But Q3MaxSeqId is - %% actually also the highest seq_id of the - %% betas we transfer from q3 to deltas. - {Len2, Q3b, RamIndexCount4, IndexState4} = - push_betas_to_deltas( - fun bpqueue:out_r/1, Limit, Q3, 0, RamIndexCount1, - IndexState1), - {#delta { start_seq_id = Limit, - count = Len2, - end_seq_id = Q3MaxSeqId + 1 }, - Q3b, RamIndexCount4, IndexState4} - end - end, + push_betas_to_deltas(fun (Q2MinSeqId) -> Q2MinSeqId end, + fun bpqueue:out/1, Q2, + RamIndexCount, IndexState), + {Delta2, Q3a, RamIndexCount3, IndexState3} = + push_betas_to_deltas(fun rabbit_queue_index:next_segment_boundary/1, + fun bpqueue:out_r/1, Q3, + RamIndexCount1, IndexState1), + Delta3 = combine_deltas(Delta2, combine_deltas(Delta, Delta1)), State #vqstate { q2 = Q2a, - delta = combine_deltas(Delta3, Delta2), + delta = Delta3, q3 = Q3a, index_state = IndexState3, ram_index_count = RamIndexCount3 }. +push_betas_to_deltas(LimitFun, Generator, Q, RamIndexCount, IndexState) -> + case beta_bounds(Q) of + empty -> + {?BLANK_DELTA, Q, RamIndexCount, IndexState}; + {MinSeqId, MaxSeqId} -> + Limit = LimitFun(MinSeqId), + case MaxSeqId < Limit of + true -> {?BLANK_DELTA, Q, RamIndexCount, IndexState}; + false -> {Len, Qb, RamIndexCount1, IndexState1} = + push_betas_to_deltas(Generator, Limit, Q, 0, + RamIndexCount, IndexState), + {#delta { start_seq_id = Limit, + count = Len, + end_seq_id = MaxSeqId + 1 }, + Qb, RamIndexCount1, IndexState1} + end + end. + push_betas_to_deltas(Generator, Limit, Q, Count, RamIndexCount, IndexState) -> case Generator(Q) of - {empty, Qa} -> - {Count, Qa, RamIndexCount, IndexState}; + {empty, Q} -> + {Count, Q, RamIndexCount, IndexState}; {{value, _IndexOnDisk, #msg_status { seq_id = SeqId }}, _Qa} - when Limit =/= undefined andalso SeqId < Limit -> + when SeqId < Limit -> {Count, Q, RamIndexCount, IndexState}; {{value, IndexOnDisk, MsgStatus}, Qa} -> {RamIndexCount1, IndexState1} = case IndexOnDisk of - true -> - {RamIndexCount, IndexState}; - false -> - {#msg_status { index_on_disk = true }, IndexState2} = - maybe_write_index_to_disk(true, MsgStatus, - IndexState), - {RamIndexCount - 1, IndexState2} + true -> {RamIndexCount, IndexState}; + false -> {#msg_status { index_on_disk = true }, + IndexState2} = + maybe_write_index_to_disk(true, MsgStatus, + IndexState), + {RamIndexCount - 1, IndexState2} end, push_betas_to_deltas( Generator, Limit, Qa, Count + 1, RamIndexCount1, IndexState1) |
