summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2010-05-18 13:28:01 +0100
committerMatthew Sackman <matthew@lshift.net>2010-05-18 13:28:01 +0100
commit29b43b7761cd4ddf9861982b367431db65f7b363 (patch)
treef505f3c9a84113328981cc3b396e92545004c662 /src
parentafdeb07ef3abf056976078ea3b68a3cb6a94f266 (diff)
downloadrabbitmq-server-git-29b43b7761cd4ddf9861982b367431db65f7b363.tar.gz
Apparently it's wrong to do the least amount of work possible.
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_queue_index.erl55
1 files changed, 23 insertions, 32 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 7cf36193f6..a4bd4cd77b 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -351,14 +351,16 @@ read(Start, End, State = #qistate { segments = Segments,
end,
Segment = segment_find_or_new(StartSeg, Dir, Segments),
{SegEntries, _PubCount, _AckCount, Segment1} =
- load_segment(false, StartRelSeq, MaxRelSeq, Segment),
+ load_segment(false, Segment),
#segment { journal_entries = JEntries } = Segment1,
{array:sparse_foldr(
- fun (RelSeq, {{Guid, IsPersistent}, IsDelivered, no_ack}, Acc) ->
+ fun (RelSeq, {{Guid, IsPersistent}, IsDelivered, no_ack}, Acc)
+ when StartRelSeq =< RelSeq andalso RelSeq < MaxRelSeq ->
[ {Guid, reconstruct_seq_id(StartSeg, RelSeq),
- IsPersistent, IsDelivered == del} | Acc ]
- end, [],
- journal_plus_segment(JEntries, SegEntries, StartRelSeq, MaxRelSeq)),
+ IsPersistent, IsDelivered == del} | Acc ];
+ (_RelSeq, _Value, Acc) ->
+ Acc
+ end, [], journal_plus_segment(JEntries, SegEntries)),
Again,
State #qistate { segments = segment_store(Segment1, Segments) }}.
@@ -478,7 +480,7 @@ terminate(StoreShutdown, Terms, State =
recover_segment(ContainsCheckFun, CleanShutdown, Segment) ->
{SegEntries, PubCount, AckCount, Segment1} =
- load_segment(false, 0, ?SEGMENT_ENTRY_COUNT, Segment),
+ load_segment(false, Segment),
array:sparse_foldl(
fun (RelSeq, {{Guid, _IsPersistent}, Del, no_ack}, Segment3) ->
recover_message(ContainsCheckFun(Guid), CleanShutdown,
@@ -652,7 +654,7 @@ load_journal(State) ->
%% them if duplicates are in the journal. The counts
%% here are purely from the segment itself.
{SegEntries, PubCountInSeg, AckCountInSeg, Segment1} =
- load_segment(true, 0, ?SEGMENT_ENTRY_COUNT, Segment),
+ load_segment(true, Segment),
%% Removed counts here are the number of pubs and
%% acks that are duplicates - i.e. found in both the
%% segment and journal.
@@ -825,9 +827,8 @@ write_entry_to_segment(RelSeq, {Pub, Del, Ack}, Hdl) ->
%% number of unacked msgs is PubCount - AckCount. If KeepAcks is
%% false, then array:sparse_size(SegEntries) == PubCount -
%% AckCount. If KeepAcks is true, then array:sparse_size(SegEntries)
-%% == PubCount. StartRelSeq is inclusive, EndRelSeq is exclusive.
-load_segment(KeepAcks, StartRelSeq, EndRelSeq,
- Segment = #segment { path = Path, handle = SegHdl }) ->
+%% == PubCount.
+load_segment(KeepAcks, Segment = #segment { path = Path, handle = SegHdl }) ->
SegmentExists = case SegHdl of
undefined -> filelib:is_file(Path);
_ -> true
@@ -837,24 +838,20 @@ load_segment(KeepAcks, StartRelSeq, EndRelSeq,
true -> {Hdl, Segment1} = get_segment_handle(Segment),
{ok, 0} = file_handle_cache:position(Hdl, bof),
{SegEntries, PubCount, AckCount} =
- load_segment_entries(KeepAcks, StartRelSeq, EndRelSeq, Hdl,
- array_new(), 0, 0),
+ load_segment_entries(KeepAcks, Hdl, array_new(), 0, 0),
{SegEntries, PubCount, AckCount, Segment1}
end.
-load_segment_entries(KeepAcks, StartRel, EndRel, Hdl, SegEntries, PubCount,
- AckCount) ->
+load_segment_entries(KeepAcks, Hdl, SegEntries, PubCount, AckCount) ->
case file_handle_cache:read(Hdl, ?REL_SEQ_ONLY_ENTRY_LENGTH_BYTES) of
{ok, <<?REL_SEQ_ONLY_PREFIX:?REL_SEQ_ONLY_PREFIX_BITS,
- RelSeq:?REL_SEQ_BITS>>}
- when StartRel =< RelSeq andalso RelSeq < EndRel ->
+ RelSeq:?REL_SEQ_BITS>>} ->
{AckCount1, SegEntries1} =
deliver_or_ack_msg(KeepAcks, RelSeq, AckCount, SegEntries),
- load_segment_entries(KeepAcks, StartRel, EndRel, Hdl, SegEntries1,
- PubCount, AckCount1);
+ load_segment_entries(KeepAcks, Hdl, SegEntries1, PubCount,
+ AckCount1);
{ok, <<?PUBLISH_PREFIX:?PUBLISH_PREFIX_BITS,
- IsPersistentNum:1, RelSeq:?REL_SEQ_BITS>>}
- when StartRel =< RelSeq andalso RelSeq < EndRel ->
+ IsPersistentNum:1, RelSeq:?REL_SEQ_BITS>>} ->
%% because we specify /binary, and binaries are complete
%% bytes, the size spec is in bytes, not bits.
{ok, Guid} = file_handle_cache:read(Hdl, ?GUID_BYTES),
@@ -862,11 +859,8 @@ load_segment_entries(KeepAcks, StartRel, EndRel, Hdl, SegEntries, PubCount,
array:set(RelSeq,
{{Guid, 1 == IsPersistentNum}, no_del, no_ack},
SegEntries),
- load_segment_entries(KeepAcks, StartRel, EndRel, Hdl, SegEntries1,
- PubCount + 1, AckCount);
- {ok, _SomeBinary} ->
- load_segment_entries(KeepAcks, StartRel, EndRel, Hdl, SegEntries,
- PubCount, AckCount);
+ load_segment_entries(KeepAcks, Hdl, SegEntries1, PubCount + 1,
+ AckCount);
_ErrOrEoF ->
{SegEntries, PubCount, AckCount}
end.
@@ -894,18 +888,15 @@ bool_to_int(false) -> 0.
%% Combine what we have just read from a segment file with what we're
%% holding for that segment in memory. There must be no
%% duplicates. Used when providing segment entries to the variable
-%% queue. RelStart is inclusive, RelEnd is exclusive.
-journal_plus_segment(JEntries, SegEntries, RelStart, RelEnd) ->
+%% queue.
+journal_plus_segment(JEntries, SegEntries) ->
array:sparse_foldl(
- fun (RelSeq, JObj, SegEntriesOut)
- when RelStart =< RelSeq andalso RelSeq < RelEnd ->
+ fun (RelSeq, JObj, SegEntriesOut) ->
SegEntry = array:get(RelSeq, SegEntriesOut),
case journal_plus_segment1(JObj, SegEntry) of
undefined -> array:reset(RelSeq, SegEntriesOut);
Obj -> array:set(RelSeq, Obj, SegEntriesOut)
- end;
- (_RelSeq, _JObj, SegEntriesOut) ->
- SegEntriesOut
+ end
end, SegEntries, JEntries).
%% Here, the result is the item which we may be adding to (for items