summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@lshift.net>2010-05-28 13:54:58 +0100
committerMatthias Radestock <matthias@lshift.net>2010-05-28 13:54:58 +0100
commitcdb3ab6c61bf7679d303bd873eb3bbf220bd10d9 (patch)
treee26bc064a5bd96bd6208de64032af1e00462bc30
parent41c800936c023b0ba0be1410556f1d5f3e750b78 (diff)
downloadrabbitmq-server-git-cdb3ab6c61bf7679d303bd873eb3bbf220bd10d9.tar.gz
introduce helper fun to fold over segment entries
and use that in 'read' and 'queue_index_walker_reader'. That makes the code in the latter less of a jumping-through-hoops exercise.
-rw-r--r--src/rabbit_queue_index.erl53
1 files changed, 27 insertions, 26 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 487430fc71..c34523d17f 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -297,21 +297,17 @@ read(Start, End, State = #qistate { segments = Segments,
true -> EndRelSeq;
false -> ?SEGMENT_ENTRY_COUNT
end,
- Segment = #segment { journal_entries = JEntries } =
- segment_find_or_new(StartSeg, Dir, Segments),
- {SegEntries, _UnackedCount} = load_segment(false, Segment),
- {SegEntries1, _UnackedCountDelta} =
- segment_plus_journal(SegEntries, JEntries),
- {array:sparse_foldr(
- fun (RelSeq, {{Guid, IsPersistent}, IsDelivered, no_ack}, Acc)
- when StartRelSeq =< RelSeq andalso RelSeq < MaxRelSeq ->
- [ {Guid, reconstruct_seq_id(StartSeg, RelSeq),
- IsPersistent, IsDelivered == del} | Acc ];
- (_RelSeq, _Value, Acc) ->
- Acc
- end, [], SegEntries1),
- Again,
- State #qistate { segments = segment_store(Segment, Segments) }}.
+ Segment = segment_find_or_new(StartSeg, Dir, Segments),
+ Messages = segment_entries_foldr(
+ fun (RelSeq, {{Guid, IsPersistent}, IsDelivered, no_ack}, Acc)
+ when StartRelSeq =< RelSeq andalso RelSeq < MaxRelSeq ->
+ [ {Guid, reconstruct_seq_id(StartSeg, RelSeq),
+ IsPersistent, IsDelivered == del} | Acc ];
+ (_RelSeq, _Value, Acc) ->
+ Acc
+ end, [], Segment),
+ Segments1 = segment_store(Segment, Segments),
+ {Messages, Again, State #qistate { segments = Segments1 }}.
next_segment_boundary(SeqId) ->
{Seg, _RelSeq} = seq_id_to_seg_and_rel_seq_id(SeqId),
@@ -513,17 +509,16 @@ queue_index_walker({next, Gatherer}) when is_pid(Gatherer) ->
end.
queue_index_walker_reader(QueueName, Gatherer) ->
- State = recover_journal(blank_state(QueueName)),
- State1 = lists:foldl(
- fun (Seg, State2) ->
- SeqId = reconstruct_seq_id(Seg, 0),
- {Messages, undefined, State3} =
- read(SeqId, next_segment_boundary(SeqId), State2),
- [ok = gatherer:in(Gatherer, {Guid, 1}) ||
- {Guid, _SeqId, true, _IsDelivered} <- Messages],
- State3
- end, State, all_segment_nums(State)),
- {_SegmentCounts, _State} = terminate(State1),
+ State = #qistate { segments = Segments, dir = Dir } =
+ recover_journal(blank_state(QueueName)),
+ [ok = segment_entries_foldr(
+ fun (_RelSeq, {{Guid, true}, _IsDelivered, no_ack}, ok) ->
+ gatherer:in(Gatherer, {Guid, 1});
+ (_RelSeq, _Value, Acc) ->
+ Acc
+ end, ok, segment_find_or_new(Seg, Dir, Segments)) ||
+ Seg <- all_segment_nums(State)],
+ {_SegmentCounts, _State} = terminate(State),
ok = gatherer:finish(Gatherer).
%%----------------------------------------------------------------------------
@@ -768,6 +763,12 @@ write_entry_to_segment(RelSeq, {Pub, Del, Ack}, Hdl) ->
end,
Hdl.
+segment_entries_foldr(Fun, Init,
+ Segment = #segment { journal_entries = JEntries }) ->
+ {SegEntries, _UnackedCount} = load_segment(false, Segment),
+ {SegEntries1, _UnackedCountD} = segment_plus_journal(SegEntries, JEntries),
+ array:sparse_foldr(Fun, Init, SegEntries1).
+
%% Loading segments
%%
%% Does not do any combining with the journal at all. The PubCount