diff options
| -rw-r--r-- | src/rabbit_queue_index.erl | 15 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 12 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 20 |
3 files changed, 18 insertions, 29 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 5f94cf9d23..d062c4fd0a 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -196,8 +196,7 @@ -spec(sync/2 :: ([seq_id()], qistate()) -> qistate()). -spec(flush/1 :: (qistate()) -> qistate()). -spec(read/3 :: (seq_id(), seq_id(), qistate()) -> - {[{guid(), seq_id(), boolean(), boolean()}], - seq_id() | 'undefined', qistate()}). + {[{guid(), seq_id(), boolean(), boolean()}], seq_id(), qistate()}). -spec(next_segment_boundary/1 :: (seq_id()) -> seq_id()). -spec(bounds/1 :: (qistate()) -> {non_neg_integer(), non_neg_integer(), qistate()}). @@ -282,17 +281,17 @@ flush(State = #qistate { dirty_count = 0 }) -> State; flush(State) -> flush_journal(State). read(StartEnd, StartEnd, State) -> - {[], undefined, State}; + {[], StartEnd, State}; read(Start, End, State = #qistate { segments = Segments, dir = Dir }) when Start =< End -> %% Start is inclusive, End is exclusive. {StartSeg, StartRelSeq} = seq_id_to_seg_and_rel_seq_id(Start), {EndSeg, EndRelSeq} = seq_id_to_seg_and_rel_seq_id(End), Start1 = reconstruct_seq_id(StartSeg + 1, 0), - Again = case End =< Start1 of - true -> undefined; - false -> Start1 - end, + Next = case End =< Start1 of + true -> End; + false -> Start1 + end, MaxRelSeq = case StartSeg =:= EndSeg of true -> EndRelSeq; false -> ?SEGMENT_ENTRY_COUNT @@ -307,7 +306,7 @@ read(Start, End, State = #qistate { segments = Segments, Acc end, [], Segment), Segments1 = segment_store(Segment, Segments), - {Messages, Again, State #qistate { segments = Segments1 }}. + {Messages, Next, State #qistate { segments = Segments1 }}. next_segment_boundary(SeqId) -> {Seg, _RelSeq} = seq_id_to_seg_and_rel_seq_id(SeqId), diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index d49208c366..f3df66ca58 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1584,7 +1584,7 @@ test_queue_index() -> {0, 0, Qi1} = rabbit_queue_index:bounds(Qi0), {Qi2, SeqIdsGuidsA} = queue_index_publish(SeqIdsA, false, Qi1), {0, SegmentSize, Qi3} = rabbit_queue_index:bounds(Qi2), - {ReadA, undefined, Qi4} = rabbit_queue_index:read(0, SegmentSize, Qi3), + {ReadA, SegmentSize, Qi4} = rabbit_queue_index:read(0, SegmentSize, Qi3), ok = verify_read_with_published(false, false, ReadA, lists:reverse(SeqIdsGuidsA)), _Qi5 = rabbit_queue_index:terminate([], Qi4), @@ -1595,7 +1595,7 @@ test_queue_index() -> {0, 0, Qi7} = rabbit_queue_index:bounds(Qi6), {Qi8, SeqIdsGuidsB} = queue_index_publish(SeqIdsB, true, Qi7), {0, TwoSegs, Qi9} = rabbit_queue_index:bounds(Qi8), - {ReadB, undefined, Qi10} = rabbit_queue_index:read(0, SegmentSize, Qi9), + {ReadB, SegmentSize, Qi10} = rabbit_queue_index:read(0, SegmentSize, Qi9), ok = verify_read_with_published(false, true, ReadB, lists:reverse(SeqIdsGuidsB)), _Qi11 = rabbit_queue_index:terminate([], Qi10), @@ -1606,7 +1606,7 @@ test_queue_index() -> {LenB, _Terms2, Qi12} = test_queue_init(), {0, TwoSegs, Qi13} = rabbit_queue_index:bounds(Qi12), Qi14 = queue_index_deliver(SeqIdsB, Qi13), - {ReadC, undefined, Qi15} = rabbit_queue_index:read(0, SegmentSize, Qi14), + {ReadC, SegmentSize, Qi15} = rabbit_queue_index:read(0, SegmentSize, Qi14), ok = verify_read_with_published(true, true, ReadC, lists:reverse(SeqIdsGuidsB)), Qi16 = rabbit_queue_index:ack(SeqIdsB, Qi15), @@ -1669,10 +1669,10 @@ test_queue_index() -> {Qi47, [Eight,Six|_]} = queue_index_publish([3,6,8], false, Qi46), Qi48 = queue_index_deliver([2,3,5,6], Qi47), Qi49 = rabbit_queue_index:ack([1,2,3], Qi48), - {[], undefined, Qi50} = rabbit_queue_index:read(0, 4, Qi49), - {ReadD, undefined, Qi51} = rabbit_queue_index:read(4, 7, Qi50), + {[], 4, Qi50} = rabbit_queue_index:read(0, 4, Qi49), + {ReadD, 7, Qi51} = rabbit_queue_index:read(4, 7, Qi50), ok = verify_read_with_published(true, false, ReadD, [Four, Five, Six]), - {ReadE, undefined, Qi52} = rabbit_queue_index:read(7, 9, Qi51), + {ReadE, 9, Qi52} = rabbit_queue_index:read(7, 9, Qi51), ok = verify_read_with_published(false, false, ReadE, [Seven, Eight]), _Qi53 = rabbit_queue_index:delete_and_terminate(Qi52), ok = stop_msg_store(), diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 377b5737db..61437229ae 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -807,16 +807,6 @@ betas_from_segment_entries(List, TransientThreshold, IndexState) -> end, {[], IndexState}, List), {bpqueue:from_list([{true, Filtered}]), IndexState1}. -read_one_index_segment(StartSeqId, EndSeqId, IndexState) - when StartSeqId =< EndSeqId -> - case rabbit_queue_index:read(StartSeqId, EndSeqId, IndexState) of - {List, Again, IndexState1} when List /= [] orelse Again =:= undefined -> - {List, IndexState1, - rabbit_queue_index:next_segment_boundary(StartSeqId)}; - {[], StartSeqId1, IndexState1} -> - read_one_index_segment(StartSeqId1, EndSeqId, IndexState1) - end. - ensure_binary_properties(Msg = #basic_message { content = Content }) -> Msg #basic_message { content = rabbit_binary_parser:clear_decoded_content( @@ -970,10 +960,10 @@ tx_commit_index(State = #vqstate { on_sync = {SAcks, SPubs, SFuns}, State1 #vqstate { index_state = IndexState1, on_sync = {[], [], []} }. delete1(_TransientThreshold, NextSeqId, DeltaSeqId, IndexState) - when DeltaSeqId =:= undefined orelse DeltaSeqId >= NextSeqId -> + when DeltaSeqId >= NextSeqId -> IndexState; delete1(TransientThreshold, NextSeqId, DeltaSeqId, IndexState) -> - {List, Again, IndexState1} = + {List, Next, IndexState1} = rabbit_queue_index:read(DeltaSeqId, NextSeqId, IndexState), IndexState2 = case List of @@ -982,7 +972,7 @@ delete1(TransientThreshold, NextSeqId, DeltaSeqId, IndexState) -> List, TransientThreshold, IndexState1), remove_queue_entries(fun beta_fold/3, Q, IndexState3) end, - delete1(TransientThreshold, NextSeqId, Again, IndexState2). + delete1(TransientThreshold, NextSeqId, Next, IndexState2). purge_betas_and_deltas(State = #vqstate { q3 = Q3, index_state = IndexState }) -> @@ -1304,8 +1294,8 @@ maybe_deltas_to_betas(State = #vqstate { #delta { start_seq_id = DeltaSeqId, count = DeltaCount, end_seq_id = DeltaSeqIdEnd } = Delta, - {List, IndexState1, DeltaSeqId1} = - read_one_index_segment(DeltaSeqId, DeltaSeqIdEnd, IndexState), + {List, DeltaSeqId1, IndexState1} = + rabbit_queue_index:read(DeltaSeqId, DeltaSeqIdEnd, IndexState), {Q3a, IndexState2} = betas_from_segment_entries( List, TransientThreshold, IndexState1), State1 = State #vqstate { index_state = IndexState2 }, |
