summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2010-06-18 19:30:38 +0100
committerMatthias Radestock <matthias@rabbitmq.com>2010-06-18 19:30:38 +0100
commitf05592fc3f4821f57f505de0582b8b27625f639a (patch)
tree471d9fd1864405a6e64d519d3b08e48f686f50fb
parent82e0ad06425e2ed16bcc8b2e0c7c472626faef2f (diff)
downloadrabbitmq-server-git-f05592fc3f4821f57f505de0582b8b27625f639a.tar.gz
yet more refactoring of push_betas_to_deltas
finally make the q2 and q3 processing use the same code
-rw-r--r--src/rabbit_variable_queue.erl99
1 files changed, 36 insertions, 63 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 02325351fc..cfea41f0b9 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -1360,81 +1360,54 @@ push_betas_to_deltas(State = #vqstate { q2 = Q2,
ram_index_count = RamIndexCount,
index_state = IndexState }) ->
{Delta1, Q2a, RamIndexCount1, IndexState1} =
- case beta_bounds(Q2) of
- empty ->
- {?BLANK_DELTA, Q2, RamIndexCount, IndexState};
- {Q2MinSeqId, Q2MaxSeqId} ->
- {Len1, Q2b, RamIndexCount2, IndexState2} =
- push_betas_to_deltas(
- fun bpqueue:out/1, undefined, Q2, 0, RamIndexCount,
- IndexState),
- %% Q2MinSeqId is high in the sense that it must be
- %% higher than the seq_id in Delta, but it's also the
- %% lowest of the betas that we transfer from q2 to
- %% delta.
- {#delta { start_seq_id = Q2MinSeqId,
- count = Len1,
- end_seq_id = Q2MaxSeqId + 1 },
- Q2b, RamIndexCount2, IndexState2}
- end,
- true = bpqueue:is_empty(Q2a), %% ASSERTION
- Delta2 = #delta { start_seq_id = Delta2SeqId } =
- combine_deltas(Delta, Delta1),
- {Delta3, Q3a, RamIndexCount3, IndexState3} =
- case beta_bounds(Q3) of
- empty ->
- {?BLANK_DELTA, Q3, RamIndexCount1, IndexState1};
- {Q3MinSeqId, Q3MaxSeqId} ->
- Limit = rabbit_queue_index:next_segment_boundary(Q3MinSeqId),
- %% ASSERTION
- true = Delta2SeqId == undefined orelse Delta2SeqId > Q3MaxSeqId,
- case Q3MaxSeqId < Limit of
- true ->
- %% already only holding LTE one segment indices in q3
- {?BLANK_DELTA, Q3, RamIndexCount1, IndexState1};
- false ->
- %% Q3MaxSeqId is low in the sense that it must
- %% be lower than the seq_id in delta2, in fact
- %% either delta2 has undefined as its seq_id
- %% or there does not exist a seq_id X s.t. X >
- %% Q3MaxSeqId and X < delta2's seq_id (would
- %% be +1 if it wasn't for the possibility of
- %% gaps in the seq_ids). But Q3MaxSeqId is
- %% actually also the highest seq_id of the
- %% betas we transfer from q3 to deltas.
- {Len2, Q3b, RamIndexCount4, IndexState4} =
- push_betas_to_deltas(
- fun bpqueue:out_r/1, Limit, Q3, 0, RamIndexCount1,
- IndexState1),
- {#delta { start_seq_id = Limit,
- count = Len2,
- end_seq_id = Q3MaxSeqId + 1 },
- Q3b, RamIndexCount4, IndexState4}
- end
- end,
+ push_betas_to_deltas(fun (Q2MinSeqId) -> Q2MinSeqId end,
+ fun bpqueue:out/1, Q2,
+ RamIndexCount, IndexState),
+ {Delta2, Q3a, RamIndexCount3, IndexState3} =
+ push_betas_to_deltas(fun rabbit_queue_index:next_segment_boundary/1,
+ fun bpqueue:out_r/1, Q3,
+ RamIndexCount1, IndexState1),
+ Delta3 = combine_deltas(Delta2, combine_deltas(Delta, Delta1)),
State #vqstate { q2 = Q2a,
- delta = combine_deltas(Delta3, Delta2),
+ delta = Delta3,
q3 = Q3a,
index_state = IndexState3,
ram_index_count = RamIndexCount3 }.
+push_betas_to_deltas(LimitFun, Generator, Q, RamIndexCount, IndexState) ->
+ case beta_bounds(Q) of
+ empty ->
+ {?BLANK_DELTA, Q, RamIndexCount, IndexState};
+ {MinSeqId, MaxSeqId} ->
+ Limit = LimitFun(MinSeqId),
+ case MaxSeqId < Limit of
+ true -> {?BLANK_DELTA, Q, RamIndexCount, IndexState};
+ false -> {Len, Qb, RamIndexCount1, IndexState1} =
+ push_betas_to_deltas(Generator, Limit, Q, 0,
+ RamIndexCount, IndexState),
+ {#delta { start_seq_id = Limit,
+ count = Len,
+ end_seq_id = MaxSeqId + 1 },
+ Qb, RamIndexCount1, IndexState1}
+ end
+ end.
+
push_betas_to_deltas(Generator, Limit, Q, Count, RamIndexCount, IndexState) ->
case Generator(Q) of
- {empty, Qa} ->
- {Count, Qa, RamIndexCount, IndexState};
+ {empty, Q} ->
+ {Count, Q, RamIndexCount, IndexState};
{{value, _IndexOnDisk, #msg_status { seq_id = SeqId }}, _Qa}
- when Limit =/= undefined andalso SeqId < Limit ->
+ when SeqId < Limit ->
{Count, Q, RamIndexCount, IndexState};
{{value, IndexOnDisk, MsgStatus}, Qa} ->
{RamIndexCount1, IndexState1} =
case IndexOnDisk of
- true ->
- {RamIndexCount, IndexState};
- false ->
- {#msg_status { index_on_disk = true }, IndexState2} =
- maybe_write_index_to_disk(true, MsgStatus,
- IndexState),
- {RamIndexCount - 1, IndexState2}
+ true -> {RamIndexCount, IndexState};
+ false -> {#msg_status { index_on_disk = true },
+ IndexState2} =
+ maybe_write_index_to_disk(true, MsgStatus,
+ IndexState),
+ {RamIndexCount - 1, IndexState2}
end,
push_betas_to_deltas(
Generator, Limit, Qa, Count + 1, RamIndexCount1, IndexState1)