diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-10-08 14:46:45 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-10-08 14:46:45 +0100 |
| commit | 144137b12fb4af9a30a56406e17fef2b7b1c17be (patch) | |
| tree | aecc0368ed271372436aec8937a588403305ee51 /src | |
| parent | 845c25497ce2c31de762bb815b7218138c38f089 (diff) | |
| download | rabbitmq-server-git-144137b12fb4af9a30a56406e17fef2b7b1c17be.tar.gz | |
Tidying and refactoring of the variable queue, some documentation, and the removal of a lot of algorithmic bugs. No real new features, but code in much better state.
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_queue_index.erl | 12 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 435 |
2 files changed, 273 insertions, 174 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 27952af161..b21651a234 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -140,7 +140,7 @@ -spec(write_acks/2 :: ([seq_id()], qistate()) -> qistate()). -spec(flush_journal/1 :: (qistate()) -> {boolean(), qistate()}). -spec(read_segment_entries/2 :: (seq_id(), qistate()) -> - {( [{'index', msg_id(), seq_id(), boolean(), boolean()}] + {( [{msg_id(), seq_id(), boolean(), boolean()}] | 'not_found'), qistate()}). -spec(next_segment_boundary/1 :: (seq_id()) -> seq_id()). -spec(segment_size/0 :: () -> non_neg_integer()). @@ -152,7 +152,7 @@ %%---------------------------------------------------------------------------- init(Name) -> - Dir = filename:join(rabbit_mnesia:dir(), Name), + Dir = filename:join(queues_dir(), Name), ok = filelib:ensure_dir(filename:join(Dir, "nothing")), AckCounts = scatter_journal(Dir, find_ack_counts(Dir)), {ok, JournalHdl} = file:open(filename:join(Dir, ?ACK_JOURNAL_FILENAME), @@ -240,9 +240,8 @@ read_segment_entries(InitSeqId, State = {lists:foldl(fun (RelSeq, Acc) -> {MsgId, IsDelivered, IsPersistent} = dict:fetch(RelSeq, SDict), - [ {index, MsgId, - reconstruct_seq_id(SegNum, RelSeq), - IsPersistent, IsDelivered, true} | Acc] + [ {MsgId, reconstruct_seq_id(SegNum, RelSeq), + IsPersistent, IsDelivered} | Acc] end, [], RelSeqs), State}. @@ -257,6 +256,9 @@ segment_size() -> %% Minor Helpers %%---------------------------------------------------------------------------- +queues_dir() -> + filename:join(rabbit_mnesia:dir(), "queues"). + rev_sort(List) -> lists:sort(fun (A, B) -> B < A end, List). diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index f041f4783e..79a7f38bee 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -47,18 +47,64 @@ next_seq_id, out_counter, egress_rate, - old_egress_rate, avg_egress_rate, egress_rate_timestamp, prefetcher }). +-record(alpha, + { msg, + seq_id, + is_delivered, + msg_on_disk, + index_on_disk + }). + +-record(beta, + { msg_id, + seq_id, + is_persistent, + is_delivered, + index_on_disk + }). + +-record(gamma, + { seq_id, + count + }). + -include("rabbit.hrl"). +%% Basic premise is that msgs move from q1 -> q2 -> gamma -> q3 -> q4 +%% but they can only do so in the right form. q1 and q4 only hold +%% alphas (msgs in ram), q2 and q3 only hold betas (msg on disk, index +%% in ram), and gamma is just a count of the number of index entries +%% on disk at that stage (msg on disk, index on disk). +%% +%% When a msg arrives, we decide which form it should be in. It is +%% then added to the rightmost appropriate queue, maintaining +%% order. Thus if the msg is to be an alpha, it will be added to q1, +%% unless all of q1, q2, gamma and q3 are empty, in which case it will +%% go to q4. If it is to be a beta, it will be added to q2 unless all +%% of q2 and gamma are empty, in which case it will go to q3. +%% +%% The major invariant is that if the msg is to be a beta, q1 will be +%% empty, and if it is to be a gamma then both q1 and q2 will be empty. +%% +%% When taking msgs out of the queue, if q4 is empty then we drain the +%% prefetcher. If that doesn't help then we read directly from q3, or +%% gamma, if q3 is empty. If q3 and gamma are empty then we have an +%% invariant that q2 must be empty because q2 can only grow if gamma +%% is non empty. +%% +%% A further invariant is that if the queue is non empty, either q4 or +%% q3 contains at least one entry. I.e. we never allow gamma to +%% contain all msgs in the queue. + init(QueueName) -> {NextSeqId, IndexState} = rabbit_queue_index:init(QueueName), #vqstate { q1 = queue:new(), q2 = queue:new(), - gamma = {undefined, 0}, + gamma = #gamma { seq_id = undefined, count = 0 }, q3 = queue:new(), q4 = queue:new(), target_ram_msg_count = undefined, ram_msg_count = 0, @@ -67,60 +113,60 @@ init(QueueName) -> next_seq_id = NextSeqId, out_counter = 0, egress_rate = 0, - old_egress_rate = 0, avg_egress_rate = 0, egress_rate_timestamp = now(), prefetcher = undefined }. -in(Msg, IsDelivered, State) -> - in(test_keep_msg_in_ram(State), Msg, IsDelivered, State). +in(Msg, IsDelivered, State = #vqstate { next_seq_id = SeqId }) -> + in(test_keep_msg_in_ram(SeqId, State), Msg, SeqId, IsDelivered, + State #vqstate { next_seq_id = SeqId + 1 }). -in(msg_and_index, Msg = #basic_message { guid = MsgId, - is_persistent = IsPersistent }, - IsDelivered, State = #vqstate { index_state = IndexState, - next_seq_id = SeqId, - ram_msg_count = RamMsgCount - }) -> +in(msg, Msg = #basic_message { guid = MsgId, + is_persistent = IsPersistent }, + SeqId, IsDelivered, State = #vqstate { index_state = IndexState, + ram_msg_count = RamMsgCount }) -> MsgOnDisk = maybe_write_msg_to_disk(false, Msg), {IndexOnDisk, IndexState1} = maybe_write_index_to_disk(false, IsPersistent, MsgId, SeqId, IsDelivered, IndexState), - Entry = - {msg_and_index, Msg, SeqId, IsDelivered, MsgOnDisk, IndexOnDisk}, - State1 = State #vqstate { next_seq_id = SeqId + 1, - ram_msg_count = RamMsgCount + 1, + Entry = #alpha { msg = Msg, seq_id = SeqId, is_delivered = IsDelivered, + msg_on_disk = MsgOnDisk, index_on_disk = IndexOnDisk }, + State1 = State #vqstate { ram_msg_count = RamMsgCount + 1, index_state = IndexState1 }, store_alpha_entry(Entry, State1); -in(just_index, Msg = #basic_message { guid = MsgId, - is_persistent = IsPersistent }, - IsDelivered, State = #vqstate { index_state = IndexState, - next_seq_id = SeqId, q1 = Q1 }) -> +in(index, Msg = #basic_message { guid = MsgId, + is_persistent = IsPersistent }, + SeqId, IsDelivered, State = #vqstate { index_state = IndexState, + q1 = Q1 }) -> true = maybe_write_msg_to_disk(true, Msg), {IndexOnDisk, IndexState1} = maybe_write_index_to_disk(false, IsPersistent, MsgId, SeqId, IsDelivered, IndexState), - Entry = {index, MsgId, SeqId, IsPersistent, IsDelivered, IndexOnDisk}, - State1 = State #vqstate { next_seq_id = SeqId + 1, - index_state = IndexState1 }, + Entry = #beta { msg_id = MsgId, seq_id = SeqId, is_delivered = IsDelivered, + is_persistent = IsPersistent, index_on_disk = IndexOnDisk }, + State1 = State #vqstate { index_state = IndexState1 }, true = queue:is_empty(Q1), %% ASSERTION store_beta_entry(Entry, State1); in(neither, Msg = #basic_message { guid = MsgId, is_persistent = IsPersistent }, - IsDelivered, State = #vqstate { index_state = IndexState, - next_seq_id = SeqId, - q1 = Q1, q2 = Q2, - gamma = {GammaSeqId, GammaCount} }) -> + SeqId, IsDelivered, State = #vqstate { index_state = IndexState, + q1 = Q1, q2 = Q2, gamma = Gamma }) -> true = maybe_write_msg_to_disk(true, Msg), {true, IndexState1} = maybe_write_index_to_disk(true, IsPersistent, MsgId, SeqId, IsDelivered, IndexState), true = queue:is_empty(Q1) andalso queue:is_empty(Q2), %% ASSERTION - State #vqstate { next_seq_id = SeqId + 1, - index_state = IndexState1, - gamma = {GammaSeqId, GammaCount + 1} }. + %% gamma may be empty, seq_id > next_segment_boundary from q3 + %% head, so we need to find where the segment boundary is before + %% or equal to seq_id + GammaSeqId = rabbit_queue_index:next_segment_boundary(SeqId) - + rabbit_queue_index:segment_size(), + Gamma1 = #gamma { seq_id = GammaSeqId, count = 1 }, + State #vqstate { index_state = IndexState1, + gamma = combine_gammas(Gamma, Gamma1) }. set_queue_ram_duration_target( DurationTarget, State = #vqstate { avg_egress_rate = EgressRate, @@ -139,11 +185,14 @@ set_queue_ram_duration_target( remeasure_egress_rate(State = #vqstate { egress_rate = OldEgressRate, egress_rate_timestamp = Timestamp, out_counter = OutCount }) -> + %% We do an average over the last two values, but also hold the + %% current value separately so that the average always only + %% incorporates the last two values, and not the current value and + %% the last average. Averaging helps smooth out spikes. Now = now(), EgressRate = OutCount / timer:now_diff(Now, Timestamp), AvgEgressRate = (EgressRate + OldEgressRate) / 2, - State #vqstate { old_egress_rate = OldEgressRate, - egress_rate = EgressRate, + State #vqstate { egress_rate = EgressRate, avg_egress_rate = AvgEgressRate, egress_rate_timestamp = Now, out_counter = 0 }. @@ -163,8 +212,9 @@ out(State = end, out(State #vqstate { q4 = Q4a, prefetcher = undefined }); {{value, - {msg_and_index, Msg = #basic_message { guid = MsgId }, - SeqId, IsDelivered, MsgOnDisk, IndexOnDisk}}, Q4a} -> + #alpha { msg = Msg = #basic_message { guid = MsgId }, seq_id = SeqId, + is_delivered = IsDelivered, msg_on_disk = MsgOnDisk, + index_on_disk = IndexOnDisk }}, Q4a} -> IndexState1 = case IndexOnDisk andalso not IsDelivered of true -> @@ -175,64 +225,85 @@ out(State = AckTag = case {IndexOnDisk, MsgOnDisk} of {true, true } -> {ack_index_and_store, MsgId, SeqId}; {false, true } -> {ack_store, MsgId}; - {false, false} -> not_on_disk + {false, false} -> ack_not_on_disk end, {{Msg, IsDelivered, AckTag}, State #vqstate { q4 = Q4a, out_counter = OutCount + 1, index_state = IndexState1 }} end. -out_from_q3(State = #vqstate { q2 = Q2, index_state = IndexState, - gamma = {GammaSeqId, GammaCount}, q3 = Q3, - q4 = Q4 }) -> +out_from_q3(State = #vqstate { q1 = Q1, q2 = Q2, index_state = IndexState, + gamma = #gamma { seq_id = GammaSeqId, + count = GammaCount}, + q3 = Q3, q4 = Q4 }) -> case queue:out(Q3) of {empty, _Q3} -> - case GammaCount of - 0 -> - undefined = GammaSeqId, %% ASSERTION - true = queue:is_empty(Q2), %% ASSERTION - {empty, State}; - _ -> - {List = [_|_], IndexState1} = - rabbit_queue_index:read_segment_entries(GammaSeqId, - IndexState), - State1 = State #vqstate { index_state = IndexState1 }, - Q3a = queue:from_list(List), - State2 = - case GammaCount - length(List) of - 0 -> - State1 #vqstate { gamma = {undefined, 0}, - q2 = queue:new(), - q3 = queue:join(Q3a, Q2) }; - N when N > 0 -> - State1 #vqstate { gamma = - {rabbit_queue_index:segment_size() + - GammaSeqId, N}, - q3 = Q3a } - end, - out_from_q3(State2) - end; - {{value, {index, MsgId, SeqId, IsPersistent, IsDelivered, IndexOnDisk}}, + 0 = GammaCount, %% ASSERTION + true = queue:is_empty(Q2), %% ASSERTION + true = queue:is_empty(Q1), %% ASSERTION + {empty, State}; + {{value, + #beta { msg_id = MsgId, seq_id = SeqId, is_delivered = IsDelivered, + is_persistent = IsPersistent, index_on_disk = IndexOnDisk }}, Q3a} -> {ok, Msg = #basic_message { is_persistent = IsPersistent, guid = MsgId }} = rabbit_msg_store:read(MsgId), - State1 = #vqstate { q1 = Q1, q4 = Q4a } = - State #vqstate { q3 = Q3a, - q4 = queue:in({msg_and_index, Msg, SeqId, - IsDelivered, true, IndexOnDisk}, - Q4) }, - State2 = case queue:is_empty(Q3a) andalso 0 == GammaCount of - true -> - true = queue:is_empty(Q2), %% ASSERTION - State1 #vqstate { q1 = queue:new(), - q4 = queue:join(Q4a, Q1) }; - false -> - State1 - end, + Q4a = queue:in( + #alpha { msg = Msg, seq_id = SeqId, + is_delivered = IsDelivered, msg_on_disk = true, + index_on_disk = IndexOnDisk }, Q4), + %% TODO - if it's not persistent, remove it from disk now + State1 = State #vqstate { q3 = Q3a, q4 = Q4a }, + State2 = + case {queue:is_empty(Q3a), 0 == GammaCount} of + {true, true} -> + %% q3 is now empty, it wasn't before; gamma is + %% still empty. So q2 must be empty, and q1 + %% can now be joined onto q4 + true = queue:is_empty(Q2), %% ASSERTION + State1 #vqstate { q1 = queue:new(), + q4 = queue:join(Q4a, Q1) }; + {true, false} -> + {List, IndexState1} = + rabbit_queue_index:read_segment_entries(GammaSeqId, + IndexState), + State3 = State1 #vqstate { index_state = IndexState1 }, + %% length(List) may be < segment_size because + %% of acks. In fact, List may be [] + Q3b = betas_from_segment_entries(List), + case GammaCount - length(List) of + 0 -> + %% gamma is now empty, but it wasn't + %% before, so can now join q2 onto q3 + State3 #vqstate { + gamma = #gamma { seq_id = undefined, + count = 0 }, + q2 = queue:new(), q3 = queue:join(Q3b, Q2) }; + N when N > 0 -> + State3 #vqstate { + gamma = #gamma { + seq_id = GammaSeqId + + rabbit_queue_index:segment_size(), + count = N }, q3 = Q3b } + end; + {false, _} -> + %% q3 still isn't empty, we've not touched + %% gamma, so the invariants between q1, q2, + %% gamma and q3 are maintained + State1 + end, out(State2) end. +betas_from_segment_entries(List) -> + queue:from_list(lists:map(fun ({MsgId, SeqId, IsPersistent, IsDelivered}) -> + #beta { msg_id = MsgId, seq_id = SeqId, + is_persistent = IsPersistent, + is_delivered = IsDelivered, + index_on_disk = true } + end, List)). + maybe_start_prefetcher(State) -> %% TODO State. @@ -243,15 +314,10 @@ reduce_memory_use(State = #vqstate { ram_msg_count = RamMsgCount, State; reduce_memory_use(State = #vqstate { target_ram_msg_count = TargetRamMsgCount }) -> - State1 = #vqstate { ram_msg_count = RamMsgCount } = - maybe_push_q1_to_betas(State), - State2 = case TargetRamMsgCount >= RamMsgCount of - true -> State1; - false -> maybe_push_q4_to_betas(State) - end, + State1 = maybe_push_q4_to_betas(maybe_push_q1_to_betas(State)), case TargetRamMsgCount of - 0 -> push_betas_to_gammas(State); - _ -> State2 + 0 -> push_betas_to_gammas(State1); + _ -> State1 end. maybe_write_msg_to_disk(Bool, Msg = #basic_message { @@ -274,17 +340,32 @@ maybe_write_index_to_disk(_Bool, _IsPersistent, _MsgId, _SeqId, _IsDelivered, IndexState) -> {false, IndexState}. -test_keep_msg_in_ram(#vqstate { target_ram_msg_count = TargetRamMsgCount, - ram_msg_count = RamMsgCount, - q1 = Q1 }) -> +test_keep_msg_in_ram(SeqId, #vqstate { target_ram_msg_count = TargetRamMsgCount, + ram_msg_count = RamMsgCount, + q1 = Q1, q3 = Q3 }) -> case TargetRamMsgCount of - undefined -> msg_and_index; - 0 -> neither; + undefined -> + msg; + 0 -> + case queue:out(Q3) of + {empty, _Q3} -> + %% if TargetRamMsgCount == 0, we know we have no + %% alphas. If q3 is empty then gamma must be empty + %% too, so create a beta, which should end up in + %% q3 + index; + {{value, #beta { seq_id = OldSeqId }}, _Q3a} -> + %% don't look at the current gamma as it may be empty + case SeqId >= rabbit_queue_index:next_segment_boundary(OldSeqId) of + true -> neither; + false -> index + end + end; _ when TargetRamMsgCount > RamMsgCount -> - msg_and_index; + msg; _ -> case queue:is_empty(Q1) of - true -> just_index; - false -> msg_and_index %% can push out elders to disk + true -> index; + false -> msg %% can push out elders to disk end end. @@ -293,9 +374,10 @@ ensure_binary_properties(Msg = #basic_message { content = Content }) -> content = rabbit_binary_parser:clear_decoded_content( rabbit_binary_generator:ensure_content_encoded(Content)) }. -store_alpha_entry(Entry, State = #vqstate { q1 = Q1, q2 = Q2, - gamma = {_GammaSeqId, GammaCount}, - q3 = Q3, q4 = Q4 }) -> +store_alpha_entry(Entry = #alpha {}, State = + #vqstate { q1 = Q1, q2 = Q2, + gamma = #gamma { count = GammaCount }, + q3 = Q3, q4 = Q4 }) -> case queue:is_empty(Q1) andalso queue:is_empty(Q2) andalso GammaCount == 0 andalso queue:is_empty(Q3) of true -> @@ -304,95 +386,104 @@ store_alpha_entry(Entry, State = #vqstate { q1 = Q1, q2 = Q2, maybe_push_q1_to_betas(State #vqstate { q1 = queue:in(Entry, Q1) }) end. -store_beta_entry(Entry, State = - #vqstate { q2 = Q2, gamma = {_GammaSeqId, GammaCount}, +store_beta_entry(Entry = #beta {}, State = + #vqstate { q2 = Q2, gamma = #gamma { count = GammaCount }, q3 = Q3 }) -> case queue:is_empty(Q2) andalso GammaCount == 0 of true -> State #vqstate { q3 = queue:in(Entry, Q3) }; false -> State #vqstate { q2 = queue:in(Entry, Q2) } end. -maybe_push_q1_to_betas(State = - #vqstate { ram_msg_count = RamMsgCount, - target_ram_msg_count = TargetRamMsgCount - }) when TargetRamMsgCount >= RamMsgCount -> - State; -maybe_push_q1_to_betas(State = #vqstate { ram_msg_count = RamMsgCount, - q1 = Q1 }) -> - case queue:out(Q1) of - {empty, _Q1} -> State; - {{value, {msg_and_index, Msg = #basic_message { - guid = MsgId, is_persistent = IsPersistent }, - SeqId, IsDelivered, MsgOnDisk, IndexOnDisk}}, Q1a} -> - true = case MsgOnDisk of - true -> true; - false -> maybe_write_msg_to_disk(true, Msg) - end, - maybe_push_q1_to_betas( - store_beta_entry({index, MsgId, SeqId, IsPersistent, IsDelivered, - IndexOnDisk}, - State #vqstate { ram_msg_count = RamMsgCount - 1, - q1 = Q1a })) - end. +maybe_push_q1_to_betas(State = #vqstate { q1 = Q1 }) -> + maybe_push_alphas_to_betas( + fun queue:out/1, + fun (Beta, Q1a, State1) -> + %% these could legally go to q3 if gamma and q2 are empty + store_beta_entry(Beta, State1 #vqstate { q1 = Q1a }) + end, Q1, State). -maybe_push_q4_to_betas(State = - #vqstate { ram_msg_count = RamMsgCount, - target_ram_msg_count = TargetRamMsgCount - }) when TargetRamMsgCount >= RamMsgCount -> +maybe_push_q4_to_betas(State = #vqstate { q4 = Q4 }) -> + maybe_push_alphas_to_betas( + fun queue:out_r/1, + fun (Beta, Q4a, State1 = #vqstate { q3 = Q3 }) -> + %% these must go to q3 + State1 #vqstate { q3 = queue:in_r(Beta, Q3), q4 = Q4a } + end, Q4, State). + +maybe_push_alphas_to_betas(_Generator, _Consumer, _Q, State = + #vqstate { ram_msg_count = RamMsgCount, + target_ram_msg_count = TargetRamMsgCount }) + when TargetRamMsgCount >= RamMsgCount -> State; -maybe_push_q4_to_betas(State = #vqstate { ram_msg_count = RamMsgCount, - q4 = Q4, q3 = Q3 }) -> - case queue:out_r(Q4) of - {empty, _Q4} -> State; - {{value, {msg_and_index, Msg = #basic_message { - guid = MsgId, is_persistent = IsPersistent }, - SeqId, IsDelivered, MsgOnDisk, IndexOnDisk}}, Q4a} -> +maybe_push_alphas_to_betas(Generator, Consumer, Q, State = + #vqstate { ram_msg_count = RamMsgCount }) -> + case Generator(Q) of + {empty, _Q} -> State; + {{value, + #alpha { msg = Msg = #basic_message { guid = MsgId, + is_persistent = IsPersistent }, + seq_id = SeqId, is_delivered = IsDelivered, + msg_on_disk = MsgOnDisk, index_on_disk = IndexOnDisk }}, + Qa} -> true = case MsgOnDisk of true -> true; false -> maybe_write_msg_to_disk(true, Msg) end, - Q3a = queue:in_r({index, MsgId, SeqId, IsPersistent, IsDelivered, - IndexOnDisk}, Q3), - maybe_push_q4_to_betas( - State #vqstate { ram_msg_count = RamMsgCount - 1, - q3 = Q3a, q4 = Q4a }) + Beta = #beta { msg_id = MsgId, seq_id = SeqId, + is_persistent = IsPersistent, + is_delivered = IsDelivered, + index_on_disk = IndexOnDisk }, + State1 = State #vqstate { ram_msg_count = RamMsgCount - 1 }, + maybe_push_alphas_to_betas(Generator, Consumer, Qa, + Consumer(Beta, Qa, State1)) end. push_betas_to_gammas(State = #vqstate { q2 = Q2, gamma = Gamma, q3 = Q3, index_state = IndexState }) -> %% HighSeqId is high in the sense that it must be higher than the - %% seqid in Gamma, but it's also the lowest of the betas that we + %% seq_id in Gamma, but it's also the lowest of the betas that we %% transfer from q2 to gamma. {HighSeqId, Len1, Q2a, IndexState1} = push_betas_to_gammas(fun queue:out/1, undefined, Q2, IndexState), - Gamma1 = {Gamma1SeqId, _} = combine_gammas(Gamma, {HighSeqId, Len1}), - State1 = State #vqstate { q2 = Q2a, - gamma = Gamma1, + Gamma1 = #gamma { seq_id = Gamma1SeqId } = + combine_gammas(Gamma, #gamma { seq_id = HighSeqId, count = Len1 }), + State1 = State #vqstate { q2 = Q2a, gamma = Gamma1, index_state = IndexState1 }, case queue:out(Q3) of {empty, _Q3} -> State1; - {{value, {index, _MsgId, SeqId, _IsPersistent, _IsDelivered, - _IndexOnDisk}}, _Q3a} -> + {{value, #beta { seq_id = SeqId }}, _Q3a} -> Limit = rabbit_queue_index:next_segment_boundary(SeqId), - case Limit == Gamma1SeqId of - true -> %% already only holding the minimum, nothing to do + case Gamma1SeqId of + Limit -> %% already only holding the minimum, nothing to do State1; - false -> - %% ASSERTION - true = Gamma1SeqId == undefined orelse - Gamma1SeqId == Limit + rabbit_queue_index:segment_size(), + _ when Gamma1SeqId == undefined orelse Gamma1SeqId > Limit -> + %% ASSERTION (sadly large!) + %% This says that if Gamma1SeqId != undefined then + %% the gap from Limit to Gamma1SeqId is an integer + %% multiple of segment_size + SegmentCount = + case Gamma1SeqId of + undefined -> undefined; + _ -> (Gamma1SeqId - Limit) / + rabbit_queue_index:segment_size() + end, + true = (is_integer(SegmentCount) andalso SegmentCount > 0) + orelse Gamma1SeqId == undefined, %% LowSeqId is low in the sense that it must be - %% lower than the seqid in Gamma1, in fact either - %% gamma1 has undefined as its seqid or its seqid - %% is LowSeqId + 1. But because we use - %% queue:out_r, LowSeqId is actually also the - %% highest seqid of the betas we transfer from q3 - %% to gammas. + %% lower than the seq_id in gamma1, in fact either + %% gamma1 has undefined as its seq_id or there + %% does not exist a seq_id X s.t. X > LowSeqId and + %% X < gamma1's seq_id (would be +1 if it wasn't + %% for the possibility of gaps in the seq_ids). + %% But because we use queue:out_r, LowSeqId is + %% actually also the highest seqid of the betas we + %% transfer from q3 to gammas. {LowSeqId, Len2, Q3b, IndexState2} = - push_betas_to_gammas(fun queue:out_r/1, Limit - 1, Q3, + push_betas_to_gammas(fun queue:out_r/1, Limit, Q3, IndexState1), - Gamma1SeqId = LowSeqId + 1, %% ASSERTION - Gamma2 = combine_gammas({Limit, Len2}, Gamma1), + true = Gamma1SeqId > LowSeqId, %% ASSERTION + Gamma2 = combine_gammas( + #gamma { seq_id = Limit, count = Len2}, Gamma1), State1 #vqstate { q3 = Q3b, gamma = Gamma2, index_state = IndexState2 } end @@ -401,8 +492,7 @@ push_betas_to_gammas(State = #vqstate { q2 = Q2, gamma = Gamma, q3 = Q3, push_betas_to_gammas(Generator, Limit, Q, IndexState) -> case Generator(Q) of {empty, Qa} -> {undefined, 0, Qa, IndexState}; - {{value, {index, _MsgId, SeqId, _IsPersistent, _IsDelivered, - _IndexOnDisk}}, _Qa} -> + {{value, #beta { seq_id = SeqId }}, _Qa} -> {Count, Qb, IndexState1} = push_betas_to_gammas(Generator, Limit, Q, 0, IndexState), {SeqId, Count, Qb, IndexState1} @@ -411,11 +501,13 @@ push_betas_to_gammas(Generator, Limit, Q, IndexState) -> push_betas_to_gammas(Generator, Limit, Q, Count, IndexState) -> case Generator(Q) of {empty, Qa} -> {Count, Qa, IndexState}; - {{value, {index, _MsgId, Limit, _IsPersistent, _IsDelivered, - _IndexOnDisk}}, _Qa} -> + {{value, #beta { seq_id = SeqId }}, _Qa} + when Limit /= undefined andalso SeqId < Limit -> {Count, Q, IndexState}; - {{value, {index, MsgId, SeqId, IsPersistent, IsDelivered, - IndexOnDisk}}, Qa} -> + {{value, #beta { msg_id = MsgId, seq_id = SeqId, + is_persistent = IsPersistent, + is_delivered = IsDelivered, + index_on_disk = IndexOnDisk}}, Qa} -> IndexState1 = case IndexOnDisk of true -> IndexState; @@ -428,10 +520,15 @@ push_betas_to_gammas(Generator, Limit, Q, Count, IndexState) -> end, push_betas_to_gammas(Generator, Limit, Qa, Count + 1, IndexState1) end. - -combine_gammas({_, 0}, {_, 0}) -> {undefined, 0}; -combine_gammas({_, 0}, B ) -> B; -combine_gammas(A , {_, 0}) -> A; -combine_gammas({SeqIdLow, CountLow}, {SeqIdHigh, CountHigh}) -> - SeqIdHigh = SeqIdLow + CountLow, %% ASSERTION - {SeqIdLow, CountLow + CountHigh}. + +%% the first arg is the older gamma +combine_gammas(#gamma { count = 0 }, #gamma { count = 0 }) -> {undefined, 0}; +combine_gammas(#gamma { count = 0 }, #gamma { } = B) -> B; +combine_gammas(#gamma { } = A, #gamma { count = 0 }) -> A; +combine_gammas(#gamma { seq_id = SeqIdLow, count = CountLow }, + #gamma { seq_id = SeqIdHigh, count = CountHigh}) -> + true = SeqIdLow + CountLow =< SeqIdHigh, %% ASSERTION + %% note the above assertion does not say ==. This is because acks + %% may mean that the counts are not straight multiples of + %% segment_size. + #gamma { seq_id = SeqIdLow, count = CountLow + CountHigh}. |
