summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_tests.erl10
-rw-r--r--src/rabbit_variable_queue.erl50
2 files changed, 45 insertions, 15 deletions
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 7257827ada..3f7aa9e2f0 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -2327,8 +2327,9 @@ test_variable_queue() ->
passed.
test_variable_queue_fold(VQ0) ->
- {Count, RequeuedMsgs, FreshMsgs, VQ1} = variable_queue_with_holes(VQ0),
- Msgs = RequeuedMsgs ++ FreshMsgs,
+ {Count, PendingMsgs, RequeuedMsgs, FreshMsgs, VQ1} =
+ variable_queue_with_holes(VQ0),
+ Msgs = lists:sort(PendingMsgs ++ RequeuedMsgs ++ FreshMsgs),
lists:foldl(
fun (Cut, VQ2) -> test_variable_queue_fold(Cut, Msgs, VQ2) end,
VQ1, [0, 1, 2, Count div 2, Count - 1, Count, Count + 1, Count * 2]).
@@ -2397,10 +2398,11 @@ variable_queue_with_holes(VQ0) ->
Depth = rabbit_variable_queue:depth(VQ8),
Len = Depth - length(Subset3),
Len = rabbit_variable_queue:len(VQ8),
- {Len, (Seq -- Seq3), lists:seq(Count + 1, Count + 64), VQ8}.
+ {Depth, Seq3, Seq -- Seq3, lists:seq(Count + 1, Count + 64), VQ8}.
test_variable_queue_requeue(VQ0) ->
- {_, RequeuedMsgs, FreshMsgs, VQ1} = variable_queue_with_holes(VQ0),
+ {_, _PendingMsgs, RequeuedMsgs, FreshMsgs, VQ1} =
+ variable_queue_with_holes(VQ0),
Msgs =
lists:zip(RequeuedMsgs,
lists:duplicate(length(RequeuedMsgs), true)) ++
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index c0552577b5..bbec70b2a5 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -676,7 +676,12 @@ ackfold(MsgFun, Acc, State, AckTags) ->
end, {Acc, State}, AckTags),
{AccN, a(StateN)}.
-fold(Fun, Acc, State) -> ifold(Fun, Acc, iterator(State), State).
+fold(Fun, Acc, State = #vqstate{index_state = IndexState}) ->
+ {Its, IndexState1} = lists:foldl(fun inext/2, {[], IndexState},
+ [msg_iterator(State),
+ disk_ack_iterator(State),
+ ram_ack_iterator(State)]),
+ ifold(Fun, Acc, Its, State#vqstate{index_state = IndexState1}).
len(#vqstate { len = Len }) -> Len.
@@ -1447,7 +1452,13 @@ delta_limit(#delta { start_seq_id = StartSeqId }) -> StartSeqId.
%% Iterator
%%----------------------------------------------------------------------------
-iterator(State) -> istate(start, State).
+ram_ack_iterator(State) ->
+ {ack, gb_trees:iterator(State#vqstate.ram_pending_ack)}.
+
+disk_ack_iterator(State) ->
+ {ack, gb_trees:iterator(State#vqstate.disk_pending_ack)}.
+
+msg_iterator(State) -> istate(start, State).
istate(start, State) -> {q4, State#vqstate.q4, State};
istate(q4, State) -> {q3, State#vqstate.q3, State};
@@ -1456,6 +1467,11 @@ istate(delta, State) -> {q2, State#vqstate.q2, State};
istate(q2, State) -> {q1, State#vqstate.q1, State};
istate(q1, _State) -> done.
+next({ack, It}, IndexState) ->
+ case gb_trees:next(It) of
+ none -> {empty, IndexState};
+ {_SeqId, MsgStatus, It1} -> {value, MsgStatus, {ack, It1}, IndexState}
+ end;
next(done, IndexState) -> {empty, IndexState};
next({delta, #delta{start_seq_id = SeqId,
end_seq_id = SeqId}, State}, IndexState) ->
@@ -1482,17 +1498,29 @@ next({Key, Q, State}, IndexState) ->
{value, MsgStatus, Next, IndexState}
end.
-ifold(Fun, Acc, It, State = #vqstate{index_state = IndexState}) ->
+inext(It, {Its, IndexState}) ->
case next(It, IndexState) of
- {value, MsgStatus, Next, IndexState1} ->
- State1 = State#vqstate{index_state = IndexState1},
- {Msg, State2} = read_msg(MsgStatus, State1),
- case Fun(Msg, MsgStatus#msg_status.msg_props, Acc) of
- {stop, Acc1} -> {Acc1, State2};
- {cont, Acc1} -> ifold(Fun, Acc1, Next, State2)
- end;
{empty, IndexState1} ->
- {Acc, State#vqstate{index_state = IndexState1}}
+ {Its, IndexState1};
+ {value, MsgStatus1, It1, IndexState1} ->
+ {[{MsgStatus1, It1} | Its], IndexState1}
+ end.
+
+ifold(_Fun, Acc, [], State) ->
+ {Acc, State};
+ifold(Fun, Acc, Its, State) ->
+ [{MsgStatus, It} | Rest] = lists:sort(
+ fun ({MsgStatus1, _}, {MsgStatus2, _}) ->
+ MsgStatus1#msg_status.seq_id <
+ MsgStatus2#msg_status.seq_id
+ end, Its),
+ {Msg, State1} = read_msg(MsgStatus, State),
+ case Fun(Msg, MsgStatus#msg_status.msg_props, Acc) of
+ {stop, Acc1} ->
+ {Acc1, State};
+ {cont, Acc1} ->
+ {Its1, IndexState1} = inext(It, {Rest, State1#vqstate.index_state}),
+ ifold(Fun, Acc1, Its1, State1#vqstate{index_state = IndexState1})
end.
%%----------------------------------------------------------------------------