summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2010-05-18 11:25:43 +0100
committerMatthew Sackman <matthew@lshift.net>2010-05-18 11:25:43 +0100
commit3b8df23285a29c054b1730471b8def8c699aeb99 (patch)
tree1b4a779670c9fce407868d0f68060521cf0afec6 /src
parent3c33e69b6c3455e1d06b1d8bc0aea3e6231a4dd2 (diff)
downloadrabbitmq-server-git-3b8df23285a29c054b1730471b8def8c699aeb99.tar.gz
Drop segment_size and instead have a current_segment as well as a next_segment function
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_queue_index.erl10
-rw-r--r--src/rabbit_tests.erl6
-rw-r--r--src/rabbit_variable_queue.erl16
3 files changed, 12 insertions, 20 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 36dd5c6d1c..764c795006 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -33,7 +33,8 @@
-export([init/3, terminate/2, terminate_and_erase/1, publish/4,
deliver/2, ack/2, sync/2, flush/1, read_segment_entries/2,
- next_segment_boundary/1, segment_size/0, bounds/1, recover/1]).
+ next_segment_boundary/1, current_segment_boundary/1, bounds/1,
+ recover/1]).
-define(CLEAN_FILENAME, "clean.dot").
@@ -196,7 +197,7 @@
-spec(read_segment_entries/2 :: (seq_id(), qistate()) ->
{[{guid(), seq_id(), boolean(), boolean()}], qistate()}).
-spec(next_segment_boundary/1 :: (seq_id()) -> seq_id()).
--spec(segment_size/0 :: () -> non_neg_integer()).
+-spec(current_segment_boundary/1 :: (seq_id()) -> seq_id()).
-spec(bounds/1 :: (qistate()) ->
{non_neg_integer(), non_neg_integer(), qistate()}).
-spec(recover/1 :: ([queue_name()]) -> {[[any()]], startup_fun_state()}).
@@ -343,8 +344,9 @@ next_segment_boundary(SeqId) ->
{Seg, _RelSeq} = seq_id_to_seg_and_rel_seq_id(SeqId),
reconstruct_seq_id(Seg + 1, 0).
-segment_size() ->
- ?SEGMENT_ENTRY_COUNT.
+current_segment_boundary(SeqId) ->
+ {Seg, _RelSeq} = seq_id_to_seg_and_rel_seq_id(SeqId),
+ reconstruct_seq_id(Seg, 0).
bounds(State) ->
SegNums = all_segment_nums(State),
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index f69a9dc594..358f857b12 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -1448,7 +1448,7 @@ test_queue_init() ->
end).
test_queue_index() ->
- SegmentSize = rabbit_queue_index:segment_size(),
+ SegmentSize = rabbit_queue_index:next_segment_boundary(0),
TwoSegs = SegmentSize + SegmentSize,
stop_msg_store(),
ok = empty_test_queue(),
@@ -1579,7 +1579,7 @@ test_variable_queue() ->
passed.
test_variable_queue_dynamic_duration_change() ->
- SegmentSize = rabbit_queue_index:segment_size(),
+ SegmentSize = rabbit_queue_index:next_segment_boundary(0),
VQ0 = fresh_variable_queue(),
%% start by sending in a couple of segments worth
Len1 = 2*SegmentSize,
@@ -1630,7 +1630,7 @@ test_variable_queue_dynamic_duration_change_f(Len, VQ0) ->
end.
test_variable_queue_partial_segments_delta_thing() ->
- SegmentSize = rabbit_queue_index:segment_size(),
+ SegmentSize = rabbit_queue_index:next_segment_boundary(0),
HalfSegment = SegmentSize div 2,
VQ0 = fresh_variable_queue(),
VQ1 = variable_queue_publish(true, SegmentSize + HalfSegment, VQ0),
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index eb0f45c120..856b1f0c36 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -829,7 +829,7 @@ betas_from_segment_entries(List, SeqIdLimit, TransientThreshold, IndexState) ->
{bpqueue:from_list([{true, Filtered}]), IndexState1}.
read_index_segment(SeqId, IndexState) ->
- SeqId1 = SeqId + rabbit_queue_index:segment_size(),
+ SeqId1 = rabbit_queue_index:next_segment_boundary(SeqId),
case rabbit_queue_index:read_segment_entries(SeqId, IndexState) of
{[], IndexState1} -> read_index_segment(SeqId1, IndexState1);
{List, IndexState1} -> {List, IndexState1, SeqId1}
@@ -963,7 +963,7 @@ delete1(_PersistentStore, _TransientThreshold, NextSeqId, Count, DeltaSeqId,
{Count, IndexState};
delete1(PersistentStore, TransientThreshold, NextSeqId, Count, DeltaSeqId,
IndexState) ->
- Delta1SeqId = DeltaSeqId + rabbit_queue_index:segment_size(),
+ Delta1SeqId = rabbit_queue_index:next_segment_boundary(DeltaSeqId),
case rabbit_queue_index:read_segment_entries(DeltaSeqId, IndexState) of
{[], IndexState1} ->
delete1(PersistentStore, TransientThreshold, NextSeqId, Count,
@@ -1203,8 +1203,7 @@ publish(neither, MsgStatus = #msg_status { seq_id = SeqId }, State =
%% delta may be empty, seq_id > next_segment_boundary from q3
%% head, so we need to find where the segment boundary is before
%% or equal to seq_id
- DeltaSeqId = rabbit_queue_index:next_segment_boundary(SeqId) -
- rabbit_queue_index:segment_size(),
+ DeltaSeqId = rabbit_queue_index:current_segment_boundary(SeqId),
Delta1 = #delta { start_seq_id = DeltaSeqId, count = 1,
end_seq_id = SeqId + 1 },
State #vqstate { index_state = IndexState1,
@@ -1491,15 +1490,6 @@ push_betas_to_deltas(State = #vqstate { q2 = Q2, delta = Delta, q3 = Q3,
true -> %% already only holding LTE one segment indices in q3
State1;
false ->
- %% ASSERTION
- %% This says that if Delta1SeqId /= undefined then
- %% the gap from Limit to Delta1SeqId is an integer
- %% multiple of segment_size
- 0 = case Delta1SeqId of
- undefined -> 0;
- _ -> (Delta1SeqId - Limit) rem
- rabbit_queue_index:segment_size()
- end,
%% SeqIdMax is low in the sense that it must be
%% lower than the seq_id in delta1, in fact either
%% delta1 has undefined as its seq_id or there