diff options
| author | Matthias Radestock <matthias@rabbitmq.com> | 2013-01-13 10:49:08 +0000 |
|---|---|---|
| committer | Matthias Radestock <matthias@rabbitmq.com> | 2013-01-13 10:49:08 +0000 |
| commit | d12b4d8e08005f97c5557f0948b3d03c15ef8c1b (patch) | |
| tree | c903981acd4d55b05f57987d3777a294f35321a4 /src | |
| parent | d2e4fff191483fd69a5fc3eee03ba0a10a15008e (diff) | |
| download | rabbitmq-server-git-d12b4d8e08005f97c5557f0948b3d03c15ef8c1b.tar.gz | |
include pending_acks in 'fold'
we implement this as a zipper over three iterators
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_variable_queue.erl | 59 |
1 files changed, 46 insertions, 13 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index d3147999a5..185da19c96 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -676,7 +676,19 @@ ackfold(MsgFun, Acc, State, AckTags) -> end, {Acc, State}, AckTags), {AccN, a(StateN)}. -fold(Fun, Acc, State) -> ifold(Fun, Acc, iterator(State), State). +fold(Fun, Acc, State = #vqstate{index_state = IndexState}) -> + {Its, IndexState1} = + lists:foldl(fun (It, {Its, IndexState2}) -> + case next(It, IndexState2) of + {empty, IndexState3} -> + {Its, IndexState3}; + {value, MsgStatus, It1, IndexState3} -> + {[{MsgStatus, It1} | Its], IndexState3} + end + end, {[], IndexState}, [msg_iterator(State), + disk_ack_iterator(State), + ram_ack_iterator(State)]), + ifold(Fun, Acc, Its, State#vqstate{index_state = IndexState1}). len(#vqstate { len = Len }) -> Len. @@ -1447,7 +1459,13 @@ delta_limit(#delta { start_seq_id = StartSeqId }) -> StartSeqId. %% Iterator %%---------------------------------------------------------------------------- -iterator(State) -> istate(start, State). +ram_ack_iterator(State) -> + {ack, gb_trees:iterator(State#vqstate.ram_pending_ack)}. + +disk_ack_iterator(State) -> + {ack, gb_trees:iterator(State#vqstate.disk_pending_ack)}. + +msg_iterator(State) -> istate(start, State). istate(start, State) -> {q4, State#vqstate.q4, State}; istate(q4, State) -> {q3, State#vqstate.q3, State}; @@ -1456,6 +1474,11 @@ istate(delta, State) -> {q2, State#vqstate.q2, State}; istate(q2, State) -> {q1, State#vqstate.q1, State}; istate(q1, _State) -> done. +next({ack, It}, IndexState) -> + case gb_trees:next(It) of + none -> {empty, IndexState}; + {_SeqId, MsgStatus, It1} -> {value, MsgStatus, {ack, It1}, IndexState} + end; next(done, IndexState) -> {empty, IndexState}; next({delta, #delta{start_seq_id = SeqId, end_seq_id = SeqId}, State}, IndexState) -> @@ -1477,17 +1500,27 @@ next({Key, Q, State}, IndexState) -> {value, MsgStatus, Next, IndexState} end. -ifold(Fun, Acc, It, State = #vqstate{index_state = IndexState}) -> - case next(It, IndexState) of - {value, MsgStatus, Next, IndexState1} -> - State1 = State#vqstate{index_state = IndexState1}, - {Msg, State2} = read_msg(MsgStatus, State1), - case Fun(Msg, MsgStatus#msg_status.msg_props, Acc) of - {stop, Acc1} -> {Acc1, State2}; - {cont, Acc1} -> ifold(Fun, Acc1, Next, State2) - end; - {empty, IndexState1} -> - {Acc, State#vqstate{index_state = IndexState1}} +ifold(_Fun, Acc, [], State) -> + {Acc, State}; +ifold(Fun, Acc, Its, State) -> + [{MsgStatus, It} | Rest] = lists:sort( + fun ({MsgStatus1, _}, {MsgStatus2, _}) -> + MsgStatus1#msg_status.seq_id < + MsgStatus2#msg_status.seq_id + end, Its), + {Msg, State1} = read_msg(MsgStatus, State), + case Fun(Msg, MsgStatus#msg_status.msg_props, Acc) of + {stop, Acc1} -> + {Acc1, State}; + {cont, Acc1} -> + case next(It, State1#vqstate.index_state) of + {empty, IndexState1} -> + ifold(Fun, Acc1, Rest, + State1#vqstate{index_state = IndexState1}); + {value, MsgStatus1, It1, IndexState1} -> + ifold(Fun, Acc1, [{MsgStatus1, It1} | Rest], + State1#vqstate{index_state = IndexState1}) + end end. %%---------------------------------------------------------------------------- |
