summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-10-08 15:32:27 +0100
committerMatthew Sackman <matthew@lshift.net>2009-10-08 15:32:27 +0100
commit1ecb08930cfd4cfad92ac382fd8208c8f889a0d2 (patch)
tree845664082fcdcdf29e32c818decce19f85be6102 /src
parent50c79aa71c0356c202b5a64c39aea5d62f384d07 (diff)
downloadrabbitmq-server-git-1ecb08930cfd4cfad92ac382fd8208c8f889a0d2.tar.gz
minor further corrections and modifications
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_variable_queue.erl39
1 files changed, 21 insertions, 18 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 79a7f38bee..85dfbbac5b 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -99,7 +99,8 @@
%%
%% A further invariant is that if the queue is non empty, either q4 or
%% q3 contains at least one entry. I.e. we never allow gamma to
-%% contain all msgs in the queue.
+%% contain all msgs in the queue. Also, if q4 is non empty and gamma
+%% is non empty then q3 must be non empty.
init(QueueName) ->
{NextSeqId, IndexState} = rabbit_queue_index:init(QueueName),
@@ -265,12 +266,11 @@ out_from_q3(State = #vqstate { q1 = Q1, q2 = Q2, index_state = IndexState,
State1 #vqstate { q1 = queue:new(),
q4 = queue:join(Q4a, Q1) };
{true, false} ->
- {List, IndexState1} =
- rabbit_queue_index:read_segment_entries(GammaSeqId,
- IndexState),
+ {List, IndexState1, Gamma1SeqId} =
+ read_index_segment(GammaSeqId, IndexState),
State3 = State1 #vqstate { index_state = IndexState1 },
%% length(List) may be < segment_size because
- %% of acks. In fact, List may be []
+ %% of acks. But it can't be []
Q3b = betas_from_segment_entries(List),
case GammaCount - length(List) of
0 ->
@@ -282,10 +282,8 @@ out_from_q3(State = #vqstate { q1 = Q1, q2 = Q2, index_state = IndexState,
q2 = queue:new(), q3 = queue:join(Q3b, Q2) };
N when N > 0 ->
State3 #vqstate {
- gamma = #gamma {
- seq_id = GammaSeqId +
- rabbit_queue_index:segment_size(),
- count = N }, q3 = Q3b }
+ gamma = #gamma { seq_id = Gamma1SeqId,
+ count = N }, q3 = Q3b }
end;
{false, _} ->
%% q3 still isn't empty, we've not touched
@@ -304,6 +302,13 @@ betas_from_segment_entries(List) ->
index_on_disk = true }
end, List)).
+read_index_segment(SeqId, IndexState) ->
+ SeqId1 = SeqId + rabbit_queue_index:segment_size(),
+ case rabbit_queue_index:read_segment_entries(SeqId, IndexState) of
+ {[], IndexState1} -> read_index_segment(SeqId1, IndexState1);
+ {List, IndexState1} -> {List, IndexState1, SeqId1}
+ end.
+
maybe_start_prefetcher(State) ->
%% TODO
State.
@@ -456,19 +461,17 @@ push_betas_to_gammas(State = #vqstate { q2 = Q2, gamma = Gamma, q3 = Q3,
case Gamma1SeqId of
Limit -> %% already only holding the minimum, nothing to do
State1;
- _ when Gamma1SeqId == undefined orelse Gamma1SeqId > Limit ->
+ _ when Gamma1SeqId == undefined orelse
+ (is_integer(Gamma1SeqId) andalso Gamma1SeqId > Limit) ->
%% ASSERTION (sadly large!)
- %% This says that if Gamma1SeqId != undefined then
+ %% This says that if Gamma1SeqId /= undefined then
%% the gap from Limit to Gamma1SeqId is an integer
%% multiple of segment_size
- SegmentCount =
- case Gamma1SeqId of
- undefined -> undefined;
- _ -> (Gamma1SeqId - Limit) /
+ 0 = case Gamma1SeqId of
+ undefined -> 0;
+ _ -> (Gamma1SeqId - Limit) rem
rabbit_queue_index:segment_size()
end,
- true = (is_integer(SegmentCount) andalso SegmentCount > 0)
- orelse Gamma1SeqId == undefined,
%% LowSeqId is low in the sense that it must be
%% lower than the seq_id in gamma1, in fact either
%% gamma1 has undefined as its seq_id or there
@@ -476,7 +479,7 @@ push_betas_to_gammas(State = #vqstate { q2 = Q2, gamma = Gamma, q3 = Q3,
%% X < gamma1's seq_id (would be +1 if it wasn't
%% for the possibility of gaps in the seq_ids).
%% But because we use queue:out_r, LowSeqId is
- %% actually also the highest seqid of the betas we
+ %% actually also the highest seq_id of the betas we
%% transfer from q3 to gammas.
{LowSeqId, Len2, Q3b, IndexState2} =
push_betas_to_gammas(fun queue:out_r/1, Limit, Q3,