summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEmile Joubert <emile@rabbitmq.com>2012-11-22 13:13:08 +0000
committerEmile Joubert <emile@rabbitmq.com>2012-11-22 13:13:08 +0000
commit262492edebca6bc504bd08201949842e80b3a24e (patch)
treef85d3eff474dd1f840287e7fbb1edd246baea62e
parentfae659cd9f15af9a9c4d4528072059b12d60af26 (diff)
parenta7bea4a0db4d7097be2257bfff797c4595955cda (diff)
downloadrabbitmq-server-git-262492edebca6bc504bd08201949842e80b3a24e.tar.gz
Merged default
-rw-r--r--src/rabbit_variable_queue.erl47
1 files changed, 47 insertions, 0 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 862e74f628..e9d3c6e312 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -678,6 +678,53 @@ requeue(AckTags, #vqstate { delta = Delta,
in_counter = InCounter + MsgCount,
len = Len + MsgCount }))}.
+fold(Fun, Acc, #vqstate { q1 = Q1,
+ q2 = Q2,
+ delta = Delta,
+ q3 = Q3,
+ q4 = Q4} = State) ->
+ QFun = fun(M, {A, S}) ->
+ {#msg_status{msg = Msg}, State1} = read_msg(M, S, false),
+ A1 = Fun(Msg, A),
+ {A1, State1}
+ end,
+ {Acc1, State1} = ?QUEUE:foldl(QFun, {Acc, State}, Q4),
+ {Acc2, State2} = ?QUEUE:foldl(QFun, {Acc1, State1}, Q3),
+ {Acc3, State3} = delta_fold (Fun, Acc2, Delta, State2),
+ {Acc4, State4} = ?QUEUE:foldl(QFun, {Acc3, State3}, Q2),
+ ?QUEUE:foldl(QFun, {Acc4, State4}, Q1).
+
+delta_fold(_Fun, Acc, ?BLANK_DELTA_PATTERN(X), State) ->
+ {Acc, State};
+delta_fold(Fun, Acc, #delta { start_seq_id = DeltaSeqId,
+ end_seq_id = DeltaSeqIdEnd}, State) ->
+ {List, State1 = #vqstate { msg_store_clients = MSCState }} =
+ delta_index(DeltaSeqId, DeltaSeqIdEnd, State),
+ {Result, MSCState3} =
+ lists:foldl(fun ({MsgId, _SeqId, _MsgProps, IsPersistent, _IsDelivered},
+ {Acc1, MSCState1}) ->
+ {{ok, Msg = #basic_message {}}, MSCState2} =
+ msg_store_read(MSCState1, IsPersistent, MsgId),
+ {Fun(Msg, Acc1), MSCState2}
+ end, {Acc, MSCState}, List),
+ {Result, State1 #vqstate { msg_store_clients = MSCState3}}.
+
+delta_index(DeltaSeqId, DeltaSeqIdEnd, State) ->
+ delta_index(DeltaSeqId, DeltaSeqIdEnd, State, []).
+
+delta_index(DeltaSeqIdDone, DeltaSeqIdEnd, State, List)
+ when DeltaSeqIdDone == DeltaSeqIdEnd ->
+ {List, State};
+delta_index(DeltaSeqIdDone, DeltaSeqIdEnd,
+ #vqstate { index_state = IndexState } = State, List) ->
+ DeltaSeqId1 = lists:min(
+ [rabbit_queue_index:next_segment_boundary(DeltaSeqIdDone),
+ DeltaSeqIdEnd]),
+ {List1, IndexState1} =
+ rabbit_queue_index:read(DeltaSeqIdDone, DeltaSeqId1, IndexState),
+ delta_index(DeltaSeqId1, DeltaSeqIdEnd,
+ State #vqstate { index_state = IndexState1 }, List ++ List1).
+
len(#vqstate { len = Len }) -> Len.
is_empty(State) -> 0 == len(State).