diff options
| author | Matthias Radestock <matthias@rabbitmq.com> | 2010-06-21 15:40:33 +0100 |
|---|---|---|
| committer | Matthias Radestock <matthias@rabbitmq.com> | 2010-06-21 15:40:33 +0100 |
| commit | 7d362473fc22c662cde3e7853ca3bcff3979ab4d (patch) | |
| tree | a26521e6793bfd8c2ff5d4ae9959497b6dc9c7d3 | |
| parent | 1eff3e2269f589872a081d03c9491eb93e6d9e11 (diff) | |
| download | rabbitmq-server-git-7d362473fc22c662cde3e7853ca3bcff3979ab4d.tar.gz | |
change queue_index:read API
...to read exactly the messages within the specified bounds, rather
than just up to next segment boundary. This API is cleaner and means
we no longer rely on a hidden invariant in variable_queue.
| -rw-r--r-- | src/rabbit_queue_index.erl | 47 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 12 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 6 |
3 files changed, 32 insertions, 33 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 9c75786470..c1054d85d0 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -197,7 +197,7 @@ -spec(sync/2 :: ([seq_id()], qistate()) -> qistate()). -spec(flush/1 :: (qistate()) -> qistate()). -spec(read/3 :: (seq_id(), seq_id(), qistate()) -> - {[{guid(), seq_id(), boolean(), boolean()}], seq_id(), qistate()}). + {[{guid(), seq_id(), boolean(), boolean()}], qistate()}). -spec(next_segment_boundary/1 :: (seq_id()) -> seq_id()). -spec(bounds/1 :: (qistate()) -> {non_neg_integer(), non_neg_integer(), qistate()}). @@ -271,32 +271,17 @@ flush(State = #qistate { dirty_count = 0 }) -> State; flush(State) -> flush_journal(State). read(StartEnd, StartEnd, State) -> - {[], StartEnd, State}; + {[], State}; read(Start, End, State = #qistate { segments = Segments, dir = Dir }) when Start =< End -> %% Start is inclusive, End is exclusive. - {StartSeg, StartRelSeq} = seq_id_to_seg_and_rel_seq_id(Start), - {EndSeg, EndRelSeq} = seq_id_to_seg_and_rel_seq_id(End), - Start1 = reconstruct_seq_id(StartSeg + 1, 0), - Next = case End =< Start1 of - true -> End; - false -> Start1 - end, - MaxRelSeq = case StartSeg =:= EndSeg of - true -> EndRelSeq; - false -> ?SEGMENT_ENTRY_COUNT - end, - Segment = segment_find_or_new(StartSeg, Dir, Segments), - Messages = segment_entries_foldr( - fun (RelSeq, {{Guid, IsPersistent}, IsDelivered, no_ack}, Acc) - when StartRelSeq =< RelSeq andalso RelSeq < MaxRelSeq -> - [ {Guid, reconstruct_seq_id(StartSeg, RelSeq), - IsPersistent, IsDelivered == del} | Acc ]; - (_RelSeq, _Value, Acc) -> - Acc - end, [], Segment), - Segments1 = segment_store(Segment, Segments), - {Messages, Next, State #qistate { segments = Segments1 }}. + LowerB = {StartSeg, _StartRelSeq} = seq_id_to_seg_and_rel_seq_id(Start), + UpperB = {EndSeg, _EndRelSeq} = seq_id_to_seg_and_rel_seq_id(End - 1), + {Messages, Segments1} = + lists:foldr(fun (Seg, Acc) -> + read_bounded_segment(Seg, LowerB, UpperB, Acc, Dir) + end, {[], Segments}, lists:seq(StartSeg, EndSeg)), + {Messages, State #qistate { segments = Segments1 }}. next_segment_boundary(SeqId) -> {Seg, _RelSeq} = seq_id_to_seg_and_rel_seq_id(SeqId), @@ -758,6 +743,20 @@ write_entry_to_segment(RelSeq, {Pub, Del, Ack}, Hdl) -> end, Hdl. +read_bounded_segment(Seg, {StartSeg, StartRelSeq}, {EndSeg, EndRelSeq}, + {Messages, Segments}, Dir) -> + Segment = segment_find_or_new(Seg, Dir, Segments), + {segment_entries_foldr( + fun (RelSeq, {{Guid, IsPersistent}, IsDelivered, no_ack}, Acc) + when (Seg > StartSeg orelse StartRelSeq =< RelSeq) andalso + (Seg < EndSeg orelse EndRelSeq >= RelSeq) -> + [ {Guid, reconstruct_seq_id(StartSeg, RelSeq), + IsPersistent, IsDelivered == del} | Acc ]; + (_RelSeq, _Value, Acc) -> + Acc + end, Messages, Segment), + segment_store(Segment, Segments)}. + segment_entries_foldr(Fun, Init, Segment = #segment { journal_entries = JEntries }) -> {SegEntries, _UnackedCount} = load_segment(false, Segment), diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 38b01b2c8e..c6c62c0685 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1593,7 +1593,7 @@ test_queue_index() -> {0, 0, Qi1} = rabbit_queue_index:bounds(Qi0), {Qi2, SeqIdsGuidsA} = queue_index_publish(SeqIdsA, false, Qi1), {0, SegmentSize, Qi3} = rabbit_queue_index:bounds(Qi2), - {ReadA, SegmentSize, Qi4} = rabbit_queue_index:read(0, SegmentSize, Qi3), + {ReadA, Qi4} = rabbit_queue_index:read(0, SegmentSize, Qi3), ok = verify_read_with_published(false, false, ReadA, lists:reverse(SeqIdsGuidsA)), _Qi5 = rabbit_queue_index:terminate([], Qi4), @@ -1604,7 +1604,7 @@ test_queue_index() -> {0, 0, Qi7} = rabbit_queue_index:bounds(Qi6), {Qi8, SeqIdsGuidsB} = queue_index_publish(SeqIdsB, true, Qi7), {0, TwoSegs, Qi9} = rabbit_queue_index:bounds(Qi8), - {ReadB, SegmentSize, Qi10} = rabbit_queue_index:read(0, SegmentSize, Qi9), + {ReadB, Qi10} = rabbit_queue_index:read(0, SegmentSize, Qi9), ok = verify_read_with_published(false, true, ReadB, lists:reverse(SeqIdsGuidsB)), _Qi11 = rabbit_queue_index:terminate([], Qi10), @@ -1615,7 +1615,7 @@ test_queue_index() -> {LenB, _Terms2, Qi12} = test_queue_init(), {0, TwoSegs, Qi13} = rabbit_queue_index:bounds(Qi12), Qi14 = rabbit_queue_index:deliver(SeqIdsB, Qi13), - {ReadC, SegmentSize, Qi15} = rabbit_queue_index:read(0, SegmentSize, Qi14), + {ReadC, Qi15} = rabbit_queue_index:read(0, SegmentSize, Qi14), ok = verify_read_with_published(true, true, ReadC, lists:reverse(SeqIdsGuidsB)), Qi16 = rabbit_queue_index:ack(SeqIdsB, Qi15), @@ -1678,10 +1678,10 @@ test_queue_index() -> {Qi47, [Eight,Six|_]} = queue_index_publish([3,6,8], false, Qi46), Qi48 = rabbit_queue_index:deliver([2,3,5,6], Qi47), Qi49 = rabbit_queue_index:ack([1,2,3], Qi48), - {[], 4, Qi50} = rabbit_queue_index:read(0, 4, Qi49), - {ReadD, 7, Qi51} = rabbit_queue_index:read(4, 7, Qi50), + {[], Qi50} = rabbit_queue_index:read(0, 4, Qi49), + {ReadD, Qi51} = rabbit_queue_index:read(4, 7, Qi50), ok = verify_read_with_published(true, false, ReadD, [Four, Five, Six]), - {ReadE, 9, Qi52} = rabbit_queue_index:read(7, 9, Qi51), + {ReadE, Qi52} = rabbit_queue_index:read(7, 9, Qi51), ok = verify_read_with_published(false, false, ReadE, [Seven, Eight]), _Qi53 = rabbit_queue_index:delete_and_terminate(Qi52), ok = stop_msg_store(), diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 244c9ba184..0f0a14c627 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -1269,11 +1269,11 @@ maybe_deltas_to_betas(State = #vqstate { #delta { start_seq_id = DeltaSeqId, count = DeltaCount, end_seq_id = DeltaSeqIdEnd } = Delta, - DeltaSeqIdEnd1 = + DeltaSeqId1 = lists:min([rabbit_queue_index:next_segment_boundary(DeltaSeqId), DeltaSeqIdEnd]), - {List, DeltaSeqId1, IndexState1} = - rabbit_queue_index:read(DeltaSeqId, DeltaSeqIdEnd1, IndexState), + {List, IndexState1} = + rabbit_queue_index:read(DeltaSeqId, DeltaSeqId1, IndexState), {Q3a, IndexState2} = betas_from_index_entries( List, TransientThreshold, IndexState1), State1 = State #vqstate { index_state = IndexState2 }, |
