summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_queue_index.erl15
-rw-r--r--src/rabbit_tests.erl12
-rw-r--r--src/rabbit_variable_queue.erl20
3 files changed, 18 insertions, 29 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 5f94cf9d23..d062c4fd0a 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -196,8 +196,7 @@
-spec(sync/2 :: ([seq_id()], qistate()) -> qistate()).
-spec(flush/1 :: (qistate()) -> qistate()).
-spec(read/3 :: (seq_id(), seq_id(), qistate()) ->
- {[{guid(), seq_id(), boolean(), boolean()}],
- seq_id() | 'undefined', qistate()}).
+ {[{guid(), seq_id(), boolean(), boolean()}], seq_id(), qistate()}).
-spec(next_segment_boundary/1 :: (seq_id()) -> seq_id()).
-spec(bounds/1 :: (qistate()) ->
{non_neg_integer(), non_neg_integer(), qistate()}).
@@ -282,17 +281,17 @@ flush(State = #qistate { dirty_count = 0 }) -> State;
flush(State) -> flush_journal(State).
read(StartEnd, StartEnd, State) ->
- {[], undefined, State};
+ {[], StartEnd, State};
read(Start, End, State = #qistate { segments = Segments,
dir = Dir }) when Start =< End ->
%% Start is inclusive, End is exclusive.
{StartSeg, StartRelSeq} = seq_id_to_seg_and_rel_seq_id(Start),
{EndSeg, EndRelSeq} = seq_id_to_seg_and_rel_seq_id(End),
Start1 = reconstruct_seq_id(StartSeg + 1, 0),
- Again = case End =< Start1 of
- true -> undefined;
- false -> Start1
- end,
+ Next = case End =< Start1 of
+ true -> End;
+ false -> Start1
+ end,
MaxRelSeq = case StartSeg =:= EndSeg of
true -> EndRelSeq;
false -> ?SEGMENT_ENTRY_COUNT
@@ -307,7 +306,7 @@ read(Start, End, State = #qistate { segments = Segments,
Acc
end, [], Segment),
Segments1 = segment_store(Segment, Segments),
- {Messages, Again, State #qistate { segments = Segments1 }}.
+ {Messages, Next, State #qistate { segments = Segments1 }}.
next_segment_boundary(SeqId) ->
{Seg, _RelSeq} = seq_id_to_seg_and_rel_seq_id(SeqId),
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index d49208c366..f3df66ca58 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -1584,7 +1584,7 @@ test_queue_index() ->
{0, 0, Qi1} = rabbit_queue_index:bounds(Qi0),
{Qi2, SeqIdsGuidsA} = queue_index_publish(SeqIdsA, false, Qi1),
{0, SegmentSize, Qi3} = rabbit_queue_index:bounds(Qi2),
- {ReadA, undefined, Qi4} = rabbit_queue_index:read(0, SegmentSize, Qi3),
+ {ReadA, SegmentSize, Qi4} = rabbit_queue_index:read(0, SegmentSize, Qi3),
ok = verify_read_with_published(false, false, ReadA,
lists:reverse(SeqIdsGuidsA)),
_Qi5 = rabbit_queue_index:terminate([], Qi4),
@@ -1595,7 +1595,7 @@ test_queue_index() ->
{0, 0, Qi7} = rabbit_queue_index:bounds(Qi6),
{Qi8, SeqIdsGuidsB} = queue_index_publish(SeqIdsB, true, Qi7),
{0, TwoSegs, Qi9} = rabbit_queue_index:bounds(Qi8),
- {ReadB, undefined, Qi10} = rabbit_queue_index:read(0, SegmentSize, Qi9),
+ {ReadB, SegmentSize, Qi10} = rabbit_queue_index:read(0, SegmentSize, Qi9),
ok = verify_read_with_published(false, true, ReadB,
lists:reverse(SeqIdsGuidsB)),
_Qi11 = rabbit_queue_index:terminate([], Qi10),
@@ -1606,7 +1606,7 @@ test_queue_index() ->
{LenB, _Terms2, Qi12} = test_queue_init(),
{0, TwoSegs, Qi13} = rabbit_queue_index:bounds(Qi12),
Qi14 = queue_index_deliver(SeqIdsB, Qi13),
- {ReadC, undefined, Qi15} = rabbit_queue_index:read(0, SegmentSize, Qi14),
+ {ReadC, SegmentSize, Qi15} = rabbit_queue_index:read(0, SegmentSize, Qi14),
ok = verify_read_with_published(true, true, ReadC,
lists:reverse(SeqIdsGuidsB)),
Qi16 = rabbit_queue_index:ack(SeqIdsB, Qi15),
@@ -1669,10 +1669,10 @@ test_queue_index() ->
{Qi47, [Eight,Six|_]} = queue_index_publish([3,6,8], false, Qi46),
Qi48 = queue_index_deliver([2,3,5,6], Qi47),
Qi49 = rabbit_queue_index:ack([1,2,3], Qi48),
- {[], undefined, Qi50} = rabbit_queue_index:read(0, 4, Qi49),
- {ReadD, undefined, Qi51} = rabbit_queue_index:read(4, 7, Qi50),
+ {[], 4, Qi50} = rabbit_queue_index:read(0, 4, Qi49),
+ {ReadD, 7, Qi51} = rabbit_queue_index:read(4, 7, Qi50),
ok = verify_read_with_published(true, false, ReadD, [Four, Five, Six]),
- {ReadE, undefined, Qi52} = rabbit_queue_index:read(7, 9, Qi51),
+ {ReadE, 9, Qi52} = rabbit_queue_index:read(7, 9, Qi51),
ok = verify_read_with_published(false, false, ReadE, [Seven, Eight]),
_Qi53 = rabbit_queue_index:delete_and_terminate(Qi52),
ok = stop_msg_store(),
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 377b5737db..61437229ae 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -807,16 +807,6 @@ betas_from_segment_entries(List, TransientThreshold, IndexState) ->
end, {[], IndexState}, List),
{bpqueue:from_list([{true, Filtered}]), IndexState1}.
-read_one_index_segment(StartSeqId, EndSeqId, IndexState)
- when StartSeqId =< EndSeqId ->
- case rabbit_queue_index:read(StartSeqId, EndSeqId, IndexState) of
- {List, Again, IndexState1} when List /= [] orelse Again =:= undefined ->
- {List, IndexState1,
- rabbit_queue_index:next_segment_boundary(StartSeqId)};
- {[], StartSeqId1, IndexState1} ->
- read_one_index_segment(StartSeqId1, EndSeqId, IndexState1)
- end.
-
ensure_binary_properties(Msg = #basic_message { content = Content }) ->
Msg #basic_message {
content = rabbit_binary_parser:clear_decoded_content(
@@ -970,10 +960,10 @@ tx_commit_index(State = #vqstate { on_sync = {SAcks, SPubs, SFuns},
State1 #vqstate { index_state = IndexState1, on_sync = {[], [], []} }.
delete1(_TransientThreshold, NextSeqId, DeltaSeqId, IndexState)
- when DeltaSeqId =:= undefined orelse DeltaSeqId >= NextSeqId ->
+ when DeltaSeqId >= NextSeqId ->
IndexState;
delete1(TransientThreshold, NextSeqId, DeltaSeqId, IndexState) ->
- {List, Again, IndexState1} =
+ {List, Next, IndexState1} =
rabbit_queue_index:read(DeltaSeqId, NextSeqId, IndexState),
IndexState2 =
case List of
@@ -982,7 +972,7 @@ delete1(TransientThreshold, NextSeqId, DeltaSeqId, IndexState) ->
List, TransientThreshold, IndexState1),
remove_queue_entries(fun beta_fold/3, Q, IndexState3)
end,
- delete1(TransientThreshold, NextSeqId, Again, IndexState2).
+ delete1(TransientThreshold, NextSeqId, Next, IndexState2).
purge_betas_and_deltas(State = #vqstate { q3 = Q3,
index_state = IndexState }) ->
@@ -1304,8 +1294,8 @@ maybe_deltas_to_betas(State = #vqstate {
#delta { start_seq_id = DeltaSeqId,
count = DeltaCount,
end_seq_id = DeltaSeqIdEnd } = Delta,
- {List, IndexState1, DeltaSeqId1} =
- read_one_index_segment(DeltaSeqId, DeltaSeqIdEnd, IndexState),
+ {List, DeltaSeqId1, IndexState1} =
+ rabbit_queue_index:read(DeltaSeqId, DeltaSeqIdEnd, IndexState),
{Q3a, IndexState2} = betas_from_segment_entries(
List, TransientThreshold, IndexState1),
State1 = State #vqstate { index_state = IndexState2 },