summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2013-01-13 10:49:08 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2013-01-13 10:49:08 +0000
commitd12b4d8e08005f97c5557f0948b3d03c15ef8c1b (patch)
treec903981acd4d55b05f57987d3777a294f35321a4 /src
parentd2e4fff191483fd69a5fc3eee03ba0a10a15008e (diff)
downloadrabbitmq-server-git-d12b4d8e08005f97c5557f0948b3d03c15ef8c1b.tar.gz
include pending_acks in 'fold'
we implement this as a zipper over three iterators
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_variable_queue.erl59
1 files changed, 46 insertions, 13 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index d3147999a5..185da19c96 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -676,7 +676,19 @@ ackfold(MsgFun, Acc, State, AckTags) ->
end, {Acc, State}, AckTags),
{AccN, a(StateN)}.
-fold(Fun, Acc, State) -> ifold(Fun, Acc, iterator(State), State).
+fold(Fun, Acc, State = #vqstate{index_state = IndexState}) ->
+ {Its, IndexState1} =
+ lists:foldl(fun (It, {Its, IndexState2}) ->
+ case next(It, IndexState2) of
+ {empty, IndexState3} ->
+ {Its, IndexState3};
+ {value, MsgStatus, It1, IndexState3} ->
+ {[{MsgStatus, It1} | Its], IndexState3}
+ end
+ end, {[], IndexState}, [msg_iterator(State),
+ disk_ack_iterator(State),
+ ram_ack_iterator(State)]),
+ ifold(Fun, Acc, Its, State#vqstate{index_state = IndexState1}).
len(#vqstate { len = Len }) -> Len.
@@ -1447,7 +1459,13 @@ delta_limit(#delta { start_seq_id = StartSeqId }) -> StartSeqId.
%% Iterator
%%----------------------------------------------------------------------------
-iterator(State) -> istate(start, State).
+ram_ack_iterator(State) ->
+ {ack, gb_trees:iterator(State#vqstate.ram_pending_ack)}.
+
+disk_ack_iterator(State) ->
+ {ack, gb_trees:iterator(State#vqstate.disk_pending_ack)}.
+
+msg_iterator(State) -> istate(start, State).
istate(start, State) -> {q4, State#vqstate.q4, State};
istate(q4, State) -> {q3, State#vqstate.q3, State};
@@ -1456,6 +1474,11 @@ istate(delta, State) -> {q2, State#vqstate.q2, State};
istate(q2, State) -> {q1, State#vqstate.q1, State};
istate(q1, _State) -> done.
+next({ack, It}, IndexState) ->
+ case gb_trees:next(It) of
+ none -> {empty, IndexState};
+ {_SeqId, MsgStatus, It1} -> {value, MsgStatus, {ack, It1}, IndexState}
+ end;
next(done, IndexState) -> {empty, IndexState};
next({delta, #delta{start_seq_id = SeqId,
end_seq_id = SeqId}, State}, IndexState) ->
@@ -1477,17 +1500,27 @@ next({Key, Q, State}, IndexState) ->
{value, MsgStatus, Next, IndexState}
end.
-ifold(Fun, Acc, It, State = #vqstate{index_state = IndexState}) ->
- case next(It, IndexState) of
- {value, MsgStatus, Next, IndexState1} ->
- State1 = State#vqstate{index_state = IndexState1},
- {Msg, State2} = read_msg(MsgStatus, State1),
- case Fun(Msg, MsgStatus#msg_status.msg_props, Acc) of
- {stop, Acc1} -> {Acc1, State2};
- {cont, Acc1} -> ifold(Fun, Acc1, Next, State2)
- end;
- {empty, IndexState1} ->
- {Acc, State#vqstate{index_state = IndexState1}}
+ifold(_Fun, Acc, [], State) ->
+ {Acc, State};
+ifold(Fun, Acc, Its, State) ->
+ [{MsgStatus, It} | Rest] = lists:sort(
+ fun ({MsgStatus1, _}, {MsgStatus2, _}) ->
+ MsgStatus1#msg_status.seq_id <
+ MsgStatus2#msg_status.seq_id
+ end, Its),
+ {Msg, State1} = read_msg(MsgStatus, State),
+ case Fun(Msg, MsgStatus#msg_status.msg_props, Acc) of
+ {stop, Acc1} ->
+ {Acc1, State};
+ {cont, Acc1} ->
+ case next(It, State1#vqstate.index_state) of
+ {empty, IndexState1} ->
+ ifold(Fun, Acc1, Rest,
+ State1#vqstate{index_state = IndexState1});
+ {value, MsgStatus1, It1, IndexState1} ->
+ ifold(Fun, Acc1, [{MsgStatus1, It1} | Rest],
+ State1#vqstate{index_state = IndexState1})
+ end
end.
%%----------------------------------------------------------------------------