summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEmile Joubert <emile@rabbitmq.com>2011-09-26 21:36:59 +0100
committerEmile Joubert <emile@rabbitmq.com>2011-09-26 21:36:59 +0100
commitee3feaaf4c6cde126198c37cbfcc35cc7180e9f2 (patch)
tree7a8323ef0c74509522a565be45f3d13338d81aaa
parentf7620ca0993a5fa471b1884fd797ab191c667eb6 (diff)
downloadrabbitmq-server-git-ee3feaaf4c6cde126198c37cbfcc35cc7180e9f2.tar.gz
Turn q3 back into queue
And filter messages from queue index
-rw-r--r--src/q3tree.erl107
-rw-r--r--src/rabbit_variable_queue.erl148
2 files changed, 85 insertions, 170 deletions
diff --git a/src/q3tree.erl b/src/q3tree.erl
deleted file mode 100644
index a5fcd74014..0000000000
--- a/src/q3tree.erl
+++ /dev/null
@@ -1,107 +0,0 @@
-%% The contents of this file are subject to the Mozilla Public License
-%% Version 1.1 (the "License"); you may not use this file except in
-%% compliance with the License. You may obtain a copy of the License
-%% at http://www.mozilla.org/MPL/
-%%
-%% Software distributed under the License is distributed on an "AS IS"
-%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
-%% the License for the specific language governing rights and
-%% limitations under the License.
-%%
-%% The Original Code is RabbitMQ.
-%%
-%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
-%%
-
--module(q3tree).
-
-%% A less general random access variation of bpqueue for message status records
-
--export([new/0, is_empty/1, len/1, in/3, in_r/3, out/1, out_r/1, least_key/1,
- join/2, join_bpqueue/2, foldr/3, from_batch/1, map_fold_filter_r/4]).
-
--include("rabbit.hrl").
--include("rabbit_backing_queue.hrl").
-
-new() ->
- gb_trees:empty().
-
-is_empty(T) ->
- gb_trees:is_empty(T).
-
-len(T) ->
- gb_trees:size(T).
-
-in(IndexOnDisk, MsgStatus, Tree) -> in_r(IndexOnDisk, MsgStatus, Tree).
-
-in_r(IndexOnDisk,
- #msg_status { seq_id = SeqId, index_on_disk = IndexOnDisk } = MsgStatus,
- Tree) ->
- gb_trees:insert(SeqId, MsgStatus, Tree);
-in_r(IndexOnDisk, _Msgstatus, _Tree) ->
- throw({prefix_and_msg_disagree, IndexOnDisk}).
-
-out(Tree) -> out1(Tree, fun gb_trees:take_smallest/1).
-out_r(Tree) -> out1(Tree, fun gb_trees:take_largest/1).
-
-out1(Tree, TakeFun) ->
- case gb_trees:is_empty(Tree) of
- true -> {empty, Tree};
- false -> {_Key, #msg_status { index_on_disk = IndexOnDisk } = MsgStatus,
- Tree2} = TakeFun(Tree),
- {{value, IndexOnDisk, MsgStatus}, Tree2}
- end.
-
-least_key(Tree) ->
- {Least, _} = gb_trees:smallest(Tree),
- Least.
-
-join(T1, T2) ->
- join1(gb_trees:iterator(T1), T2).
-join1(Iter, T) ->
- case gb_trees:next(Iter) of
- none -> T;
- {_SeqId,
- #msg_status { index_on_disk = IndexOnDisk } = MsgStatus,
- Iter1} -> join1(Iter1, in_r(IndexOnDisk, MsgStatus, T))
- end.
-
-join_bpqueue(T, Q) ->
- bpqueue:foldr(fun (IndexOnDisk, MsgStatus, Tree) ->
- in_r(IndexOnDisk, MsgStatus, Tree)
- end, T, Q).
-
-foldr(Fun, Acc, Tree) ->
- lists:foldr(Fun, Acc, gb_trees:to_list(Tree)).
-
-from_batch({IndexOnDisk, L}) ->
- lists:foldl(fun (MsgStatus, Tree) ->
- in_r(IndexOnDisk, MsgStatus, Tree)
- end, new(), L).
-
-map_fold_filter_r(PFilter, Fun, Acc, Tree) ->
- map_fold_filter_r1(PFilter, Fun, Acc, Tree, new()).
-
-map_fold_filter_r1(PFilter, Fun, Acc, TreeOld, TreeNew) ->
- case out_r(TreeOld) of
- {empty, _T} -> {TreeNew, Acc};
- {{value,
- IndexOnDisk, #msg_status{index_on_disk = IndexOnDisk} = MsgStatus},
- TreeOld1} ->
- case PFilter(IndexOnDisk) of
- false ->
- map_fold_filter_r1(PFilter, Fun, Acc, TreeOld1,
- in_r(IndexOnDisk, MsgStatus, TreeNew));
- true ->
- case Fun(MsgStatus, Acc) of
- stop ->
- {join(TreeOld, TreeNew), Acc};
- {IndexOnDisk1, MsgStatus1, Acc1} ->
- map_fold_filter_r1(PFilter, Fun, Acc1, TreeOld1,
- in_r(IndexOnDisk1, MsgStatus1,
- TreeNew))
- end
- end
- end.
-
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 56280e1523..ff0509a7b4 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -307,7 +307,7 @@
q1 :: queue(),
q2 :: bpqueue:bpqueue(),
delta :: delta(),
- q3 :: gb_tree(),
+ q3 :: bpqueue:bpqueue(),
q4 :: queue(),
next_seq_id :: seq_id(),
pending_ack :: dict(),
@@ -684,7 +684,7 @@ status(#vqstate {
[ {q1 , queue:len(Q1)},
{q2 , bpqueue:len(Q2)},
{delta , Delta},
- {q3 , q3tree:len(Q3)},
+ {q3 , bpqueue:len(Q3)},
{q4 , queue:len(Q4)},
{len , Len},
{pending_acks , dict:size(PA)},
@@ -717,7 +717,7 @@ a(State = #vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4,
E1 = queue:is_empty(Q1),
E2 = bpqueue:is_empty(Q2),
ED = Delta#delta.count == 0,
- E3 = q3tree:is_empty(Q3),
+ E3 = bpqueue:is_empty(Q3),
E4 = queue:is_empty(Q4),
LZ = Len == 0,
@@ -813,29 +813,33 @@ maybe_write_delivered(false, _SeqId, IndexState) ->
maybe_write_delivered(true, SeqId, IndexState) ->
rabbit_queue_index:deliver([SeqId], IndexState).
-betas_from_index_entries(List, TransientThreshold, IndexState) ->
+betas_from_index_entries(List, TransientThreshold, AckedFun, IndexState) ->
{Filtered, Delivers, Acks} =
lists:foldr(
fun ({MsgId, SeqId, MsgProps, IsPersistent, IsDelivered},
- {Filtered1, Delivers1, Acks1}) ->
+ {Filtered1, Delivers1, Acks1} = Acc) ->
case SeqId < TransientThreshold andalso not IsPersistent of
true -> {Filtered1,
cons_if(not IsDelivered, SeqId, Delivers1),
[SeqId | Acks1]};
- false -> {[m(#msg_status { msg = undefined,
- msg_id = MsgId,
- seq_id = SeqId,
- is_persistent = IsPersistent,
- is_delivered = IsDelivered,
- msg_on_disk = true,
- index_on_disk = true,
- msg_props = MsgProps
- }) | Filtered1],
- Delivers1,
- Acks1}
+ false -> case AckedFun(SeqId) of
+ false -> {[m(#msg_status {
+ msg = undefined,
+ msg_id = MsgId,
+ seq_id = SeqId,
+ is_persistent = IsPersistent,
+ is_delivered = IsDelivered,
+ msg_on_disk = true,
+ index_on_disk = true,
+ msg_props = MsgProps
+ }) | Filtered1],
+ Delivers1,
+ Acks1};
+ true -> Acc
+ end
end
end, {[], [], []}, List),
- {q3tree:from_batch({true, Filtered}),
+ {bpqueue:from_list([{true, Filtered}]),
rabbit_queue_index:ack(Acks,
rabbit_queue_index:deliver(Delivers, IndexState))}.
@@ -866,7 +870,7 @@ combine_deltas(#delta { start_seq_id = StartLow,
#delta { start_seq_id = StartLow, count = Count, end_seq_id = EndHigh }.
beta_fold(Fun, Init, Q) ->
- q3tree:foldr(fun ({_SeqID, Value}, Acc) -> Fun(Value, Acc) end, Init, Q).
+ bpqueue:foldr(fun (_Prefix, Value, Acc) -> Fun(Value, Acc) end, Init, Q).
update_rate(Now, Then, Count, {OThen, OCount}) ->
%% avg over the current period and the previous
@@ -892,7 +896,7 @@ init(IsDurable, IndexState, DeltaCount, Terms, AsyncCallback,
q1 = queue:new(),
q2 = bpqueue:new(),
delta = Delta,
- q3 = q3tree:new(),
+ q3 = bpqueue:new(),
q4 = queue:new(),
next_seq_id = NextSeqId,
pending_ack = dict:new(),
@@ -935,7 +939,7 @@ in_r(MsgStatus = #msg_status { msg = undefined, index_on_disk = IndexOnDisk },
State = #vqstate { q3 = Q3, q4 = Q4, ram_index_count = RamIndexCount }) ->
case queue:is_empty(Q4) of
true -> State #vqstate {
- q3 = q3tree:in_r(IndexOnDisk, MsgStatus, Q3),
+ q3 = bpqueue:in_r(IndexOnDisk, MsgStatus, Q3),
ram_index_count = RamIndexCount + one_if(not IndexOnDisk) };
false -> {MsgStatus1, State1 = #vqstate { q4 = Q4a }} =
read_msg(MsgStatus, State),
@@ -993,11 +997,10 @@ internal_fetch(AckRequired, MsgStatus = #msg_status {
end,
Ack = fun () -> rabbit_queue_index:ack([SeqId], IndexState1) end,
IndexState2 =
- case {AckRequired, MsgOnDisk, IndexOnDisk, IsPersistent, IsDelivered} of
- {false, true, false, _, _} -> Rem(), IndexState1;
- {false, true, true, _, _} -> Rem(), Ack();
- { true, true, true, false, false} -> Ack();
- _ -> IndexState1
+ case {AckRequired, MsgOnDisk, IndexOnDisk} of
+ {false, true, false} -> Rem(), IndexState1;
+ {false, true, true} -> Rem(), Ack();
+ _ -> IndexState1
end,
%% 3. If an ack is required, add something sensible to PA
@@ -1024,7 +1027,7 @@ purge_betas_and_deltas(LensByStore,
State = #vqstate { q3 = Q3,
index_state = IndexState,
msg_store_clients = MSCState }) ->
- case q3tree:is_empty(Q3) of
+ case bpqueue:is_empty(Q3) of
true -> {LensByStore, State};
false -> {LensByStore1, IndexState1} =
remove_queue_entries(fun beta_fold/3, Q3,
@@ -1032,7 +1035,7 @@ purge_betas_and_deltas(LensByStore,
purge_betas_and_deltas(LensByStore1,
maybe_deltas_to_betas(
State #vqstate {
- q3 = q3tree:new(),
+ q3 = bpqueue:new(),
index_state = IndexState1 }))
end.
@@ -1083,7 +1086,7 @@ publish(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId },
MsgStatus = (msg_status(IsPersistent1, SeqId, Msg, MsgProps))
#msg_status { is_delivered = IsDelivered, msg_on_disk = MsgOnDisk},
{MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State),
- State2 = case q3tree:is_empty(Q3) of
+ State2 = case bpqueue:is_empty(Q3) of
false -> State1 #vqstate { q1 = queue:in(m(MsgStatus1), Q1) };
true -> State1 #vqstate { q4 = queue:in(m(MsgStatus1), Q4) }
end,
@@ -1166,7 +1169,7 @@ remove_pending_ack(KeepPersistent,
State = #vqstate { pending_ack = PA,
index_state = IndexState,
msg_store_clients = MSCState }) ->
- {PersistentSeqIds, MsgIdsByStore, _AllMsgIds} =
+ {MsgIdsByStore, _AllMsgIds} =
dict:fold(fun accumulate_ack/3, accumulate_ack_init(), PA),
State1 = State #vqstate { pending_ack = dict:new(),
ram_ack_index = gb_trees:empty() },
@@ -1178,7 +1181,7 @@ remove_pending_ack(KeepPersistent,
State1
end;
false -> IndexState1 =
- rabbit_queue_index:ack(PersistentSeqIds, IndexState),
+ rabbit_queue_index:ack(dict:fetch_keys(PA), IndexState),
[ok = msg_store_remove(MSCState, IsPersistent, MsgIds)
|| {IsPersistent, MsgIds} <- orddict:to_list(MsgIdsByStore)],
State1 #vqstate { index_state = IndexState1 }
@@ -1187,7 +1190,7 @@ remove_pending_ack(KeepPersistent,
ack(_MsgStoreFun, _Fun, [], State) ->
{[], State};
ack(MsgStoreFun, Fun, AckTags, State) ->
- {{PersistentSeqIds, MsgIdsByStore, AllMsgIds},
+ {{MsgIdsByStore, AllMsgIds},
State1 = #vqstate { index_state = IndexState,
msg_store_clients = MSCState,
persistent_count = PCount,
@@ -1202,7 +1205,7 @@ ack(MsgStoreFun, Fun, AckTags, State) ->
ram_ack_index =
gb_trees:delete_any(SeqId, RAI)})}
end, {accumulate_ack_init(), State}, AckTags),
- IndexState1 = rabbit_queue_index:ack(PersistentSeqIds, IndexState),
+ IndexState1 = rabbit_queue_index:ack(AckTags, IndexState),
[ok = MsgStoreFun(MSCState, IsPersistent, MsgIds)
|| {IsPersistent, MsgIds} <- orddict:to_list(MsgIdsByStore)],
PCount1 = PCount - find_persistent_count(sum_msg_ids_by_store_to_len(
@@ -1212,18 +1215,17 @@ ack(MsgStoreFun, Fun, AckTags, State) ->
persistent_count = PCount1,
ack_out_counter = AckOutCount + length(AckTags) }}.
-accumulate_ack_init() -> {[], orddict:new(), []}.
+accumulate_ack_init() -> {orddict:new(), []}.
accumulate_ack(_SeqId, #msg_status { is_persistent = false, %% ASSERTIONS
msg_on_disk = false,
index_on_disk = false,
msg_id = MsgId },
- {PersistentSeqIdsAcc, MsgIdsByStore, AllMsgIds}) ->
- {PersistentSeqIdsAcc, MsgIdsByStore, [MsgId | AllMsgIds]};
-accumulate_ack(SeqId, {IsPersistent, MsgId, _MsgProps, _IndexOnDisk},
- {PersistentSeqIdsAcc, MsgIdsByStore, AllMsgIds}) ->
- {cons_if(IsPersistent, SeqId, PersistentSeqIdsAcc),
- rabbit_misc:orddict_cons(IsPersistent, MsgId, MsgIdsByStore),
+ {MsgIdsByStore, AllMsgIds}) ->
+ {MsgIdsByStore, [MsgId | AllMsgIds]};
+accumulate_ack(_SeqId, {IsPersistent, MsgId, _MsgProps, _IndexOnDisk},
+ {MsgIdsByStore, AllMsgIds}) ->
+ {rabbit_misc:orddict_cons(IsPersistent, MsgId, MsgIdsByStore),
[MsgId | AllMsgIds]}.
find_persistent_count(LensByStore) ->
@@ -1351,9 +1353,9 @@ publish_r(MsgStatus = #msg_status { seq_id = SeqId,
#vqstate { q3 = Q3 } = State1} =
maybe_write_to_disk(not MsgOnDisk, false, MsgStatus, State),
State1 #vqstate {
- q3 = q3tree:in_r(IndexOnDisk1,
- MsgStatus1,
- Q3),
+ q3 = q3_merge(IndexOnDisk1,
+ MsgStatus1,
+ Q3),
ram_index_count = RamIndexCount + one_if(not IndexOnDisk1),
ram_msg_count = RamMsgCount + one_if(Msg =/= undefined) };
@@ -1374,8 +1376,7 @@ publish_r(MsgStatus = #msg_status { seq_id = SeqId,
pick_store(SeqId, #vqstate { q3 = Q3,
delta = #delta { start_seq_id = DeltaLimit }
= Delta}) ->
- case q3tree:is_empty(Q3) orelse
- not q3tree:is_empty(Q3) andalso SeqId < q3tree:least_key(Q3) of
+ case bpqueue:is_empty(Q3) orelse SeqId < q3_least_key(Q3) of
true -> q4;
false -> BlankDelta = case Delta of
?BLANK_DELTA_PATTERN(X) -> true;
@@ -1387,6 +1388,24 @@ pick_store(SeqId, #vqstate { q3 = Q3,
end
end.
+q3_least_key(BPQ) ->
+ {{value, _Prefix, #msg_status { seq_id = SeqId }}, _BPQ} = bpqueue:out(BPQ),
+ SeqId.
+
+q3_merge(IndexOnDisk, MsgStatus, Q) ->
+ q3_merge(IndexOnDisk, MsgStatus, Q, bpqueue:new()).
+
+q3_merge(IndexOnDisk, #msg_status {seq_id = SeqId } = MsgStatus, Q, Front) ->
+ case bpqueue:out(Q) of
+ {{value, IndexOnDiskHead, #msg_status {seq_id = SeqId1 } = MsgStatusHead}, Q1} ->
+ case SeqId1 > SeqId of
+ true -> bpqueue:join(bpqueue:in(IndexOnDisk, MsgStatus, Front), Q);
+ false -> q3_merge(IndexOnDisk, MsgStatus, Q1, bpqueue:in(IndexOnDiskHead, MsgStatusHead, Front))
+ end;
+ {empty, _Q1} ->
+ bpqueue:in(IndexOnDisk, MsgStatus, Front)
+ end.
+
q4_merge(MsgStatus, Q) ->
q4_merge(MsgStatus, Q, queue:new()).
@@ -1508,7 +1527,7 @@ limit_ram_index(Quota, State = #vqstate { q2 = Q2, q3 = Q3,
%% can never end up in delta due them residing in the only segment
%% held by q3.
{Q3a, {Quota2, IndexState2}} = limit_ram_index(
- fun q3tree:map_fold_filter_r/4,
+ fun bpqueue:map_fold_filter_r/4,
Q3, {Quota1, IndexState1}),
State #vqstate { q2 = Q2a, q3 = Q3a,
index_state = IndexState2,
@@ -1535,7 +1554,7 @@ permitted_ram_index_count(#vqstate { len = Len,
q2 = Q2,
q3 = Q3,
delta = #delta { count = DeltaCount } }) ->
- BetaLen = bpqueue:len(Q2) + q3tree:len(Q3),
+ BetaLen = bpqueue:len(Q2) + bpqueue:len(Q3),
BetaLen - trunc(BetaLen * BetaLen / (Len - DeltaCount)).
chunk_size(Current, Permitted)
@@ -1551,7 +1570,7 @@ fetch_from_q3(State = #vqstate {
q3 = Q3,
q4 = Q4,
ram_index_count = RamIndexCount}) ->
- case q3tree:out(Q3) of
+ case bpqueue:out(Q3) of
{empty, _Q3} ->
{empty, State};
{{value, IndexOnDisk, MsgStatus}, Q3a} ->
@@ -1560,7 +1579,7 @@ fetch_from_q3(State = #vqstate {
State1 = State #vqstate { q3 = Q3a,
ram_index_count = RamIndexCount1 },
State2 =
- case {q3tree:is_empty(Q3a), 0 == DeltaCount} of
+ case {bpqueue:is_empty(Q3a), 0 == DeltaCount} of
{true, true} ->
%% q3 is now empty, it wasn't before; delta is
%% still empty. So q2 must be empty, and we
@@ -1589,6 +1608,7 @@ maybe_deltas_to_betas(State = #vqstate {
delta = Delta,
q3 = Q3,
index_state = IndexState,
+ pending_ack = PA,
transient_threshold = TransientThreshold }) ->
#delta { start_seq_id = DeltaSeqId,
count = DeltaCount,
@@ -1599,9 +1619,11 @@ maybe_deltas_to_betas(State = #vqstate {
{List, IndexState1} =
rabbit_queue_index:read(DeltaSeqId, DeltaSeqId1, IndexState),
{Q3a, IndexState2} =
- betas_from_index_entries(List, TransientThreshold, IndexState1),
+ betas_from_index_entries(List, TransientThreshold,
+ fun (SeqId) -> dict:is_key(SeqId, PA) end,
+ IndexState1),
State1 = State #vqstate { index_state = IndexState2 },
- case q3tree:len(Q3a) of
+ case bpqueue:len(Q3a) of
0 ->
%% we ignored every message in the segment due to it being
%% transient and below the threshold
@@ -1609,14 +1631,14 @@ maybe_deltas_to_betas(State = #vqstate {
State1 #vqstate {
delta = Delta #delta { start_seq_id = DeltaSeqId1 }});
Q3aLen ->
- Q3b = q3tree:join(Q3, Q3a),
+ Q3b = bpqueue:join(Q3, Q3a),
case DeltaCount - Q3aLen of
0 ->
%% delta is now empty, but it wasn't before, so
%% can now join q2 onto q3
State1 #vqstate { q2 = bpqueue:new(),
delta = ?BLANK_DELTA,
- q3 = q3tree:join_bpqueue(Q3b, Q2) };
+ q3 = bpqueue:join(Q3b, Q2) };
N when N > 0 ->
Delta1 = #delta { start_seq_id = DeltaSeqId1,
count = N,
@@ -1637,7 +1659,7 @@ maybe_push_q1_to_betas(Quota, State = #vqstate { q1 = Q1 }) ->
fun (MsgStatus = #msg_status { index_on_disk = IndexOnDisk },
Q1a, State1 = #vqstate { q3 = Q3, delta = #delta { count = 0 } }) ->
State1 #vqstate { q1 = Q1a,
- q3 = q3tree:in(IndexOnDisk, MsgStatus, Q3) };
+ q3 = bpqueue:in(IndexOnDisk, MsgStatus, Q3) };
(MsgStatus = #msg_status { index_on_disk = IndexOnDisk },
Q1a, State1 = #vqstate { q2 = Q2 }) ->
State1 #vqstate { q1 = Q1a,
@@ -1649,7 +1671,7 @@ maybe_push_q4_to_betas(Quota, State = #vqstate { q4 = Q4 }) ->
fun queue:out_r/1,
fun (MsgStatus = #msg_status { index_on_disk = IndexOnDisk },
Q4a, State1 = #vqstate { q3 = Q3 }) ->
- State1 #vqstate { q3 = q3tree:in_r(IndexOnDisk, MsgStatus, Q3),
+ State1 #vqstate { q3 = bpqueue:in_r(IndexOnDisk, MsgStatus, Q3),
q4 = Q4a }
end, Quota, Q4, State).
@@ -1686,11 +1708,11 @@ push_betas_to_deltas(State = #vqstate { q2 = Q2,
ram_index_count = RamIndexCount }) ->
{Delta2, Q2a, RamIndexCount2, IndexState2} =
push_betas_to_deltas(fun (Q2MinSeqId) -> Q2MinSeqId end,
- fun bpqueue:out/1, bpqueue, Q2,
+ fun bpqueue:out/1, Q2,
RamIndexCount, IndexState),
{Delta3, Q3a, RamIndexCount3, IndexState3} =
push_betas_to_deltas(fun rabbit_queue_index:next_segment_boundary/1,
- fun q3tree:out_r/1, q3tree, Q3,
+ fun bpqueue:out_r/1, Q3,
RamIndexCount2, IndexState2),
Delta4 = combine_deltas(Delta3, combine_deltas(Delta, Delta2)),
State #vqstate { q2 = Q2a,
@@ -1699,19 +1721,19 @@ push_betas_to_deltas(State = #vqstate { q2 = Q2,
index_state = IndexState3,
ram_index_count = RamIndexCount3 }.
-push_betas_to_deltas(LimitFun, Generator, QMod, Q, RamIndexCount, IndexState) ->
- case QMod:out(Q) of
+push_betas_to_deltas(LimitFun, Generator, Q, RamIndexCount, IndexState) ->
+ case bpqueue:out(Q) of
{empty, _Q} ->
{?BLANK_DELTA, Q, RamIndexCount, IndexState};
{{value, _IndexOnDisk1, #msg_status { seq_id = MinSeqId }}, _Qa} ->
{{value, _IndexOnDisk2, #msg_status { seq_id = MaxSeqId }}, _Qb} =
- QMod:out_r(Q),
+ bpqueue:out_r(Q),
Limit = LimitFun(MinSeqId),
case MaxSeqId < Limit of
true -> {?BLANK_DELTA, Q, RamIndexCount, IndexState};
false -> {Len, Qc, RamIndexCount1, IndexState1} =
- push_betas_to_deltas1(Generator, Limit, Q, 0,
- RamIndexCount, IndexState),
+ push_betas_to_deltas(Generator, Limit, Q, 0,
+ RamIndexCount, IndexState),
{#delta { start_seq_id = Limit,
count = Len,
end_seq_id = MaxSeqId + 1 },
@@ -1719,7 +1741,7 @@ push_betas_to_deltas(LimitFun, Generator, QMod, Q, RamIndexCount, IndexState) ->
end
end.
-push_betas_to_deltas1(Generator, Limit, Q, Count, RamIndexCount, IndexState) ->
+push_betas_to_deltas(Generator, Limit, Q, Count, RamIndexCount, IndexState) ->
case Generator(Q) of
{empty, _Q} ->
{Count, Q, RamIndexCount, IndexState};
@@ -1736,7 +1758,7 @@ push_betas_to_deltas1(Generator, Limit, Q, Count, RamIndexCount, IndexState) ->
IndexState),
{RamIndexCount - 1, IndexState2}
end,
- push_betas_to_deltas1(
+ push_betas_to_deltas(
Generator, Limit, Qa, Count + 1, RamIndexCount1, IndexState1)
end.