summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2010-06-18 17:08:55 +0100
committerMatthias Radestock <matthias@rabbitmq.com>2010-06-18 17:08:55 +0100
commit8f124ae4c1fb62baa0336a32eae43005d4fc4a85 (patch)
treee6ec360528843e24a9f3e234bd8b82a179320197
parent8b2008b3a6fdf6b2110abd582b6ee64d99a370b0 (diff)
downloadrabbitmq-server-git-8f124ae4c1fb62baa0336a32eae43005d4fc4a85.tar.gz
refactor push_betas_to_deltas
to make the similarity of the treatment of Q2 and Q3 more obvious
-rw-r--r--src/rabbit_variable_queue.erl112
1 files changed, 57 insertions, 55 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 977b1af21b..b4fd05d9e5 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -772,6 +772,15 @@ update_rate(Now, Then, Count, {OThen, OCount}) ->
persistent_guids(Pubs) ->
[Guid || #basic_message { guid = Guid, is_persistent = true } <- Pubs].
+beta_bounds(Q) ->
+ case bpqueue:out(Q) of
+ {empty, Q} -> empty;
+ {{value, _IndexOnDisk1, #msg_status { seq_id = SeqIdMin }}, _Qa} ->
+ {{value, _IndexOnDisk2, #msg_status { seq_id = SeqIdMax }}, _Qb} =
+ bpqueue:out_r(Q),
+ {SeqIdMin, SeqIdMax}
+ end.
+
betas_from_index_entries(List, TransientThreshold, IndexState) ->
{Filtered, Delivers, Acks} =
lists:foldr(
@@ -1350,74 +1359,67 @@ push_betas_to_deltas(State = #vqstate { q2 = Q2,
q3 = Q3,
ram_index_count = RamIndexCount,
index_state = IndexState }) ->
- %% HighSeqId is high in the sense that it must be higher than the
- %% seq_id in Delta, but it's also the lowest of the betas that we
- %% transfer from q2 to delta.
- {HighSeqId, Len1, Q2a, RamIndexCount1, IndexState1} =
- push_betas_to_deltas(
- fun bpqueue:out/1, undefined, Q2, RamIndexCount, IndexState),
- true = bpqueue:is_empty(Q2a), %% ASSERTION
- EndSeqId =
- case bpqueue:out_r(Q2) of
- {empty, _Q2} ->
- undefined;
- {{value, _IndexOnDisk, #msg_status { seq_id = EndSeqId1 }}, _Q2} ->
- EndSeqId1 + 1
+ {Delta1, Q2a, RamIndexCount1, IndexState1} =
+ case beta_bounds(Q2) of
+ empty ->
+ {?BLANK_DELTA, Q2, RamIndexCount, IndexState};
+ {Q2MinSeqId, Q2MaxSeqId} ->
+ {Q2Count, Q2b, RamIndexCount2, IndexState2} =
+ push_betas_to_deltas(
+ fun bpqueue:out/1, undefined, Q2, 0, RamIndexCount,
+ IndexState),
+ %% Q2MinSeqId is high in the sense that it must be
+ %% higher than the seq_id in Delta, but it's also the
+ %% lowest of the betas that we transfer from q2 to
+ %% delta.
+ {#delta { start_seq_id = Q2MinSeqId,
+ count = Q2Count,
+ end_seq_id = Q2MaxSeqId + 1 },
+ Q2b, RamIndexCount2, IndexState2}
end,
- Delta1 = #delta { start_seq_id = Delta1SeqId } =
- combine_deltas(Delta, #delta { start_seq_id = HighSeqId,
- count = Len1,
- end_seq_id = EndSeqId }),
- State1 = State #vqstate { q2 = bpqueue:new(),
- delta = Delta1,
+ true = bpqueue:is_empty(Q2a), %% ASSERTION
+ Delta2 = #delta { start_seq_id = Delta2SeqId } =
+ combine_deltas(Delta, Delta1),
+ State1 = State #vqstate { q2 = Q2a,
+ delta = Delta2,
index_state = IndexState1,
ram_index_count = RamIndexCount1 },
- case bpqueue:out(Q3) of
- {empty, _Q3} ->
+
+ case beta_bounds(Q3) of
+ empty ->
State1;
- {{value, _IndexOnDisk1, #msg_status { seq_id = SeqId }}, _Q3} ->
- {{value, _IndexOnDisk2, #msg_status { seq_id = SeqIdMax }}, _Q3a} =
- bpqueue:out_r(Q3),
- Limit = rabbit_queue_index:next_segment_boundary(SeqId),
+ {Q3MinSeqId, Q3MaxSeqId} ->
+ Limit = rabbit_queue_index:next_segment_boundary(Q3MinSeqId),
%% ASSERTION
- true = Delta1SeqId == undefined orelse Delta1SeqId > SeqIdMax,
- case SeqIdMax < Limit of
+ true = Delta2SeqId == undefined orelse Delta2SeqId > Q3MaxSeqId,
+ case Q3MaxSeqId < Limit of
true -> %% already only holding LTE one segment indices in q3
State1;
false ->
- %% SeqIdMax is low in the sense that it must be
- %% lower than the seq_id in delta1, in fact either
- %% delta1 has undefined as its seq_id or there
- %% does not exist a seq_id X s.t. X > SeqIdMax and
- %% X < delta1's seq_id (would be +1 if it wasn't
- %% for the possibility of gaps in the seq_ids).
- %% But because we use queue:out_r, SeqIdMax is
- %% actually also the highest seq_id of the betas we
- %% transfer from q3 to deltas.
- {SeqIdMax, Len2, Q3a, RamIndexCount2, IndexState2} =
- push_betas_to_deltas(fun bpqueue:out_r/1, Limit, Q3,
- RamIndexCount1, IndexState1),
- Delta2 = #delta { start_seq_id = Limit,
+ %% Q3MaxSeqId is low in the sense that it must be
+ %% lower than the seq_id in delta2, in fact either
+ %% delta2 has undefined as its seq_id or there
+ %% does not exist a seq_id X s.t. X > Q3MaxSeqId
+ %% and X < delta2's seq_id (would be +1 if it
+ %% wasn't for the possibility of gaps in the
+ %% seq_ids). But Q3MaxSeqId is actually also the
+ %% highest seq_id of the betas we transfer from q3
+ %% to deltas.
+ {Len2, Q3a, RamIndexCount3, IndexState3} =
+ push_betas_to_deltas(
+ fun bpqueue:out_r/1, Limit, Q3, 0, RamIndexCount1,
+ IndexState1),
+ Delta3 = #delta { start_seq_id = Limit,
count = Len2,
- end_seq_id = SeqIdMax + 1 },
- Delta3 = combine_deltas(Delta2, Delta1),
- State1 #vqstate { delta = Delta3,
+ end_seq_id = Q3MaxSeqId + 1 },
+ Delta4 = combine_deltas(Delta3, Delta2),
+ State1 #vqstate { delta = Delta4,
q3 = Q3a,
- index_state = IndexState2,
- ram_index_count = RamIndexCount2 }
+ index_state = IndexState3,
+ ram_index_count = RamIndexCount3 }
end
end.
-push_betas_to_deltas(Generator, Limit, Q, RamIndexCount, IndexState) ->
- case Generator(Q) of
- {empty, Qa} -> {undefined, 0, Qa, RamIndexCount, IndexState};
- {{value, _IndexOnDisk, #msg_status { seq_id = SeqId }}, _Qa} ->
- {Count, Qb, RamIndexCount1, IndexState1} =
- push_betas_to_deltas(
- Generator, Limit, Q, 0, RamIndexCount, IndexState),
- {SeqId, Count, Qb, RamIndexCount1, IndexState1}
- end.
-
push_betas_to_deltas(Generator, Limit, Q, Count, RamIndexCount, IndexState) ->
case Generator(Q) of
{empty, Qa} ->