summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2010-06-17 10:33:00 +0100
committerMatthias Radestock <matthias@rabbitmq.com>2010-06-17 10:33:00 +0100
commit2bf4f4559bca46f10df81cc0a06503e4b80ae203 (patch)
tree5933024b72e7c582365bdc5b7706f998e57f79f7 /src
parent89b87729658e09af5891e39f655bd3df62e6af7e (diff)
downloadrabbitmq-server-git-2bf4f4559bca46f10df81cc0a06503e4b80ae203.tar.gz
get rid of vq:read_one_index_segment
The purpose of the function was to keep reading until either some data was found or the end was reached. However, both call sites contain loops already that effectively do the same, making the function redundant. There is also a small tweak to the qi:read API - it now returns the "next seq id to read". Previously it was returning 'undefined' when the requested (exclusive) End was inside the same segment as the Start, so now we simply return that End in that case.
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_queue_index.erl15
-rw-r--r--src/rabbit_tests.erl12
-rw-r--r--src/rabbit_variable_queue.erl20
3 files changed, 18 insertions, 29 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 5f94cf9d23..d062c4fd0a 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -196,8 +196,7 @@
-spec(sync/2 :: ([seq_id()], qistate()) -> qistate()).
-spec(flush/1 :: (qistate()) -> qistate()).
-spec(read/3 :: (seq_id(), seq_id(), qistate()) ->
- {[{guid(), seq_id(), boolean(), boolean()}],
- seq_id() | 'undefined', qistate()}).
+ {[{guid(), seq_id(), boolean(), boolean()}], seq_id(), qistate()}).
-spec(next_segment_boundary/1 :: (seq_id()) -> seq_id()).
-spec(bounds/1 :: (qistate()) ->
{non_neg_integer(), non_neg_integer(), qistate()}).
@@ -282,17 +281,17 @@ flush(State = #qistate { dirty_count = 0 }) -> State;
flush(State) -> flush_journal(State).
read(StartEnd, StartEnd, State) ->
- {[], undefined, State};
+ {[], StartEnd, State};
read(Start, End, State = #qistate { segments = Segments,
dir = Dir }) when Start =< End ->
%% Start is inclusive, End is exclusive.
{StartSeg, StartRelSeq} = seq_id_to_seg_and_rel_seq_id(Start),
{EndSeg, EndRelSeq} = seq_id_to_seg_and_rel_seq_id(End),
Start1 = reconstruct_seq_id(StartSeg + 1, 0),
- Again = case End =< Start1 of
- true -> undefined;
- false -> Start1
- end,
+ Next = case End =< Start1 of
+ true -> End;
+ false -> Start1
+ end,
MaxRelSeq = case StartSeg =:= EndSeg of
true -> EndRelSeq;
false -> ?SEGMENT_ENTRY_COUNT
@@ -307,7 +306,7 @@ read(Start, End, State = #qistate { segments = Segments,
Acc
end, [], Segment),
Segments1 = segment_store(Segment, Segments),
- {Messages, Again, State #qistate { segments = Segments1 }}.
+ {Messages, Next, State #qistate { segments = Segments1 }}.
next_segment_boundary(SeqId) ->
{Seg, _RelSeq} = seq_id_to_seg_and_rel_seq_id(SeqId),
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index d49208c366..f3df66ca58 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -1584,7 +1584,7 @@ test_queue_index() ->
{0, 0, Qi1} = rabbit_queue_index:bounds(Qi0),
{Qi2, SeqIdsGuidsA} = queue_index_publish(SeqIdsA, false, Qi1),
{0, SegmentSize, Qi3} = rabbit_queue_index:bounds(Qi2),
- {ReadA, undefined, Qi4} = rabbit_queue_index:read(0, SegmentSize, Qi3),
+ {ReadA, SegmentSize, Qi4} = rabbit_queue_index:read(0, SegmentSize, Qi3),
ok = verify_read_with_published(false, false, ReadA,
lists:reverse(SeqIdsGuidsA)),
_Qi5 = rabbit_queue_index:terminate([], Qi4),
@@ -1595,7 +1595,7 @@ test_queue_index() ->
{0, 0, Qi7} = rabbit_queue_index:bounds(Qi6),
{Qi8, SeqIdsGuidsB} = queue_index_publish(SeqIdsB, true, Qi7),
{0, TwoSegs, Qi9} = rabbit_queue_index:bounds(Qi8),
- {ReadB, undefined, Qi10} = rabbit_queue_index:read(0, SegmentSize, Qi9),
+ {ReadB, SegmentSize, Qi10} = rabbit_queue_index:read(0, SegmentSize, Qi9),
ok = verify_read_with_published(false, true, ReadB,
lists:reverse(SeqIdsGuidsB)),
_Qi11 = rabbit_queue_index:terminate([], Qi10),
@@ -1606,7 +1606,7 @@ test_queue_index() ->
{LenB, _Terms2, Qi12} = test_queue_init(),
{0, TwoSegs, Qi13} = rabbit_queue_index:bounds(Qi12),
Qi14 = queue_index_deliver(SeqIdsB, Qi13),
- {ReadC, undefined, Qi15} = rabbit_queue_index:read(0, SegmentSize, Qi14),
+ {ReadC, SegmentSize, Qi15} = rabbit_queue_index:read(0, SegmentSize, Qi14),
ok = verify_read_with_published(true, true, ReadC,
lists:reverse(SeqIdsGuidsB)),
Qi16 = rabbit_queue_index:ack(SeqIdsB, Qi15),
@@ -1669,10 +1669,10 @@ test_queue_index() ->
{Qi47, [Eight,Six|_]} = queue_index_publish([3,6,8], false, Qi46),
Qi48 = queue_index_deliver([2,3,5,6], Qi47),
Qi49 = rabbit_queue_index:ack([1,2,3], Qi48),
- {[], undefined, Qi50} = rabbit_queue_index:read(0, 4, Qi49),
- {ReadD, undefined, Qi51} = rabbit_queue_index:read(4, 7, Qi50),
+ {[], 4, Qi50} = rabbit_queue_index:read(0, 4, Qi49),
+ {ReadD, 7, Qi51} = rabbit_queue_index:read(4, 7, Qi50),
ok = verify_read_with_published(true, false, ReadD, [Four, Five, Six]),
- {ReadE, undefined, Qi52} = rabbit_queue_index:read(7, 9, Qi51),
+ {ReadE, 9, Qi52} = rabbit_queue_index:read(7, 9, Qi51),
ok = verify_read_with_published(false, false, ReadE, [Seven, Eight]),
_Qi53 = rabbit_queue_index:delete_and_terminate(Qi52),
ok = stop_msg_store(),
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 377b5737db..61437229ae 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -807,16 +807,6 @@ betas_from_segment_entries(List, TransientThreshold, IndexState) ->
end, {[], IndexState}, List),
{bpqueue:from_list([{true, Filtered}]), IndexState1}.
-read_one_index_segment(StartSeqId, EndSeqId, IndexState)
- when StartSeqId =< EndSeqId ->
- case rabbit_queue_index:read(StartSeqId, EndSeqId, IndexState) of
- {List, Again, IndexState1} when List /= [] orelse Again =:= undefined ->
- {List, IndexState1,
- rabbit_queue_index:next_segment_boundary(StartSeqId)};
- {[], StartSeqId1, IndexState1} ->
- read_one_index_segment(StartSeqId1, EndSeqId, IndexState1)
- end.
-
ensure_binary_properties(Msg = #basic_message { content = Content }) ->
Msg #basic_message {
content = rabbit_binary_parser:clear_decoded_content(
@@ -970,10 +960,10 @@ tx_commit_index(State = #vqstate { on_sync = {SAcks, SPubs, SFuns},
State1 #vqstate { index_state = IndexState1, on_sync = {[], [], []} }.
delete1(_TransientThreshold, NextSeqId, DeltaSeqId, IndexState)
- when DeltaSeqId =:= undefined orelse DeltaSeqId >= NextSeqId ->
+ when DeltaSeqId >= NextSeqId ->
IndexState;
delete1(TransientThreshold, NextSeqId, DeltaSeqId, IndexState) ->
- {List, Again, IndexState1} =
+ {List, Next, IndexState1} =
rabbit_queue_index:read(DeltaSeqId, NextSeqId, IndexState),
IndexState2 =
case List of
@@ -982,7 +972,7 @@ delete1(TransientThreshold, NextSeqId, DeltaSeqId, IndexState) ->
List, TransientThreshold, IndexState1),
remove_queue_entries(fun beta_fold/3, Q, IndexState3)
end,
- delete1(TransientThreshold, NextSeqId, Again, IndexState2).
+ delete1(TransientThreshold, NextSeqId, Next, IndexState2).
purge_betas_and_deltas(State = #vqstate { q3 = Q3,
index_state = IndexState }) ->
@@ -1304,8 +1294,8 @@ maybe_deltas_to_betas(State = #vqstate {
#delta { start_seq_id = DeltaSeqId,
count = DeltaCount,
end_seq_id = DeltaSeqIdEnd } = Delta,
- {List, IndexState1, DeltaSeqId1} =
- read_one_index_segment(DeltaSeqId, DeltaSeqIdEnd, IndexState),
+ {List, DeltaSeqId1, IndexState1} =
+ rabbit_queue_index:read(DeltaSeqId, DeltaSeqIdEnd, IndexState),
{Q3a, IndexState2} = betas_from_segment_entries(
List, TransientThreshold, IndexState1),
State1 = State #vqstate { index_state = IndexState2 },