diff options
| author | Matthew Sackman <matthew@lshift.net> | 2010-05-18 13:28:01 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2010-05-18 13:28:01 +0100 |
| commit | 29b43b7761cd4ddf9861982b367431db65f7b363 (patch) | |
| tree | f505f3c9a84113328981cc3b396e92545004c662 /src | |
| parent | afdeb07ef3abf056976078ea3b68a3cb6a94f266 (diff) | |
| download | rabbitmq-server-git-29b43b7761cd4ddf9861982b367431db65f7b363.tar.gz | |
Apparently it's wrong to do the least amount of work possible.
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_queue_index.erl | 55 |
1 files changed, 23 insertions, 32 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 7cf36193f6..a4bd4cd77b 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -351,14 +351,16 @@ read(Start, End, State = #qistate { segments = Segments, end, Segment = segment_find_or_new(StartSeg, Dir, Segments), {SegEntries, _PubCount, _AckCount, Segment1} = - load_segment(false, StartRelSeq, MaxRelSeq, Segment), + load_segment(false, Segment), #segment { journal_entries = JEntries } = Segment1, {array:sparse_foldr( - fun (RelSeq, {{Guid, IsPersistent}, IsDelivered, no_ack}, Acc) -> + fun (RelSeq, {{Guid, IsPersistent}, IsDelivered, no_ack}, Acc) + when StartRelSeq =< RelSeq andalso RelSeq < MaxRelSeq -> [ {Guid, reconstruct_seq_id(StartSeg, RelSeq), - IsPersistent, IsDelivered == del} | Acc ] - end, [], - journal_plus_segment(JEntries, SegEntries, StartRelSeq, MaxRelSeq)), + IsPersistent, IsDelivered == del} | Acc ]; + (_RelSeq, _Value, Acc) -> + Acc + end, [], journal_plus_segment(JEntries, SegEntries)), Again, State #qistate { segments = segment_store(Segment1, Segments) }}. @@ -478,7 +480,7 @@ terminate(StoreShutdown, Terms, State = recover_segment(ContainsCheckFun, CleanShutdown, Segment) -> {SegEntries, PubCount, AckCount, Segment1} = - load_segment(false, 0, ?SEGMENT_ENTRY_COUNT, Segment), + load_segment(false, Segment), array:sparse_foldl( fun (RelSeq, {{Guid, _IsPersistent}, Del, no_ack}, Segment3) -> recover_message(ContainsCheckFun(Guid), CleanShutdown, @@ -652,7 +654,7 @@ load_journal(State) -> %% them if duplicates are in the journal. The counts %% here are purely from the segment itself. {SegEntries, PubCountInSeg, AckCountInSeg, Segment1} = - load_segment(true, 0, ?SEGMENT_ENTRY_COUNT, Segment), + load_segment(true, Segment), %% Removed counts here are the number of pubs and %% acks that are duplicates - i.e. found in both the %% segment and journal. @@ -825,9 +827,8 @@ write_entry_to_segment(RelSeq, {Pub, Del, Ack}, Hdl) -> %% number of unacked msgs is PubCount - AckCount. If KeepAcks is %% false, then array:sparse_size(SegEntries) == PubCount - %% AckCount. If KeepAcks is true, then array:sparse_size(SegEntries) -%% == PubCount. StartRelSeq is inclusive, EndRelSeq is exclusive. -load_segment(KeepAcks, StartRelSeq, EndRelSeq, - Segment = #segment { path = Path, handle = SegHdl }) -> +%% == PubCount. +load_segment(KeepAcks, Segment = #segment { path = Path, handle = SegHdl }) -> SegmentExists = case SegHdl of undefined -> filelib:is_file(Path); _ -> true @@ -837,24 +838,20 @@ load_segment(KeepAcks, StartRelSeq, EndRelSeq, true -> {Hdl, Segment1} = get_segment_handle(Segment), {ok, 0} = file_handle_cache:position(Hdl, bof), {SegEntries, PubCount, AckCount} = - load_segment_entries(KeepAcks, StartRelSeq, EndRelSeq, Hdl, - array_new(), 0, 0), + load_segment_entries(KeepAcks, Hdl, array_new(), 0, 0), {SegEntries, PubCount, AckCount, Segment1} end. -load_segment_entries(KeepAcks, StartRel, EndRel, Hdl, SegEntries, PubCount, - AckCount) -> +load_segment_entries(KeepAcks, Hdl, SegEntries, PubCount, AckCount) -> case file_handle_cache:read(Hdl, ?REL_SEQ_ONLY_ENTRY_LENGTH_BYTES) of {ok, <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS, - RelSeq:?REL_SEQ_BITS>>} - when StartRel =< RelSeq andalso RelSeq < EndRel -> + RelSeq:?REL_SEQ_BITS>>} -> {AckCount1, SegEntries1} = deliver_or_ack_msg(KeepAcks, RelSeq, AckCount, SegEntries), - load_segment_entries(KeepAcks, StartRel, EndRel, Hdl, SegEntries1, - PubCount, AckCount1); + load_segment_entries(KeepAcks, Hdl, SegEntries1, PubCount, + AckCount1); {ok, <<?PUBLISH_PREFIX:?PUBLISH_PREFIX_BITS, - IsPersistentNum:1, RelSeq:?REL_SEQ_BITS>>} - when StartRel =< RelSeq andalso RelSeq < EndRel -> + IsPersistentNum:1, RelSeq:?REL_SEQ_BITS>>} -> %% because we specify /binary, and binaries are complete %% bytes, the size spec is in bytes, not bits. {ok, Guid} = file_handle_cache:read(Hdl, ?GUID_BYTES), @@ -862,11 +859,8 @@ load_segment_entries(KeepAcks, StartRel, EndRel, Hdl, SegEntries, PubCount, array:set(RelSeq, {{Guid, 1 == IsPersistentNum}, no_del, no_ack}, SegEntries), - load_segment_entries(KeepAcks, StartRel, EndRel, Hdl, SegEntries1, - PubCount + 1, AckCount); - {ok, _SomeBinary} -> - load_segment_entries(KeepAcks, StartRel, EndRel, Hdl, SegEntries, - PubCount, AckCount); + load_segment_entries(KeepAcks, Hdl, SegEntries1, PubCount + 1, + AckCount); _ErrOrEoF -> {SegEntries, PubCount, AckCount} end. @@ -894,18 +888,15 @@ bool_to_int(false) -> 0. %% Combine what we have just read from a segment file with what we're %% holding for that segment in memory. There must be no %% duplicates. Used when providing segment entries to the variable -%% queue. RelStart is inclusive, RelEnd is exclusive. -journal_plus_segment(JEntries, SegEntries, RelStart, RelEnd) -> +%% queue. +journal_plus_segment(JEntries, SegEntries) -> array:sparse_foldl( - fun (RelSeq, JObj, SegEntriesOut) - when RelStart =< RelSeq andalso RelSeq < RelEnd -> + fun (RelSeq, JObj, SegEntriesOut) -> SegEntry = array:get(RelSeq, SegEntriesOut), case journal_plus_segment1(JObj, SegEntry) of undefined -> array:reset(RelSeq, SegEntriesOut); Obj -> array:set(RelSeq, Obj, SegEntriesOut) - end; - (_RelSeq, _JObj, SegEntriesOut) -> - SegEntriesOut + end end, SegEntries, JEntries). %% Here, the result is the item which we may be adding to (for items |
