summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2010-06-21 15:40:33 +0100
committerMatthias Radestock <matthias@rabbitmq.com>2010-06-21 15:40:33 +0100
commit7d362473fc22c662cde3e7853ca3bcff3979ab4d (patch)
treea26521e6793bfd8c2ff5d4ae9959497b6dc9c7d3
parent1eff3e2269f589872a081d03c9491eb93e6d9e11 (diff)
downloadrabbitmq-server-git-7d362473fc22c662cde3e7853ca3bcff3979ab4d.tar.gz
change queue_index:read API
...to read exactly the messages within the specified bounds, rather than just up to next segment boundary. This API is cleaner and means we no longer rely on a hidden invariant in variable_queue.
-rw-r--r--src/rabbit_queue_index.erl47
-rw-r--r--src/rabbit_tests.erl12
-rw-r--r--src/rabbit_variable_queue.erl6
3 files changed, 32 insertions, 33 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 9c75786470..c1054d85d0 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -197,7 +197,7 @@
-spec(sync/2 :: ([seq_id()], qistate()) -> qistate()).
-spec(flush/1 :: (qistate()) -> qistate()).
-spec(read/3 :: (seq_id(), seq_id(), qistate()) ->
- {[{guid(), seq_id(), boolean(), boolean()}], seq_id(), qistate()}).
+ {[{guid(), seq_id(), boolean(), boolean()}], qistate()}).
-spec(next_segment_boundary/1 :: (seq_id()) -> seq_id()).
-spec(bounds/1 :: (qistate()) ->
{non_neg_integer(), non_neg_integer(), qistate()}).
@@ -271,32 +271,17 @@ flush(State = #qistate { dirty_count = 0 }) -> State;
flush(State) -> flush_journal(State).
read(StartEnd, StartEnd, State) ->
- {[], StartEnd, State};
+ {[], State};
read(Start, End, State = #qistate { segments = Segments,
dir = Dir }) when Start =< End ->
%% Start is inclusive, End is exclusive.
- {StartSeg, StartRelSeq} = seq_id_to_seg_and_rel_seq_id(Start),
- {EndSeg, EndRelSeq} = seq_id_to_seg_and_rel_seq_id(End),
- Start1 = reconstruct_seq_id(StartSeg + 1, 0),
- Next = case End =< Start1 of
- true -> End;
- false -> Start1
- end,
- MaxRelSeq = case StartSeg =:= EndSeg of
- true -> EndRelSeq;
- false -> ?SEGMENT_ENTRY_COUNT
- end,
- Segment = segment_find_or_new(StartSeg, Dir, Segments),
- Messages = segment_entries_foldr(
- fun (RelSeq, {{Guid, IsPersistent}, IsDelivered, no_ack}, Acc)
- when StartRelSeq =< RelSeq andalso RelSeq < MaxRelSeq ->
- [ {Guid, reconstruct_seq_id(StartSeg, RelSeq),
- IsPersistent, IsDelivered == del} | Acc ];
- (_RelSeq, _Value, Acc) ->
- Acc
- end, [], Segment),
- Segments1 = segment_store(Segment, Segments),
- {Messages, Next, State #qistate { segments = Segments1 }}.
+ LowerB = {StartSeg, _StartRelSeq} = seq_id_to_seg_and_rel_seq_id(Start),
+ UpperB = {EndSeg, _EndRelSeq} = seq_id_to_seg_and_rel_seq_id(End - 1),
+ {Messages, Segments1} =
+ lists:foldr(fun (Seg, Acc) ->
+ read_bounded_segment(Seg, LowerB, UpperB, Acc, Dir)
+ end, {[], Segments}, lists:seq(StartSeg, EndSeg)),
+ {Messages, State #qistate { segments = Segments1 }}.
next_segment_boundary(SeqId) ->
{Seg, _RelSeq} = seq_id_to_seg_and_rel_seq_id(SeqId),
@@ -758,6 +743,20 @@ write_entry_to_segment(RelSeq, {Pub, Del, Ack}, Hdl) ->
end,
Hdl.
+read_bounded_segment(Seg, {StartSeg, StartRelSeq}, {EndSeg, EndRelSeq},
+ {Messages, Segments}, Dir) ->
+ Segment = segment_find_or_new(Seg, Dir, Segments),
+ {segment_entries_foldr(
+ fun (RelSeq, {{Guid, IsPersistent}, IsDelivered, no_ack}, Acc)
+ when (Seg > StartSeg orelse StartRelSeq =< RelSeq) andalso
+ (Seg < EndSeg orelse EndRelSeq >= RelSeq) ->
+ [ {Guid, reconstruct_seq_id(StartSeg, RelSeq),
+ IsPersistent, IsDelivered == del} | Acc ];
+ (_RelSeq, _Value, Acc) ->
+ Acc
+ end, Messages, Segment),
+ segment_store(Segment, Segments)}.
+
segment_entries_foldr(Fun, Init,
Segment = #segment { journal_entries = JEntries }) ->
{SegEntries, _UnackedCount} = load_segment(false, Segment),
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 38b01b2c8e..c6c62c0685 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -1593,7 +1593,7 @@ test_queue_index() ->
{0, 0, Qi1} = rabbit_queue_index:bounds(Qi0),
{Qi2, SeqIdsGuidsA} = queue_index_publish(SeqIdsA, false, Qi1),
{0, SegmentSize, Qi3} = rabbit_queue_index:bounds(Qi2),
- {ReadA, SegmentSize, Qi4} = rabbit_queue_index:read(0, SegmentSize, Qi3),
+ {ReadA, Qi4} = rabbit_queue_index:read(0, SegmentSize, Qi3),
ok = verify_read_with_published(false, false, ReadA,
lists:reverse(SeqIdsGuidsA)),
_Qi5 = rabbit_queue_index:terminate([], Qi4),
@@ -1604,7 +1604,7 @@ test_queue_index() ->
{0, 0, Qi7} = rabbit_queue_index:bounds(Qi6),
{Qi8, SeqIdsGuidsB} = queue_index_publish(SeqIdsB, true, Qi7),
{0, TwoSegs, Qi9} = rabbit_queue_index:bounds(Qi8),
- {ReadB, SegmentSize, Qi10} = rabbit_queue_index:read(0, SegmentSize, Qi9),
+ {ReadB, Qi10} = rabbit_queue_index:read(0, SegmentSize, Qi9),
ok = verify_read_with_published(false, true, ReadB,
lists:reverse(SeqIdsGuidsB)),
_Qi11 = rabbit_queue_index:terminate([], Qi10),
@@ -1615,7 +1615,7 @@ test_queue_index() ->
{LenB, _Terms2, Qi12} = test_queue_init(),
{0, TwoSegs, Qi13} = rabbit_queue_index:bounds(Qi12),
Qi14 = rabbit_queue_index:deliver(SeqIdsB, Qi13),
- {ReadC, SegmentSize, Qi15} = rabbit_queue_index:read(0, SegmentSize, Qi14),
+ {ReadC, Qi15} = rabbit_queue_index:read(0, SegmentSize, Qi14),
ok = verify_read_with_published(true, true, ReadC,
lists:reverse(SeqIdsGuidsB)),
Qi16 = rabbit_queue_index:ack(SeqIdsB, Qi15),
@@ -1678,10 +1678,10 @@ test_queue_index() ->
{Qi47, [Eight,Six|_]} = queue_index_publish([3,6,8], false, Qi46),
Qi48 = rabbit_queue_index:deliver([2,3,5,6], Qi47),
Qi49 = rabbit_queue_index:ack([1,2,3], Qi48),
- {[], 4, Qi50} = rabbit_queue_index:read(0, 4, Qi49),
- {ReadD, 7, Qi51} = rabbit_queue_index:read(4, 7, Qi50),
+ {[], Qi50} = rabbit_queue_index:read(0, 4, Qi49),
+ {ReadD, Qi51} = rabbit_queue_index:read(4, 7, Qi50),
ok = verify_read_with_published(true, false, ReadD, [Four, Five, Six]),
- {ReadE, 9, Qi52} = rabbit_queue_index:read(7, 9, Qi51),
+ {ReadE, Qi52} = rabbit_queue_index:read(7, 9, Qi51),
ok = verify_read_with_published(false, false, ReadE, [Seven, Eight]),
_Qi53 = rabbit_queue_index:delete_and_terminate(Qi52),
ok = stop_msg_store(),
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 244c9ba184..0f0a14c627 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -1269,11 +1269,11 @@ maybe_deltas_to_betas(State = #vqstate {
#delta { start_seq_id = DeltaSeqId,
count = DeltaCount,
end_seq_id = DeltaSeqIdEnd } = Delta,
- DeltaSeqIdEnd1 =
+ DeltaSeqId1 =
lists:min([rabbit_queue_index:next_segment_boundary(DeltaSeqId),
DeltaSeqIdEnd]),
- {List, DeltaSeqId1, IndexState1} =
- rabbit_queue_index:read(DeltaSeqId, DeltaSeqIdEnd1, IndexState),
+ {List, IndexState1} =
+ rabbit_queue_index:read(DeltaSeqId, DeltaSeqId1, IndexState),
{Q3a, IndexState2} = betas_from_index_entries(
List, TransientThreshold, IndexState1),
State1 = State #vqstate { index_state = IndexState2 },