diff options
| -rw-r--r-- | src/rabbit_tests.erl | 10 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 50 |
2 files changed, 45 insertions, 15 deletions
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 7257827ada..3f7aa9e2f0 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -2327,8 +2327,9 @@ test_variable_queue() -> passed. test_variable_queue_fold(VQ0) -> - {Count, RequeuedMsgs, FreshMsgs, VQ1} = variable_queue_with_holes(VQ0), - Msgs = RequeuedMsgs ++ FreshMsgs, + {Count, PendingMsgs, RequeuedMsgs, FreshMsgs, VQ1} = + variable_queue_with_holes(VQ0), + Msgs = lists:sort(PendingMsgs ++ RequeuedMsgs ++ FreshMsgs), lists:foldl( fun (Cut, VQ2) -> test_variable_queue_fold(Cut, Msgs, VQ2) end, VQ1, [0, 1, 2, Count div 2, Count - 1, Count, Count + 1, Count * 2]). @@ -2397,10 +2398,11 @@ variable_queue_with_holes(VQ0) -> Depth = rabbit_variable_queue:depth(VQ8), Len = Depth - length(Subset3), Len = rabbit_variable_queue:len(VQ8), - {Len, (Seq -- Seq3), lists:seq(Count + 1, Count + 64), VQ8}. + {Depth, Seq3, Seq -- Seq3, lists:seq(Count + 1, Count + 64), VQ8}. test_variable_queue_requeue(VQ0) -> - {_, RequeuedMsgs, FreshMsgs, VQ1} = variable_queue_with_holes(VQ0), + {_, _PendingMsgs, RequeuedMsgs, FreshMsgs, VQ1} = + variable_queue_with_holes(VQ0), Msgs = lists:zip(RequeuedMsgs, lists:duplicate(length(RequeuedMsgs), true)) ++ diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index c0552577b5..bbec70b2a5 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -676,7 +676,12 @@ ackfold(MsgFun, Acc, State, AckTags) -> end, {Acc, State}, AckTags), {AccN, a(StateN)}. -fold(Fun, Acc, State) -> ifold(Fun, Acc, iterator(State), State). +fold(Fun, Acc, State = #vqstate{index_state = IndexState}) -> + {Its, IndexState1} = lists:foldl(fun inext/2, {[], IndexState}, + [msg_iterator(State), + disk_ack_iterator(State), + ram_ack_iterator(State)]), + ifold(Fun, Acc, Its, State#vqstate{index_state = IndexState1}). len(#vqstate { len = Len }) -> Len. @@ -1447,7 +1452,13 @@ delta_limit(#delta { start_seq_id = StartSeqId }) -> StartSeqId. %% Iterator %%---------------------------------------------------------------------------- -iterator(State) -> istate(start, State). +ram_ack_iterator(State) -> + {ack, gb_trees:iterator(State#vqstate.ram_pending_ack)}. + +disk_ack_iterator(State) -> + {ack, gb_trees:iterator(State#vqstate.disk_pending_ack)}. + +msg_iterator(State) -> istate(start, State). istate(start, State) -> {q4, State#vqstate.q4, State}; istate(q4, State) -> {q3, State#vqstate.q3, State}; @@ -1456,6 +1467,11 @@ istate(delta, State) -> {q2, State#vqstate.q2, State}; istate(q2, State) -> {q1, State#vqstate.q1, State}; istate(q1, _State) -> done. +next({ack, It}, IndexState) -> + case gb_trees:next(It) of + none -> {empty, IndexState}; + {_SeqId, MsgStatus, It1} -> {value, MsgStatus, {ack, It1}, IndexState} + end; next(done, IndexState) -> {empty, IndexState}; next({delta, #delta{start_seq_id = SeqId, end_seq_id = SeqId}, State}, IndexState) -> @@ -1482,17 +1498,29 @@ next({Key, Q, State}, IndexState) -> {value, MsgStatus, Next, IndexState} end. -ifold(Fun, Acc, It, State = #vqstate{index_state = IndexState}) -> +inext(It, {Its, IndexState}) -> case next(It, IndexState) of - {value, MsgStatus, Next, IndexState1} -> - State1 = State#vqstate{index_state = IndexState1}, - {Msg, State2} = read_msg(MsgStatus, State1), - case Fun(Msg, MsgStatus#msg_status.msg_props, Acc) of - {stop, Acc1} -> {Acc1, State2}; - {cont, Acc1} -> ifold(Fun, Acc1, Next, State2) - end; {empty, IndexState1} -> - {Acc, State#vqstate{index_state = IndexState1}} + {Its, IndexState1}; + {value, MsgStatus1, It1, IndexState1} -> + {[{MsgStatus1, It1} | Its], IndexState1} + end. + +ifold(_Fun, Acc, [], State) -> + {Acc, State}; +ifold(Fun, Acc, Its, State) -> + [{MsgStatus, It} | Rest] = lists:sort( + fun ({MsgStatus1, _}, {MsgStatus2, _}) -> + MsgStatus1#msg_status.seq_id < + MsgStatus2#msg_status.seq_id + end, Its), + {Msg, State1} = read_msg(MsgStatus, State), + case Fun(Msg, MsgStatus#msg_status.msg_props, Acc) of + {stop, Acc1} -> + {Acc1, State}; + {cont, Acc1} -> + {Its1, IndexState1} = inext(It, {Rest, State1#vqstate.index_state}), + ifold(Fun, Acc1, Its1, State1#vqstate{index_state = IndexState1}) end. %%---------------------------------------------------------------------------- |
