diff options
| author | Matthias Radestock <matthias@rabbitmq.com> | 2010-06-18 17:08:55 +0100 |
|---|---|---|
| committer | Matthias Radestock <matthias@rabbitmq.com> | 2010-06-18 17:08:55 +0100 |
| commit | 8f124ae4c1fb62baa0336a32eae43005d4fc4a85 (patch) | |
| tree | e6ec360528843e24a9f3e234bd8b82a179320197 | |
| parent | 8b2008b3a6fdf6b2110abd582b6ee64d99a370b0 (diff) | |
| download | rabbitmq-server-git-8f124ae4c1fb62baa0336a32eae43005d4fc4a85.tar.gz | |
refactor push_betas_to_deltas
to make the similarity of the treatment of Q2 and Q3 more obvious
| -rw-r--r-- | src/rabbit_variable_queue.erl | 112 |
1 files changed, 57 insertions, 55 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 977b1af21b..b4fd05d9e5 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -772,6 +772,15 @@ update_rate(Now, Then, Count, {OThen, OCount}) -> persistent_guids(Pubs) -> [Guid || #basic_message { guid = Guid, is_persistent = true } <- Pubs]. +beta_bounds(Q) -> + case bpqueue:out(Q) of + {empty, Q} -> empty; + {{value, _IndexOnDisk1, #msg_status { seq_id = SeqIdMin }}, _Qa} -> + {{value, _IndexOnDisk2, #msg_status { seq_id = SeqIdMax }}, _Qb} = + bpqueue:out_r(Q), + {SeqIdMin, SeqIdMax} + end. + betas_from_index_entries(List, TransientThreshold, IndexState) -> {Filtered, Delivers, Acks} = lists:foldr( @@ -1350,74 +1359,67 @@ push_betas_to_deltas(State = #vqstate { q2 = Q2, q3 = Q3, ram_index_count = RamIndexCount, index_state = IndexState }) -> - %% HighSeqId is high in the sense that it must be higher than the - %% seq_id in Delta, but it's also the lowest of the betas that we - %% transfer from q2 to delta. - {HighSeqId, Len1, Q2a, RamIndexCount1, IndexState1} = - push_betas_to_deltas( - fun bpqueue:out/1, undefined, Q2, RamIndexCount, IndexState), - true = bpqueue:is_empty(Q2a), %% ASSERTION - EndSeqId = - case bpqueue:out_r(Q2) of - {empty, _Q2} -> - undefined; - {{value, _IndexOnDisk, #msg_status { seq_id = EndSeqId1 }}, _Q2} -> - EndSeqId1 + 1 + {Delta1, Q2a, RamIndexCount1, IndexState1} = + case beta_bounds(Q2) of + empty -> + {?BLANK_DELTA, Q2, RamIndexCount, IndexState}; + {Q2MinSeqId, Q2MaxSeqId} -> + {Q2Count, Q2b, RamIndexCount2, IndexState2} = + push_betas_to_deltas( + fun bpqueue:out/1, undefined, Q2, 0, RamIndexCount, + IndexState), + %% Q2MinSeqId is high in the sense that it must be + %% higher than the seq_id in Delta, but it's also the + %% lowest of the betas that we transfer from q2 to + %% delta. + {#delta { start_seq_id = Q2MinSeqId, + count = Q2Count, + end_seq_id = Q2MaxSeqId + 1 }, + Q2b, RamIndexCount2, IndexState2} end, - Delta1 = #delta { start_seq_id = Delta1SeqId } = - combine_deltas(Delta, #delta { start_seq_id = HighSeqId, - count = Len1, - end_seq_id = EndSeqId }), - State1 = State #vqstate { q2 = bpqueue:new(), - delta = Delta1, + true = bpqueue:is_empty(Q2a), %% ASSERTION + Delta2 = #delta { start_seq_id = Delta2SeqId } = + combine_deltas(Delta, Delta1), + State1 = State #vqstate { q2 = Q2a, + delta = Delta2, index_state = IndexState1, ram_index_count = RamIndexCount1 }, - case bpqueue:out(Q3) of - {empty, _Q3} -> + + case beta_bounds(Q3) of + empty -> State1; - {{value, _IndexOnDisk1, #msg_status { seq_id = SeqId }}, _Q3} -> - {{value, _IndexOnDisk2, #msg_status { seq_id = SeqIdMax }}, _Q3a} = - bpqueue:out_r(Q3), - Limit = rabbit_queue_index:next_segment_boundary(SeqId), + {Q3MinSeqId, Q3MaxSeqId} -> + Limit = rabbit_queue_index:next_segment_boundary(Q3MinSeqId), %% ASSERTION - true = Delta1SeqId == undefined orelse Delta1SeqId > SeqIdMax, - case SeqIdMax < Limit of + true = Delta2SeqId == undefined orelse Delta2SeqId > Q3MaxSeqId, + case Q3MaxSeqId < Limit of true -> %% already only holding LTE one segment indices in q3 State1; false -> - %% SeqIdMax is low in the sense that it must be - %% lower than the seq_id in delta1, in fact either - %% delta1 has undefined as its seq_id or there - %% does not exist a seq_id X s.t. X > SeqIdMax and - %% X < delta1's seq_id (would be +1 if it wasn't - %% for the possibility of gaps in the seq_ids). - %% But because we use queue:out_r, SeqIdMax is - %% actually also the highest seq_id of the betas we - %% transfer from q3 to deltas. - {SeqIdMax, Len2, Q3a, RamIndexCount2, IndexState2} = - push_betas_to_deltas(fun bpqueue:out_r/1, Limit, Q3, - RamIndexCount1, IndexState1), - Delta2 = #delta { start_seq_id = Limit, + %% Q3MaxSeqId is low in the sense that it must be + %% lower than the seq_id in delta2, in fact either + %% delta2 has undefined as its seq_id or there + %% does not exist a seq_id X s.t. X > Q3MaxSeqId + %% and X < delta2's seq_id (would be +1 if it + %% wasn't for the possibility of gaps in the + %% seq_ids). But Q3MaxSeqId is actually also the + %% highest seq_id of the betas we transfer from q3 + %% to deltas. + {Len2, Q3a, RamIndexCount3, IndexState3} = + push_betas_to_deltas( + fun bpqueue:out_r/1, Limit, Q3, 0, RamIndexCount1, + IndexState1), + Delta3 = #delta { start_seq_id = Limit, count = Len2, - end_seq_id = SeqIdMax + 1 }, - Delta3 = combine_deltas(Delta2, Delta1), - State1 #vqstate { delta = Delta3, + end_seq_id = Q3MaxSeqId + 1 }, + Delta4 = combine_deltas(Delta3, Delta2), + State1 #vqstate { delta = Delta4, q3 = Q3a, - index_state = IndexState2, - ram_index_count = RamIndexCount2 } + index_state = IndexState3, + ram_index_count = RamIndexCount3 } end end. -push_betas_to_deltas(Generator, Limit, Q, RamIndexCount, IndexState) -> - case Generator(Q) of - {empty, Qa} -> {undefined, 0, Qa, RamIndexCount, IndexState}; - {{value, _IndexOnDisk, #msg_status { seq_id = SeqId }}, _Qa} -> - {Count, Qb, RamIndexCount1, IndexState1} = - push_betas_to_deltas( - Generator, Limit, Q, 0, RamIndexCount, IndexState), - {SeqId, Count, Qb, RamIndexCount1, IndexState1} - end. - push_betas_to_deltas(Generator, Limit, Q, Count, RamIndexCount, IndexState) -> case Generator(Q) of {empty, Qa} -> |
