summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_tests.erl7
-rw-r--r--src/rabbit_variable_queue.erl286
2 files changed, 132 insertions, 161 deletions
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 606c4fe816..44c9b49905 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -2189,12 +2189,7 @@ test_variable_queue_requeue(VQ0) ->
Seq = lists:seq(1, Count),
VQ1 = rabbit_variable_queue:set_ram_duration_target(0, VQ0),
VQ2 = variable_queue_publish(false, Count, VQ1),
- {VQ3, Acks} = lists:foldl(
- fun (_N, {VQN, AckTags}) ->
- {{#basic_message{}, false, AckTag, _}, VQM} =
- rabbit_variable_queue:fetch(true, VQN),
- {VQM, [AckTag | AckTags]}
- end, {VQ2, []}, Seq),
+ {VQ3, Acks} = variable_queue_fetch(Count, false, false, Count, VQ2),
Subset = lists:foldl(fun ({Ack, N}, Acc) when N rem Interval == 0 ->
[Ack | Acc];
(_, Acc) ->
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 03004e102c..60c3dfd2d3 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -248,7 +248,6 @@
ram_msg_count,
ram_msg_count_prev,
ram_ack_count_prev,
- ram_index_count,
out_counter,
in_counter,
rates,
@@ -336,7 +335,6 @@
target_ram_count :: non_neg_integer() | 'infinity',
ram_msg_count :: non_neg_integer(),
ram_msg_count_prev :: non_neg_integer(),
- ram_index_count :: non_neg_integer(),
out_counter :: non_neg_integer(),
in_counter :: non_neg_integer(),
rates :: rates(),
@@ -490,7 +488,6 @@ purge(State = #vqstate { q4 = Q4,
index_state = IndexState3,
len = 0,
ram_msg_count = 0,
- ram_index_count = 0,
persistent_count = PCount1 })}.
publish(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId },
@@ -718,6 +715,7 @@ needs_timeout(State) ->
fun (_Quota, State1) -> {0, State1} end,
fun (_Quota, State1) -> State1 end,
fun (_Quota, State1) -> {0, State1} end,
+ fun null_gamma_delta/1,
State) of
{true, _State} -> idle;
{false, _State} -> false
@@ -725,6 +723,21 @@ needs_timeout(State) ->
true -> timed
end.
+null_gamma_delta(#vqstate { q2 = Q2, q3 = Q3 } = State) ->
+ {null_gamma_delta_msg(?QUEUE:out(Q2), ?QUEUE:out(Q2),
+ fun (SeqId) -> SeqId end) orelse
+ null_gamma_delta_msg(?QUEUE:out_r(Q3), ?QUEUE:out(Q3),
+ fun rabbit_queue_index:next_segment_boundary/1),
+ State}.
+
+null_gamma_delta_msg({{value, #msg_status { seq_id = SeqId1,
+ index_on_disk = true }}, _Q},
+ {{value, #msg_status { seq_id = SeqId2 }}, _Q2},
+ LimitFun) ->
+ SeqId1 >= LimitFun(SeqId2);
+null_gamma_delta_msg(_, _, _) ->
+ false.
+
timeout(State) ->
a(reduce_memory_use(confirm_commit_index(State))).
@@ -738,7 +751,6 @@ status(#vqstate {
ram_ack_index = RAI,
target_ram_count = TargetRamCount,
ram_msg_count = RamMsgCount,
- ram_index_count = RamIndexCount,
next_seq_id = NextSeqId,
persistent_count = PersistentCount,
rates = #rates { avg_egress = AvgEgressRate,
@@ -755,7 +767,6 @@ status(#vqstate {
{target_ram_count , TargetRamCount},
{ram_msg_count , RamMsgCount},
{ram_ack_count , gb_trees:size(RAI)},
- {ram_index_count , RamIndexCount},
{next_seq_id , NextSeqId},
{persistent_count , PersistentCount},
{avg_ingress_rate , AvgIngressRate},
@@ -774,10 +785,9 @@ discard(_Msg, _ChPid, State) -> State.
%%----------------------------------------------------------------------------
a(State = #vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4,
- len = Len,
- persistent_count = PersistentCount,
- ram_msg_count = RamMsgCount,
- ram_index_count = RamIndexCount }) ->
+ len = Len,
+ persistent_count = PersistentCount,
+ ram_msg_count = RamMsgCount }) ->
E1 = ?QUEUE:is_empty(Q1),
E2 = ?QUEUE:is_empty(Q2),
ED = Delta#delta.count == 0,
@@ -793,7 +803,6 @@ a(State = #vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4,
true = Len >= 0,
true = PersistentCount >= 0,
true = RamMsgCount >= 0,
- true = RamIndexCount >= 0,
State.
@@ -910,44 +919,25 @@ betas_from_index_entries(List, TransientThreshold, PA, IndexState) ->
{Filtered, rabbit_queue_index:ack(
Acks, rabbit_queue_index:deliver(Delivers, IndexState))}.
-%% the first arg is the older delta
-combine_deltas(?BLANK_DELTA_PATTERN(X), ?BLANK_DELTA_PATTERN(Y)) ->
- ?BLANK_DELTA;
-combine_deltas(?BLANK_DELTA_PATTERN(X), #delta { start_seq_id = Start,
- count = Count,
- end_seq_id = End } = B) ->
- true = Start + Count =< End, %% ASSERTION
- B;
-combine_deltas(#delta { start_seq_id = Start,
- count = Count,
- end_seq_id = End } = A, ?BLANK_DELTA_PATTERN(Y)) ->
- true = Start + Count =< End, %% ASSERTION
- A;
-combine_deltas(#delta { start_seq_id = StartLow,
- count = CountLow,
- end_seq_id = EndLow },
- #delta { start_seq_id = StartHigh,
- count = CountHigh,
- end_seq_id = EndHigh }) ->
- Count = CountLow + CountHigh,
- true = (StartLow =< StartHigh) %% ASSERTIONS
- andalso ((StartLow + CountLow) =< EndLow)
- andalso ((StartHigh + CountHigh) =< EndHigh)
- andalso ((StartLow + Count) =< EndHigh),
- #delta { start_seq_id = StartLow, count = Count, end_seq_id = EndHigh }.
-
-expand_delta(SeqId, Delta) ->
- DeltaInc = #delta { start_seq_id = SeqId,
- count = 1,
- end_seq_id = SeqId + 1 },
- case Delta of
- ?BLANK_DELTA ->
- DeltaInc;
- #delta { start_seq_id = StartSeqId } when SeqId < StartSeqId ->
- combine_deltas(DeltaInc, Delta);
- #delta { end_seq_id = EndSeqId } when SeqId >= EndSeqId ->
- combine_deltas(Delta, DeltaInc)
- end.
+expand_delta(SeqId, ?BLANK_DELTA_PATTERN(X)) ->
+ #delta { start_seq_id = SeqId, count = 1, end_seq_id = SeqId + 1 };
+expand_delta(SeqId, #delta { start_seq_id = StartSeqId,
+ count = Count,
+ end_seq_id = EndSeqId } = Delta)
+ when SeqId < StartSeqId ->
+ true = StartSeqId + Count =< EndSeqId, %% ASSERTION
+ Delta #delta { start_seq_id = SeqId, count = Count + 1 };
+expand_delta(SeqId, #delta { start_seq_id = StartSeqId,
+ count = Count,
+ end_seq_id = EndSeqId } = Delta)
+ when SeqId >= EndSeqId ->
+ true = StartSeqId + Count =< EndSeqId, %% ASSERTION
+ Delta #delta { count = Count + 1, end_seq_id = SeqId + 1 };
+expand_delta(_SeqId, #delta { start_seq_id = StartSeqId,
+ count = Count,
+ end_seq_id = EndSeqId } = Delta) ->
+ true = StartSeqId + Count + 1 =< EndSeqId, %% ASSERTION
+ Delta #delta { count = Count + 1 }.
update_rate(Now, Then, Count, {OThen, OCount}) ->
%% avg over the current period and the previous
@@ -992,7 +982,6 @@ init(IsDurable, IndexState, DeltaCount, Terms, AsyncCallback,
ram_msg_count = 0,
ram_msg_count_prev = 0,
ram_ack_count_prev = 0,
- ram_index_count = 0,
out_counter = 0,
in_counter = 0,
rates = blank_rate(Now, DeltaCount1),
@@ -1012,12 +1001,10 @@ blank_rate(Timestamp, IngressLength) ->
avg_ingress = 0.0,
timestamp = Timestamp }.
-in_r(MsgStatus = #msg_status { msg = undefined, index_on_disk = IndexOnDisk },
- State = #vqstate { q3 = Q3, q4 = Q4, ram_index_count = RamIndexCount }) ->
+in_r(MsgStatus = #msg_status { msg = undefined },
+ State = #vqstate { q3 = Q3, q4 = Q4 }) ->
case ?QUEUE:is_empty(Q4) of
- true -> State #vqstate {
- q3 = ?QUEUE:in_r(MsgStatus, Q3),
- ram_index_count = RamIndexCount + one_if(not IndexOnDisk) };
+ true -> State #vqstate { q3 = ?QUEUE:in_r(MsgStatus, Q3) };
false -> {MsgStatus1, State1 = #vqstate { q4 = Q4a }} =
read_msg(MsgStatus, State),
State1 #vqstate { q4 = ?QUEUE:in_r(MsgStatus1, Q4a) }
@@ -1335,14 +1322,12 @@ publish_alpha(#msg_status { msg = undefined } = MsgStatus, State) ->
publish_alpha(MsgStatus, #vqstate {ram_msg_count = RamMsgCount } = State) ->
{MsgStatus, State #vqstate { ram_msg_count = RamMsgCount + 1 }}.
-publish_beta(#msg_status { msg_on_disk = MsgOnDisk } = MsgStatus, State) ->
- {#msg_status { index_on_disk = IndexOnDisk, msg = Msg} = MsgStatus1,
- #vqstate { ram_index_count = RamIndexCount,
- ram_msg_count = RamMsgCount } = State1} =
- maybe_write_to_disk(not MsgOnDisk, false, MsgStatus, State),
+publish_beta(MsgStatus, State) ->
+ {#msg_status { msg = Msg} = MsgStatus1,
+ #vqstate { ram_msg_count = RamMsgCount } = State1} =
+ maybe_write_to_disk(true, false, MsgStatus, State),
{MsgStatus1, State1 #vqstate {
- ram_msg_count = RamMsgCount + one_if(Msg =/= undefined),
- ram_index_count = RamIndexCount + one_if(not IndexOnDisk) }}.
+ ram_msg_count = RamMsgCount + one_if(Msg =/= undefined) }}.
%% Rebuild queue, inserting sequence ids to maintain ordering
queue_merge(SeqIds, Q, MsgIds, Limit, PubFun, MsgPropsFun, State) ->
@@ -1375,14 +1360,10 @@ delta_merge([], Delta, MsgIds, _MsgPropsFun, State) ->
{Delta, MsgIds, State};
delta_merge(SeqIds, Delta, MsgIds, MsgPropsFun, State) ->
lists:foldl(fun (SeqId, {Delta0, MsgIds0, State0}) ->
- {#msg_status { msg_id = MsgId,
- index_on_disk = IndexOnDisk,
- msg_on_disk = MsgOnDisk} = MsgStatus,
- State1} =
+ {#msg_status { msg_id = MsgId } = MsgStatus, State1} =
msg_from_pending_ack(SeqId, MsgPropsFun, State0),
{_MsgStatus, State2} =
- maybe_write_to_disk(not MsgOnDisk, not IndexOnDisk,
- MsgStatus, State1),
+ maybe_write_to_disk(true, true, MsgStatus, State1),
{expand_delta(SeqId, Delta0), [MsgId | MsgIds0], State2}
end, {Delta, MsgIds, State}, SeqIds).
@@ -1426,10 +1407,10 @@ delta_limit(#delta { start_seq_id = StartSeqId }) -> StartSeqId.
%% one segment's worth of messages in q3 - and thus would risk
%% perpetually reporting the need for a conversion when no such
%% conversion is needed. That in turn could cause an infinite loop.
-reduce_memory_use(_AlphaBetaFun, _BetaDeltaFun, _AckFun,
+reduce_memory_use(_AlphaBetaFun, _BetaDeltaFun, _AckFun, _GammaDeltaFun,
State = #vqstate {target_ram_count = infinity}) ->
{false, State};
-reduce_memory_use(AlphaBetaFun, BetaDeltaFun, AckFun,
+reduce_memory_use(AlphaBetaFun, BetaDeltaFun, AckFun, GammaDeltaFun,
State = #vqstate {
ram_ack_index = RamAckIndex,
ram_msg_count = RamMsgCount,
@@ -1440,7 +1421,7 @@ reduce_memory_use(AlphaBetaFun, BetaDeltaFun, AckFun,
avg_egress = AvgAckEgress }
}) ->
- {Reduce, State1} =
+ {Reduce, State1 = #vqstate { q2 = Q2, q3 = Q3 }} =
case chunk_size(RamMsgCount + gb_trees:size(RamAckIndex),
TargetRamCount) of
0 -> {false, State};
@@ -1461,15 +1442,14 @@ reduce_memory_use(AlphaBetaFun, BetaDeltaFun, AckFun,
{true, State2}
end,
- %% AlphaBetaFun may have produced gammas that are bordering
- %% delta. We must ensure that we push these into delta, which is
- %% largely a no-op. This is why we call BetaDeltaFun even with a
- %% quota of 0.
- case chunk_size(State1 #vqstate.ram_index_count,
- permitted_beta_count(State1)) of
- ?IO_BATCH_SIZE = S2 -> {true, BetaDeltaFun(S2, State1)};
- _ -> {Reduce, BetaDeltaFun(0, State1)}
- end.
+ {Reduce1, State3} =
+ case chunk_size(?QUEUE:len(Q2) + ?QUEUE:len(Q3),
+ permitted_beta_count(State1)) of
+ ?IO_BATCH_SIZE = S2 -> {true, BetaDeltaFun(S2, State1)};
+ _ -> {Reduce, State1}
+ end,
+ {Reduce2, State4} = GammaDeltaFun(State3),
+ {Reduce1 orelse Reduce2, State4}.
limit_ram_acks(0, State) ->
{0, State};
@@ -1494,23 +1474,20 @@ reduce_memory_use(State) ->
{_, State1} = reduce_memory_use(fun push_alphas_to_betas/2,
fun push_betas_to_deltas/2,
fun limit_ram_acks/2,
+ fun push_gammas_to_deltas/1,
State),
State1.
permitted_beta_count(#vqstate { len = 0 }) ->
infinity;
+permitted_beta_count(#vqstate { target_ram_count = 0 }) ->
+ rabbit_queue_index:next_segment_boundary(0);
permitted_beta_count(#vqstate { len = Len,
q1 = Q1,
- q3 = Q3,
q4 = Q4 }) ->
BetaDeltaLen = Len - ?QUEUE:len(Q1) - ?QUEUE:len(Q4),
- Permitted = BetaDeltaLen - trunc(BetaDeltaLen * BetaDeltaLen / Len),
- case ?QUEUE:out(Q3) of
- {empty, _Q3} -> Permitted;
- {{value, #msg_status { seq_id = MinSeqId }}, _Q3} ->
- lists:max([Permitted, rabbit_queue_index:next_segment_boundary(
- MinSeqId) - MinSeqId])
- end.
+ lists:max([BetaDeltaLen - ((BetaDeltaLen * BetaDeltaLen) div Len),
+ rabbit_queue_index:next_segment_boundary(0)]).
chunk_size(Current, Permitted)
when Permitted =:= infinity orelse Permitted >= Current ->
@@ -1518,41 +1495,35 @@ chunk_size(Current, Permitted)
chunk_size(Current, Permitted) ->
lists:min([Current - Permitted, ?IO_BATCH_SIZE]).
-fetch_from_q3(State = #vqstate {
- q1 = Q1,
- q2 = Q2,
- delta = #delta { count = DeltaCount },
- q3 = Q3,
- q4 = Q4,
- ram_index_count = RamIndexCount}) ->
+fetch_from_q3(State = #vqstate { q1 = Q1,
+ q2 = Q2,
+ delta = #delta { count = DeltaCount },
+ q3 = Q3,
+ q4 = Q4 }) ->
case ?QUEUE:out(Q3) of
{empty, _Q3} ->
{empty, State};
- {{value, MsgStatus = #msg_status { index_on_disk = IndexOnDisk }}, Q3a} ->
- RamIndexCount1 = RamIndexCount - one_if(not IndexOnDisk),
- true = RamIndexCount1 >= 0, %% ASSERTION
- State1 = State #vqstate { q3 = Q3a,
- ram_index_count = RamIndexCount1 },
- State2 =
- case {?QUEUE:is_empty(Q3a), 0 == DeltaCount} of
- {true, true} ->
- %% q3 is now empty, it wasn't before; delta is
- %% still empty. So q2 must be empty, and we
- %% know q4 is empty otherwise we wouldn't be
- %% loading from q3. As such, we can just set
- %% q4 to Q1.
- true = ?QUEUE:is_empty(Q2), %% ASSERTION
- true = ?QUEUE:is_empty(Q4), %% ASSERTION
- State1 #vqstate { q1 = ?QUEUE:new(),
- q4 = Q1 };
- {true, false} ->
- maybe_deltas_to_betas(State1);
- {false, _} ->
- %% q3 still isn't empty, we've not touched
- %% delta, so the invariants between q1, q2,
- %% delta and q3 are maintained
- State1
- end,
+ {{value, MsgStatus}, Q3a} ->
+ State1 = State #vqstate { q3 = Q3a },
+ State2 = case {?QUEUE:is_empty(Q3a), 0 == DeltaCount} of
+ {true, true} ->
+ %% q3 is now empty, it wasn't before;
+ %% delta is still empty. So q2 must be
+ %% empty, and we know q4 is empty
+ %% otherwise we wouldn't be loading from
+ %% q3. As such, we can just set q4 to Q1.
+ true = ?QUEUE:is_empty(Q2), %% ASSERTION
+ true = ?QUEUE:is_empty(Q4), %% ASSERTION
+ State1 #vqstate { q1 = ?QUEUE:new(), q4 = Q1 };
+ {true, false} ->
+ maybe_deltas_to_betas(State1);
+ {false, _} ->
+ %% q3 still isn't empty, we've not
+ %% touched delta, so the invariants
+ %% between q1, q2, delta and q3 are
+ %% maintained
+ State1
+ end,
{loaded, {MsgStatus, State2}}
end.
@@ -1639,42 +1610,35 @@ maybe_push_alphas_to_betas(Generator, Consumer, Quota, Q, State) ->
{empty, _Q} ->
{Quota, State};
{{value, MsgStatus}, Qa} ->
- {MsgStatus1 = #msg_status { msg_on_disk = true,
- index_on_disk = IndexOnDisk },
- State1 = #vqstate { ram_msg_count = RamMsgCount,
- ram_index_count = RamIndexCount }} =
+ {MsgStatus1 = #msg_status { msg_on_disk = true },
+ State1 = #vqstate { ram_msg_count = RamMsgCount }} =
maybe_write_to_disk(true, false, MsgStatus, State),
MsgStatus2 = m(trim_msg_status(MsgStatus1)),
- RamIndexCount1 = RamIndexCount + one_if(not IndexOnDisk),
- State2 = State1 #vqstate { ram_msg_count = RamMsgCount - 1,
- ram_index_count = RamIndexCount1 },
+ State2 = State1 #vqstate { ram_msg_count = RamMsgCount - 1 },
maybe_push_alphas_to_betas(Generator, Consumer, Quota - 1, Qa,
Consumer(MsgStatus2, Qa, State2))
end.
-push_betas_to_deltas(Quota,
- State = #vqstate { q2 = Q2,
- delta = Delta,
- q3 = Q3,
- index_state = IndexState,
- ram_index_count = RamIndexCount }) ->
- PushState = {Quota, Delta, RamIndexCount, IndexState},
- {Q2a, PushState1} = push_betas_to_deltas(
+push_betas_to_deltas(Quota, State = #vqstate { q2 = Q2,
+ delta = Delta,
+ q3 = Q3,
+ index_state = IndexState }) ->
+ PushState = {Quota, Delta, IndexState},
+ {Q2a, PushState1} = push_with_limit(
fun ?QUEUE:out/1,
fun (Q2MinSeqId) -> Q2MinSeqId end,
- Q2, PushState),
- {Q3a, PushState2} = push_betas_to_deltas(
+ Q2, fun push_betas_to_deltas1/4, PushState),
+ {Q3a, PushState2} = push_with_limit(
fun ?QUEUE:out_r/1,
fun rabbit_queue_index:next_segment_boundary/1,
- Q3, PushState1),
- {_, Delta1, RamIndexCount1, IndexState1} = PushState2,
- State #vqstate { q2 = Q2a,
- delta = Delta1,
- q3 = Q3a,
- index_state = IndexState1,
- ram_index_count = RamIndexCount1 }.
-
-push_betas_to_deltas(Generator, LimitFun, Q, PushState) ->
+ Q3, fun push_betas_to_deltas1/4, PushState1),
+ {_, Delta1, IndexState1} = PushState2,
+ State #vqstate { q2 = Q2a,
+ delta = Delta1,
+ q3 = Q3a,
+ index_state = IndexState1 }.
+
+push_with_limit(Generator, LimitFun, Q, PushFun, PushState) ->
case ?QUEUE:is_empty(Q) of
true ->
{Q, PushState};
@@ -1684,16 +1648,15 @@ push_betas_to_deltas(Generator, LimitFun, Q, PushState) ->
Limit = LimitFun(MinSeqId),
case MaxSeqId < Limit of
true -> {Q, PushState};
- false -> push_betas_to_deltas1(Generator, Limit, Q, PushState)
+ false -> PushFun(Generator, Limit, Q, PushState)
end
end.
+push_betas_to_deltas1(_Generator, _Limit, Q,
+ {0, _Delta, _IndexState} = PushState) ->
+ {Q, PushState};
push_betas_to_deltas1(Generator, Limit, Q,
- {0, Delta, RamIndexCount, IndexState}) ->
- {Qb, Delta1} = push_gammas_to_deltas(Generator, Limit, Q, Delta),
- {Qb, {0, Delta1, RamIndexCount, IndexState}};
-push_betas_to_deltas1(Generator, Limit, Q,
- {Quota, Delta, RamIndexCount, IndexState} = PushState) ->
+ {Quota, Delta, IndexState} = PushState) ->
case Generator(Q) of
{empty, _Q} ->
{Q, PushState};
@@ -1702,20 +1665,33 @@ push_betas_to_deltas1(Generator, Limit, Q,
{Q, PushState};
{{value, MsgStatus = #msg_status { index_on_disk = IndexOnDisk,
seq_id = SeqId }}, Qa} ->
- {Quota1, RamIndexCount1, IndexState1} =
+ {Quota1, IndexState1} =
case IndexOnDisk of
- true -> {Quota, RamIndexCount, IndexState};
+ true -> {Quota, IndexState};
false -> {#msg_status { index_on_disk = true },
IndexState2} =
maybe_write_index_to_disk(true, MsgStatus,
IndexState),
- {Quota - 1, RamIndexCount - 1, IndexState2}
+ {Quota - 1, IndexState2}
end,
Delta1 = expand_delta(SeqId, Delta),
push_betas_to_deltas1(Generator, Limit, Qa,
- {Quota1, Delta1, RamIndexCount1, IndexState1})
+ {Quota1, Delta1, IndexState1})
end.
+push_gammas_to_deltas(State = #vqstate { q2 = Q2,
+ delta = Delta,
+ q3 = Q3 }) ->
+ {Q2a, Delta1} = push_with_limit(
+ fun ?QUEUE:out/1,
+ fun (Q2MinSeqId) -> Q2MinSeqId end,
+ Q2, fun push_gammas_to_deltas/4, Delta),
+ {Q3a, Delta2} = push_with_limit(
+ fun ?QUEUE:out_r/1,
+ fun rabbit_queue_index:next_segment_boundary/1,
+ Q3, fun push_gammas_to_deltas/4, Delta1),
+ {Delta2 =/= Delta, State #vqstate { q2 = Q2a, delta = Delta2, q3 = Q3a }}.
+
push_gammas_to_deltas(Generator, Limit, Q, Delta) ->
case Generator(Q) of
{{value, #msg_status { seq_id = SeqId, index_on_disk = true }}, Q1}