diff options
| author | Matthias Radestock <matthias@lshift.net> | 2010-05-28 13:54:58 +0100 |
|---|---|---|
| committer | Matthias Radestock <matthias@lshift.net> | 2010-05-28 13:54:58 +0100 |
| commit | cdb3ab6c61bf7679d303bd873eb3bbf220bd10d9 (patch) | |
| tree | e26bc064a5bd96bd6208de64032af1e00462bc30 | |
| parent | 41c800936c023b0ba0be1410556f1d5f3e750b78 (diff) | |
| download | rabbitmq-server-git-cdb3ab6c61bf7679d303bd873eb3bbf220bd10d9.tar.gz | |
introduce helper fun to fold over segment entries
and use that in 'read' and 'queue_index_walker_reader'. That makes the
code in the latter less of a jumping-through-hoops exercise.
| -rw-r--r-- | src/rabbit_queue_index.erl | 53 |
1 files changed, 27 insertions, 26 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 487430fc71..c34523d17f 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -297,21 +297,17 @@ read(Start, End, State = #qistate { segments = Segments, true -> EndRelSeq; false -> ?SEGMENT_ENTRY_COUNT end, - Segment = #segment { journal_entries = JEntries } = - segment_find_or_new(StartSeg, Dir, Segments), - {SegEntries, _UnackedCount} = load_segment(false, Segment), - {SegEntries1, _UnackedCountDelta} = - segment_plus_journal(SegEntries, JEntries), - {array:sparse_foldr( - fun (RelSeq, {{Guid, IsPersistent}, IsDelivered, no_ack}, Acc) - when StartRelSeq =< RelSeq andalso RelSeq < MaxRelSeq -> - [ {Guid, reconstruct_seq_id(StartSeg, RelSeq), - IsPersistent, IsDelivered == del} | Acc ]; - (_RelSeq, _Value, Acc) -> - Acc - end, [], SegEntries1), - Again, - State #qistate { segments = segment_store(Segment, Segments) }}. + Segment = segment_find_or_new(StartSeg, Dir, Segments), + Messages = segment_entries_foldr( + fun (RelSeq, {{Guid, IsPersistent}, IsDelivered, no_ack}, Acc) + when StartRelSeq =< RelSeq andalso RelSeq < MaxRelSeq -> + [ {Guid, reconstruct_seq_id(StartSeg, RelSeq), + IsPersistent, IsDelivered == del} | Acc ]; + (_RelSeq, _Value, Acc) -> + Acc + end, [], Segment), + Segments1 = segment_store(Segment, Segments), + {Messages, Again, State #qistate { segments = Segments1 }}. next_segment_boundary(SeqId) -> {Seg, _RelSeq} = seq_id_to_seg_and_rel_seq_id(SeqId), @@ -513,17 +509,16 @@ queue_index_walker({next, Gatherer}) when is_pid(Gatherer) -> end. queue_index_walker_reader(QueueName, Gatherer) -> - State = recover_journal(blank_state(QueueName)), - State1 = lists:foldl( - fun (Seg, State2) -> - SeqId = reconstruct_seq_id(Seg, 0), - {Messages, undefined, State3} = - read(SeqId, next_segment_boundary(SeqId), State2), - [ok = gatherer:in(Gatherer, {Guid, 1}) || - {Guid, _SeqId, true, _IsDelivered} <- Messages], - State3 - end, State, all_segment_nums(State)), - {_SegmentCounts, _State} = terminate(State1), + State = #qistate { segments = Segments, dir = Dir } = + recover_journal(blank_state(QueueName)), + [ok = segment_entries_foldr( + fun (_RelSeq, {{Guid, true}, _IsDelivered, no_ack}, ok) -> + gatherer:in(Gatherer, {Guid, 1}); + (_RelSeq, _Value, Acc) -> + Acc + end, ok, segment_find_or_new(Seg, Dir, Segments)) || + Seg <- all_segment_nums(State)], + {_SegmentCounts, _State} = terminate(State), ok = gatherer:finish(Gatherer). %%---------------------------------------------------------------------------- @@ -768,6 +763,12 @@ write_entry_to_segment(RelSeq, {Pub, Del, Ack}, Hdl) -> end, Hdl. +segment_entries_foldr(Fun, Init, + Segment = #segment { journal_entries = JEntries }) -> + {SegEntries, _UnackedCount} = load_segment(false, Segment), + {SegEntries1, _UnackedCountD} = segment_plus_journal(SegEntries, JEntries), + array:sparse_foldr(Fun, Init, SegEntries1). + %% Loading segments %% %% Does not do any combining with the journal at all. The PubCount |
