summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2011-10-11 16:47:12 +0100
committerMatthew Sackman <matthew@rabbitmq.com>2011-10-11 16:47:12 +0100
commit2e0752768e98f771aed71bd711805e2c57fa2b56 (patch)
treea9c25437687107a955f3d776c1a07c49e14411cc /src
parent3f3b79aa89a937f6c351cd29541060dc704ef37e (diff)
downloadrabbitmq-server-git-2e0752768e98f771aed71bd711805e2c57fa2b56.tar.gz
When under memory pressure, we want to limit the size of q2 and q3, not just of ?s. Simplify calculation of permitted length. Simplify (and correct) expansion of ?. We can now completely drop ram_index_count as we never care about just the number of ?s.
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_tests.erl7
-rw-r--r--src/rabbit_variable_queue.erl286
2 files changed, 132 insertions, 161 deletions
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 606c4fe816..44c9b49905 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -2189,12 +2189,7 @@ test_variable_queue_requeue(VQ0) ->
Seq = lists:seq(1, Count),
VQ1 = rabbit_variable_queue:set_ram_duration_target(0, VQ0),
VQ2 = variable_queue_publish(false, Count, VQ1),
- {VQ3, Acks} = lists:foldl(
- fun (_N, {VQN, AckTags}) ->
- {{#basic_message{}, false, AckTag, _}, VQM} =
- rabbit_variable_queue:fetch(true, VQN),
- {VQM, [AckTag | AckTags]}
- end, {VQ2, []}, Seq),
+ {VQ3, Acks} = variable_queue_fetch(Count, false, false, Count, VQ2),
Subset = lists:foldl(fun ({Ack, N}, Acc) when N rem Interval == 0 ->
[Ack | Acc];
(_, Acc) ->
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 03004e102c..60c3dfd2d3 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -248,7 +248,6 @@
ram_msg_count,
ram_msg_count_prev,
ram_ack_count_prev,
- ram_index_count,
out_counter,
in_counter,
rates,
@@ -336,7 +335,6 @@
target_ram_count :: non_neg_integer() | 'infinity',
ram_msg_count :: non_neg_integer(),
ram_msg_count_prev :: non_neg_integer(),
- ram_index_count :: non_neg_integer(),
out_counter :: non_neg_integer(),
in_counter :: non_neg_integer(),
rates :: rates(),
@@ -490,7 +488,6 @@ purge(State = #vqstate { q4 = Q4,
index_state = IndexState3,
len = 0,
ram_msg_count = 0,
- ram_index_count = 0,
persistent_count = PCount1 })}.
publish(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId },
@@ -718,6 +715,7 @@ needs_timeout(State) ->
fun (_Quota, State1) -> {0, State1} end,
fun (_Quota, State1) -> State1 end,
fun (_Quota, State1) -> {0, State1} end,
+ fun null_gamma_delta/1,
State) of
{true, _State} -> idle;
{false, _State} -> false
@@ -725,6 +723,21 @@ needs_timeout(State) ->
true -> timed
end.
+null_gamma_delta(#vqstate { q2 = Q2, q3 = Q3 } = State) ->
+ {null_gamma_delta_msg(?QUEUE:out(Q2), ?QUEUE:out(Q2),
+ fun (SeqId) -> SeqId end) orelse
+ null_gamma_delta_msg(?QUEUE:out_r(Q3), ?QUEUE:out(Q3),
+ fun rabbit_queue_index:next_segment_boundary/1),
+ State}.
+
+null_gamma_delta_msg({{value, #msg_status { seq_id = SeqId1,
+ index_on_disk = true }}, _Q},
+ {{value, #msg_status { seq_id = SeqId2 }}, _Q2},
+ LimitFun) ->
+ SeqId1 >= LimitFun(SeqId2);
+null_gamma_delta_msg(_, _, _) ->
+ false.
+
timeout(State) ->
a(reduce_memory_use(confirm_commit_index(State))).
@@ -738,7 +751,6 @@ status(#vqstate {
ram_ack_index = RAI,
target_ram_count = TargetRamCount,
ram_msg_count = RamMsgCount,
- ram_index_count = RamIndexCount,
next_seq_id = NextSeqId,
persistent_count = PersistentCount,
rates = #rates { avg_egress = AvgEgressRate,
@@ -755,7 +767,6 @@ status(#vqstate {
{target_ram_count , TargetRamCount},
{ram_msg_count , RamMsgCount},
{ram_ack_count , gb_trees:size(RAI)},
- {ram_index_count , RamIndexCount},
{next_seq_id , NextSeqId},
{persistent_count , PersistentCount},
{avg_ingress_rate , AvgIngressRate},
@@ -774,10 +785,9 @@ discard(_Msg, _ChPid, State) -> State.
%%----------------------------------------------------------------------------
a(State = #vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4,
- len = Len,
- persistent_count = PersistentCount,
- ram_msg_count = RamMsgCount,
- ram_index_count = RamIndexCount }) ->
+ len = Len,
+ persistent_count = PersistentCount,
+ ram_msg_count = RamMsgCount }) ->
E1 = ?QUEUE:is_empty(Q1),
E2 = ?QUEUE:is_empty(Q2),
ED = Delta#delta.count == 0,
@@ -793,7 +803,6 @@ a(State = #vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4,
true = Len >= 0,
true = PersistentCount >= 0,
true = RamMsgCount >= 0,
- true = RamIndexCount >= 0,
State.
@@ -910,44 +919,25 @@ betas_from_index_entries(List, TransientThreshold, PA, IndexState) ->
{Filtered, rabbit_queue_index:ack(
Acks, rabbit_queue_index:deliver(Delivers, IndexState))}.
-%% the first arg is the older delta
-combine_deltas(?BLANK_DELTA_PATTERN(X), ?BLANK_DELTA_PATTERN(Y)) ->
- ?BLANK_DELTA;
-combine_deltas(?BLANK_DELTA_PATTERN(X), #delta { start_seq_id = Start,
- count = Count,
- end_seq_id = End } = B) ->
- true = Start + Count =< End, %% ASSERTION
- B;
-combine_deltas(#delta { start_seq_id = Start,
- count = Count,
- end_seq_id = End } = A, ?BLANK_DELTA_PATTERN(Y)) ->
- true = Start + Count =< End, %% ASSERTION
- A;
-combine_deltas(#delta { start_seq_id = StartLow,
- count = CountLow,
- end_seq_id = EndLow },
- #delta { start_seq_id = StartHigh,
- count = CountHigh,
- end_seq_id = EndHigh }) ->
- Count = CountLow + CountHigh,
- true = (StartLow =< StartHigh) %% ASSERTIONS
- andalso ((StartLow + CountLow) =< EndLow)
- andalso ((StartHigh + CountHigh) =< EndHigh)
- andalso ((StartLow + Count) =< EndHigh),
- #delta { start_seq_id = StartLow, count = Count, end_seq_id = EndHigh }.
-
-expand_delta(SeqId, Delta) ->
- DeltaInc = #delta { start_seq_id = SeqId,
- count = 1,
- end_seq_id = SeqId + 1 },
- case Delta of
- ?BLANK_DELTA ->
- DeltaInc;
- #delta { start_seq_id = StartSeqId } when SeqId < StartSeqId ->
- combine_deltas(DeltaInc, Delta);
- #delta { end_seq_id = EndSeqId } when SeqId >= EndSeqId ->
- combine_deltas(Delta, DeltaInc)
- end.
+expand_delta(SeqId, ?BLANK_DELTA_PATTERN(X)) ->
+ #delta { start_seq_id = SeqId, count = 1, end_seq_id = SeqId + 1 };
+expand_delta(SeqId, #delta { start_seq_id = StartSeqId,
+ count = Count,
+ end_seq_id = EndSeqId } = Delta)
+ when SeqId < StartSeqId ->
+ true = StartSeqId + Count =< EndSeqId, %% ASSERTION
+ Delta #delta { start_seq_id = SeqId, count = Count + 1 };
+expand_delta(SeqId, #delta { start_seq_id = StartSeqId,
+ count = Count,
+ end_seq_id = EndSeqId } = Delta)
+ when SeqId >= EndSeqId ->
+ true = StartSeqId + Count =< EndSeqId, %% ASSERTION
+ Delta #delta { count = Count + 1, end_seq_id = SeqId + 1 };
+expand_delta(_SeqId, #delta { start_seq_id = StartSeqId,
+ count = Count,
+ end_seq_id = EndSeqId } = Delta) ->
+ true = StartSeqId + Count + 1 =< EndSeqId, %% ASSERTION
+ Delta #delta { count = Count + 1 }.
update_rate(Now, Then, Count, {OThen, OCount}) ->
%% avg over the current period and the previous
@@ -992,7 +982,6 @@ init(IsDurable, IndexState, DeltaCount, Terms, AsyncCallback,
ram_msg_count = 0,
ram_msg_count_prev = 0,
ram_ack_count_prev = 0,
- ram_index_count = 0,
out_counter = 0,
in_counter = 0,
rates = blank_rate(Now, DeltaCount1),
@@ -1012,12 +1001,10 @@ blank_rate(Timestamp, IngressLength) ->
avg_ingress = 0.0,
timestamp = Timestamp }.
-in_r(MsgStatus = #msg_status { msg = undefined, index_on_disk = IndexOnDisk },
- State = #vqstate { q3 = Q3, q4 = Q4, ram_index_count = RamIndexCount }) ->
+in_r(MsgStatus = #msg_status { msg = undefined },
+ State = #vqstate { q3 = Q3, q4 = Q4 }) ->
case ?QUEUE:is_empty(Q4) of
- true -> State #vqstate {
- q3 = ?QUEUE:in_r(MsgStatus, Q3),
- ram_index_count = RamIndexCount + one_if(not IndexOnDisk) };
+ true -> State #vqstate { q3 = ?QUEUE:in_r(MsgStatus, Q3) };
false -> {MsgStatus1, State1 = #vqstate { q4 = Q4a }} =
read_msg(MsgStatus, State),
State1 #vqstate { q4 = ?QUEUE:in_r(MsgStatus1, Q4a) }
@@ -1335,14 +1322,12 @@ publish_alpha(#msg_status { msg = undefined } = MsgStatus, State) ->
publish_alpha(MsgStatus, #vqstate {ram_msg_count = RamMsgCount } = State) ->
{MsgStatus, State #vqstate { ram_msg_count = RamMsgCount + 1 }}.
-publish_beta(#msg_status { msg_on_disk = MsgOnDisk } = MsgStatus, State) ->
- {#msg_status { index_on_disk = IndexOnDisk, msg = Msg} = MsgStatus1,
- #vqstate { ram_index_count = RamIndexCount,
- ram_msg_count = RamMsgCount } = State1} =
- maybe_write_to_disk(not MsgOnDisk, false, MsgStatus, State),
+publish_beta(MsgStatus, State) ->
+ {#msg_status { msg = Msg} = MsgStatus1,
+ #vqstate { ram_msg_count = RamMsgCount } = State1} =
+ maybe_write_to_disk(true, false, MsgStatus, State),
{MsgStatus1, State1 #vqstate {
- ram_msg_count = RamMsgCount + one_if(Msg =/= undefined),
- ram_index_count = RamIndexCount + one_if(not IndexOnDisk) }}.
+ ram_msg_count = RamMsgCount + one_if(Msg =/= undefined) }}.
%% Rebuild queue, inserting sequence ids to maintain ordering
queue_merge(SeqIds, Q, MsgIds, Limit, PubFun, MsgPropsFun, State) ->
@@ -1375,14 +1360,10 @@ delta_merge([], Delta, MsgIds, _MsgPropsFun, State) ->
{Delta, MsgIds, State};
delta_merge(SeqIds, Delta, MsgIds, MsgPropsFun, State) ->
lists:foldl(fun (SeqId, {Delta0, MsgIds0, State0}) ->
- {#msg_status { msg_id = MsgId,
- index_on_disk = IndexOnDisk,
- msg_on_disk = MsgOnDisk} = MsgStatus,
- State1} =
+ {#msg_status { msg_id = MsgId } = MsgStatus, State1} =
msg_from_pending_ack(SeqId, MsgPropsFun, State0),
{_MsgStatus, State2} =
- maybe_write_to_disk(not MsgOnDisk, not IndexOnDisk,
- MsgStatus, State1),
+ maybe_write_to_disk(true, true, MsgStatus, State1),
{expand_delta(SeqId, Delta0), [MsgId | MsgIds0], State2}
end, {Delta, MsgIds, State}, SeqIds).
@@ -1426,10 +1407,10 @@ delta_limit(#delta { start_seq_id = StartSeqId }) -> StartSeqId.
%% one segment's worth of messages in q3 - and thus would risk
%% perpetually reporting the need for a conversion when no such
%% conversion is needed. That in turn could cause an infinite loop.
-reduce_memory_use(_AlphaBetaFun, _BetaDeltaFun, _AckFun,
+reduce_memory_use(_AlphaBetaFun, _BetaDeltaFun, _AckFun, _GammaDeltaFun,
State = #vqstate {target_ram_count = infinity}) ->
{false, State};
-reduce_memory_use(AlphaBetaFun, BetaDeltaFun, AckFun,
+reduce_memory_use(AlphaBetaFun, BetaDeltaFun, AckFun, GammaDeltaFun,
State = #vqstate {
ram_ack_index = RamAckIndex,
ram_msg_count = RamMsgCount,
@@ -1440,7 +1421,7 @@ reduce_memory_use(AlphaBetaFun, BetaDeltaFun, AckFun,
avg_egress = AvgAckEgress }
}) ->
- {Reduce, State1} =
+ {Reduce, State1 = #vqstate { q2 = Q2, q3 = Q3 }} =
case chunk_size(RamMsgCount + gb_trees:size(RamAckIndex),
TargetRamCount) of
0 -> {false, State};
@@ -1461,15 +1442,14 @@ reduce_memory_use(AlphaBetaFun, BetaDeltaFun, AckFun,
{true, State2}
end,
- %% AlphaBetaFun may have produced gammas that are bordering
- %% delta. We must ensure that we push these into delta, which is
- %% largely a no-op. This is why we call BetaDeltaFun even with a
- %% quota of 0.
- case chunk_size(State1 #vqstate.ram_index_count,
- permitted_beta_count(State1)) of
- ?IO_BATCH_SIZE = S2 -> {true, BetaDeltaFun(S2, State1)};
- _ -> {Reduce, BetaDeltaFun(0, State1)}
- end.
+ {Reduce1, State3} =
+ case chunk_size(?QUEUE:len(Q2) + ?QUEUE:len(Q3),
+ permitted_beta_count(State1)) of
+ ?IO_BATCH_SIZE = S2 -> {true, BetaDeltaFun(S2, State1)};
+ _ -> {Reduce, State1}
+ end,
+ {Reduce2, State4} = GammaDeltaFun(State3),
+ {Reduce1 orelse Reduce2, State4}.
limit_ram_acks(0, State) ->
{0, State};
@@ -1494,23 +1474,20 @@ reduce_memory_use(State) ->
{_, State1} = reduce_memory_use(fun push_alphas_to_betas/2,
fun push_betas_to_deltas/2,
fun limit_ram_acks/2,
+ fun push_gammas_to_deltas/1,
State),
State1.
permitted_beta_count(#vqstate { len = 0 }) ->
infinity;
+permitted_beta_count(#vqstate { target_ram_count = 0 }) ->
+ rabbit_queue_index:next_segment_boundary(0);
permitted_beta_count(#vqstate { len = Len,
q1 = Q1,
- q3 = Q3,
q4 = Q4 }) ->
BetaDeltaLen = Len - ?QUEUE:len(Q1) - ?QUEUE:len(Q4),
- Permitted = BetaDeltaLen - trunc(BetaDeltaLen * BetaDeltaLen / Len),
- case ?QUEUE:out(Q3) of
- {empty, _Q3} -> Permitted;
- {{value, #msg_status { seq_id = MinSeqId }}, _Q3} ->
- lists:max([Permitted, rabbit_queue_index:next_segment_boundary(
- MinSeqId) - MinSeqId])
- end.
+ lists:max([BetaDeltaLen - ((BetaDeltaLen * BetaDeltaLen) div Len),
+ rabbit_queue_index:next_segment_boundary(0)]).
chunk_size(Current, Permitted)
when Permitted =:= infinity orelse Permitted >= Current ->
@@ -1518,41 +1495,35 @@ chunk_size(Current, Permitted)
chunk_size(Current, Permitted) ->
lists:min([Current - Permitted, ?IO_BATCH_SIZE]).
-fetch_from_q3(State = #vqstate {
- q1 = Q1,
- q2 = Q2,
- delta = #delta { count = DeltaCount },
- q3 = Q3,
- q4 = Q4,
- ram_index_count = RamIndexCount}) ->
+fetch_from_q3(State = #vqstate { q1 = Q1,
+ q2 = Q2,
+ delta = #delta { count = DeltaCount },
+ q3 = Q3,
+ q4 = Q4 }) ->
case ?QUEUE:out(Q3) of
{empty, _Q3} ->
{empty, State};
- {{value, MsgStatus = #msg_status { index_on_disk = IndexOnDisk }}, Q3a} ->
- RamIndexCount1 = RamIndexCount - one_if(not IndexOnDisk),
- true = RamIndexCount1 >= 0, %% ASSERTION
- State1 = State #vqstate { q3 = Q3a,
- ram_index_count = RamIndexCount1 },
- State2 =
- case {?QUEUE:is_empty(Q3a), 0 == DeltaCount} of
- {true, true} ->
- %% q3 is now empty, it wasn't before; delta is
- %% still empty. So q2 must be empty, and we
- %% know q4 is empty otherwise we wouldn't be
- %% loading from q3. As such, we can just set
- %% q4 to Q1.
- true = ?QUEUE:is_empty(Q2), %% ASSERTION
- true = ?QUEUE:is_empty(Q4), %% ASSERTION
- State1 #vqstate { q1 = ?QUEUE:new(),
- q4 = Q1 };
- {true, false} ->
- maybe_deltas_to_betas(State1);
- {false, _} ->
- %% q3 still isn't empty, we've not touched
- %% delta, so the invariants between q1, q2,
- %% delta and q3 are maintained
- State1
- end,
+ {{value, MsgStatus}, Q3a} ->
+ State1 = State #vqstate { q3 = Q3a },
+ State2 = case {?QUEUE:is_empty(Q3a), 0 == DeltaCount} of
+ {true, true} ->
+ %% q3 is now empty, it wasn't before;
+ %% delta is still empty. So q2 must be
+ %% empty, and we know q4 is empty
+ %% otherwise we wouldn't be loading from
+ %% q3. As such, we can just set q4 to Q1.
+ true = ?QUEUE:is_empty(Q2), %% ASSERTION
+ true = ?QUEUE:is_empty(Q4), %% ASSERTION
+ State1 #vqstate { q1 = ?QUEUE:new(), q4 = Q1 };
+ {true, false} ->
+ maybe_deltas_to_betas(State1);
+ {false, _} ->
+ %% q3 still isn't empty, we've not
+ %% touched delta, so the invariants
+ %% between q1, q2, delta and q3 are
+ %% maintained
+ State1
+ end,
{loaded, {MsgStatus, State2}}
end.
@@ -1639,42 +1610,35 @@ maybe_push_alphas_to_betas(Generator, Consumer, Quota, Q, State) ->
{empty, _Q} ->
{Quota, State};
{{value, MsgStatus}, Qa} ->
- {MsgStatus1 = #msg_status { msg_on_disk = true,
- index_on_disk = IndexOnDisk },
- State1 = #vqstate { ram_msg_count = RamMsgCount,
- ram_index_count = RamIndexCount }} =
+ {MsgStatus1 = #msg_status { msg_on_disk = true },
+ State1 = #vqstate { ram_msg_count = RamMsgCount }} =
maybe_write_to_disk(true, false, MsgStatus, State),
MsgStatus2 = m(trim_msg_status(MsgStatus1)),
- RamIndexCount1 = RamIndexCount + one_if(not IndexOnDisk),
- State2 = State1 #vqstate { ram_msg_count = RamMsgCount - 1,
- ram_index_count = RamIndexCount1 },
+ State2 = State1 #vqstate { ram_msg_count = RamMsgCount - 1 },
maybe_push_alphas_to_betas(Generator, Consumer, Quota - 1, Qa,
Consumer(MsgStatus2, Qa, State2))
end.
-push_betas_to_deltas(Quota,
- State = #vqstate { q2 = Q2,
- delta = Delta,
- q3 = Q3,
- index_state = IndexState,
- ram_index_count = RamIndexCount }) ->
- PushState = {Quota, Delta, RamIndexCount, IndexState},
- {Q2a, PushState1} = push_betas_to_deltas(
+push_betas_to_deltas(Quota, State = #vqstate { q2 = Q2,
+ delta = Delta,
+ q3 = Q3,
+ index_state = IndexState }) ->
+ PushState = {Quota, Delta, IndexState},
+ {Q2a, PushState1} = push_with_limit(
fun ?QUEUE:out/1,
fun (Q2MinSeqId) -> Q2MinSeqId end,
- Q2, PushState),
- {Q3a, PushState2} = push_betas_to_deltas(
+ Q2, fun push_betas_to_deltas1/4, PushState),
+ {Q3a, PushState2} = push_with_limit(
fun ?QUEUE:out_r/1,
fun rabbit_queue_index:next_segment_boundary/1,
- Q3, PushState1),
- {_, Delta1, RamIndexCount1, IndexState1} = PushState2,
- State #vqstate { q2 = Q2a,
- delta = Delta1,
- q3 = Q3a,
- index_state = IndexState1,
- ram_index_count = RamIndexCount1 }.
-
-push_betas_to_deltas(Generator, LimitFun, Q, PushState) ->
+ Q3, fun push_betas_to_deltas1/4, PushState1),
+ {_, Delta1, IndexState1} = PushState2,
+ State #vqstate { q2 = Q2a,
+ delta = Delta1,
+ q3 = Q3a,
+ index_state = IndexState1 }.
+
+push_with_limit(Generator, LimitFun, Q, PushFun, PushState) ->
case ?QUEUE:is_empty(Q) of
true ->
{Q, PushState};
@@ -1684,16 +1648,15 @@ push_betas_to_deltas(Generator, LimitFun, Q, PushState) ->
Limit = LimitFun(MinSeqId),
case MaxSeqId < Limit of
true -> {Q, PushState};
- false -> push_betas_to_deltas1(Generator, Limit, Q, PushState)
+ false -> PushFun(Generator, Limit, Q, PushState)
end
end.
+push_betas_to_deltas1(_Generator, _Limit, Q,
+ {0, _Delta, _IndexState} = PushState) ->
+ {Q, PushState};
push_betas_to_deltas1(Generator, Limit, Q,
- {0, Delta, RamIndexCount, IndexState}) ->
- {Qb, Delta1} = push_gammas_to_deltas(Generator, Limit, Q, Delta),
- {Qb, {0, Delta1, RamIndexCount, IndexState}};
-push_betas_to_deltas1(Generator, Limit, Q,
- {Quota, Delta, RamIndexCount, IndexState} = PushState) ->
+ {Quota, Delta, IndexState} = PushState) ->
case Generator(Q) of
{empty, _Q} ->
{Q, PushState};
@@ -1702,20 +1665,33 @@ push_betas_to_deltas1(Generator, Limit, Q,
{Q, PushState};
{{value, MsgStatus = #msg_status { index_on_disk = IndexOnDisk,
seq_id = SeqId }}, Qa} ->
- {Quota1, RamIndexCount1, IndexState1} =
+ {Quota1, IndexState1} =
case IndexOnDisk of
- true -> {Quota, RamIndexCount, IndexState};
+ true -> {Quota, IndexState};
false -> {#msg_status { index_on_disk = true },
IndexState2} =
maybe_write_index_to_disk(true, MsgStatus,
IndexState),
- {Quota - 1, RamIndexCount - 1, IndexState2}
+ {Quota - 1, IndexState2}
end,
Delta1 = expand_delta(SeqId, Delta),
push_betas_to_deltas1(Generator, Limit, Qa,
- {Quota1, Delta1, RamIndexCount1, IndexState1})
+ {Quota1, Delta1, IndexState1})
end.
+push_gammas_to_deltas(State = #vqstate { q2 = Q2,
+ delta = Delta,
+ q3 = Q3 }) ->
+ {Q2a, Delta1} = push_with_limit(
+ fun ?QUEUE:out/1,
+ fun (Q2MinSeqId) -> Q2MinSeqId end,
+ Q2, fun push_gammas_to_deltas/4, Delta),
+ {Q3a, Delta2} = push_with_limit(
+ fun ?QUEUE:out_r/1,
+ fun rabbit_queue_index:next_segment_boundary/1,
+ Q3, fun push_gammas_to_deltas/4, Delta1),
+ {Delta2 =/= Delta, State #vqstate { q2 = Q2a, delta = Delta2, q3 = Q3a }}.
+
push_gammas_to_deltas(Generator, Limit, Q, Delta) ->
case Generator(Q) of
{{value, #msg_status { seq_id = SeqId, index_on_disk = true }}, Q1}