diff options
| author | Emile Joubert <emile@rabbitmq.com> | 2012-11-22 13:13:08 +0000 |
|---|---|---|
| committer | Emile Joubert <emile@rabbitmq.com> | 2012-11-22 13:13:08 +0000 |
| commit | 262492edebca6bc504bd08201949842e80b3a24e (patch) | |
| tree | f85d3eff474dd1f840287e7fbb1edd246baea62e | |
| parent | fae659cd9f15af9a9c4d4528072059b12d60af26 (diff) | |
| parent | a7bea4a0db4d7097be2257bfff797c4595955cda (diff) | |
| download | rabbitmq-server-git-262492edebca6bc504bd08201949842e80b3a24e.tar.gz | |
Merged default
| -rw-r--r-- | src/rabbit_variable_queue.erl | 47 |
1 files changed, 47 insertions, 0 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 862e74f628..e9d3c6e312 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -678,6 +678,53 @@ requeue(AckTags, #vqstate { delta = Delta, in_counter = InCounter + MsgCount, len = Len + MsgCount }))}. +fold(Fun, Acc, #vqstate { q1 = Q1, + q2 = Q2, + delta = Delta, + q3 = Q3, + q4 = Q4} = State) -> + QFun = fun(M, {A, S}) -> + {#msg_status{msg = Msg}, State1} = read_msg(M, S, false), + A1 = Fun(Msg, A), + {A1, State1} + end, + {Acc1, State1} = ?QUEUE:foldl(QFun, {Acc, State}, Q4), + {Acc2, State2} = ?QUEUE:foldl(QFun, {Acc1, State1}, Q3), + {Acc3, State3} = delta_fold (Fun, Acc2, Delta, State2), + {Acc4, State4} = ?QUEUE:foldl(QFun, {Acc3, State3}, Q2), + ?QUEUE:foldl(QFun, {Acc4, State4}, Q1). + +delta_fold(_Fun, Acc, ?BLANK_DELTA_PATTERN(X), State) -> + {Acc, State}; +delta_fold(Fun, Acc, #delta { start_seq_id = DeltaSeqId, + end_seq_id = DeltaSeqIdEnd}, State) -> + {List, State1 = #vqstate { msg_store_clients = MSCState }} = + delta_index(DeltaSeqId, DeltaSeqIdEnd, State), + {Result, MSCState3} = + lists:foldl(fun ({MsgId, _SeqId, _MsgProps, IsPersistent, _IsDelivered}, + {Acc1, MSCState1}) -> + {{ok, Msg = #basic_message {}}, MSCState2} = + msg_store_read(MSCState1, IsPersistent, MsgId), + {Fun(Msg, Acc1), MSCState2} + end, {Acc, MSCState}, List), + {Result, State1 #vqstate { msg_store_clients = MSCState3}}. + +delta_index(DeltaSeqId, DeltaSeqIdEnd, State) -> + delta_index(DeltaSeqId, DeltaSeqIdEnd, State, []). + +delta_index(DeltaSeqIdDone, DeltaSeqIdEnd, State, List) + when DeltaSeqIdDone == DeltaSeqIdEnd -> + {List, State}; +delta_index(DeltaSeqIdDone, DeltaSeqIdEnd, + #vqstate { index_state = IndexState } = State, List) -> + DeltaSeqId1 = lists:min( + [rabbit_queue_index:next_segment_boundary(DeltaSeqIdDone), + DeltaSeqIdEnd]), + {List1, IndexState1} = + rabbit_queue_index:read(DeltaSeqIdDone, DeltaSeqId1, IndexState), + delta_index(DeltaSeqId1, DeltaSeqIdEnd, + State #vqstate { index_state = IndexState1 }, List ++ List1). + len(#vqstate { len = Len }) -> Len. is_empty(State) -> 0 == len(State). |
