diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_queue_index.erl | 17 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 176 |
2 files changed, 164 insertions, 29 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index c59b12dd29..27952af161 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -32,7 +32,8 @@ -module(rabbit_queue_index). -export([init/1, write_published/4, write_delivered/2, write_acks/2, - flush_journal/1, read_segment_entries/2, next_segment_boundary/1]). + flush_journal/1, read_segment_entries/2, next_segment_boundary/1, + segment_size/0]). %%---------------------------------------------------------------------------- %% The queue disk index @@ -139,8 +140,10 @@ -spec(write_acks/2 :: ([seq_id()], qistate()) -> qistate()). -spec(flush_journal/1 :: (qistate()) -> {boolean(), qistate()}). -spec(read_segment_entries/2 :: (seq_id(), qistate()) -> - {[{'index_entry', seq_id(), msg_id(), boolean(), boolean(), - 'on_disk'}], qistate()}). + {( [{'index', msg_id(), seq_id(), boolean(), boolean()}] + | 'not_found'), qistate()}). +-spec(next_segment_boundary/1 :: (seq_id()) -> seq_id()). +-spec(segment_size/0 :: () -> non_neg_integer()). -endif. @@ -237,8 +240,9 @@ read_segment_entries(InitSeqId, State = {lists:foldl(fun (RelSeq, Acc) -> {MsgId, IsDelivered, IsPersistent} = dict:fetch(RelSeq, SDict), - [{index_entry, reconstruct_seq_id(SegNum, RelSeq), - MsgId, IsDelivered, IsPersistent, on_disk} | Acc] + [ {index, MsgId, + reconstruct_seq_id(SegNum, RelSeq), + IsPersistent, IsDelivered, true} | Acc] end, [], RelSeqs), State}. @@ -246,6 +250,9 @@ next_segment_boundary(SeqId) -> {SegNum, _RelSeq} = seq_id_to_seg_and_rel_seq_id(SeqId), reconstruct_seq_id(SegNum + 1, 0). +segment_size() -> + ?SEGMENT_ENTRIES_COUNT. + %%---------------------------------------------------------------------------- %% Minor Helpers %%---------------------------------------------------------------------------- diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 73c3c3395b..f041f4783e 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -31,7 +31,8 @@ -module(rabbit_variable_queue). --export([init/1, in/3, set_queue_ram_duration_target/2, remeasure_egress_rate/1]). +-export([init/1, in/3, set_queue_ram_duration_target/2, remeasure_egress_rate/1, + out/1]). -record(vqstate, { q1, @@ -48,7 +49,8 @@ egress_rate, old_egress_rate, avg_egress_rate, - egress_rate_timestamp + egress_rate_timestamp, + prefetcher }). -include("rabbit.hrl"). @@ -56,7 +58,7 @@ init(QueueName) -> {NextSeqId, IndexState} = rabbit_queue_index:init(QueueName), #vqstate { q1 = queue:new(), q2 = queue:new(), - gamma = 0, + gamma = {undefined, 0}, q3 = queue:new(), q4 = queue:new(), target_ram_msg_count = undefined, ram_msg_count = 0, @@ -67,7 +69,8 @@ init(QueueName) -> egress_rate = 0, old_egress_rate = 0, avg_egress_rate = 0, - egress_rate_timestamp = now() + egress_rate_timestamp = now(), + prefetcher = undefined }. in(Msg, IsDelivered, State) -> @@ -98,7 +101,7 @@ in(just_index, Msg = #basic_message { guid = MsgId, {IndexOnDisk, IndexState1} = maybe_write_index_to_disk(false, IsPersistent, MsgId, SeqId, IsDelivered, IndexState), - Entry = {index, MsgId, SeqId, IsPersistent, IsDelivered, true, IndexOnDisk}, + Entry = {index, MsgId, SeqId, IsPersistent, IsDelivered, IndexOnDisk}, State1 = State #vqstate { next_seq_id = SeqId + 1, index_state = IndexState1 }, true = queue:is_empty(Q1), %% ASSERTION @@ -108,7 +111,8 @@ in(neither, Msg = #basic_message { guid = MsgId, is_persistent = IsPersistent }, IsDelivered, State = #vqstate { index_state = IndexState, next_seq_id = SeqId, - q1 = Q1, q2 = Q2, gamma = Gamma }) -> + q1 = Q1, q2 = Q2, + gamma = {GammaSeqId, GammaCount} }) -> true = maybe_write_msg_to_disk(true, Msg), {true, IndexState1} = maybe_write_index_to_disk(true, IsPersistent, MsgId, SeqId, @@ -116,7 +120,7 @@ in(neither, Msg = #basic_message { guid = MsgId, true = queue:is_empty(Q1) andalso queue:is_empty(Q2), %% ASSERTION State #vqstate { next_seq_id = SeqId + 1, index_state = IndexState1, - gamma = Gamma + 1 }. + gamma = {GammaSeqId, GammaCount + 1} }. set_queue_ram_duration_target( DurationTarget, State = #vqstate { avg_egress_rate = EgressRate, @@ -144,6 +148,91 @@ remeasure_egress_rate(State = #vqstate { egress_rate = OldEgressRate, egress_rate_timestamp = Now, out_counter = 0 }. +out(State = + #vqstate { q4 = Q4, + out_counter = OutCount, prefetcher = Prefetcher, + index_state = IndexState }) -> + case queue:out(Q4) of + {empty, _Q4} when Prefetcher == undefined -> + out_from_q3(State); + {empty, _Q4} -> + Q4a = + case rabbit_queue_prefetcher:drain_and_stop(Prefetcher) of + empty -> Q4; + Q4b -> Q4b + end, + out(State #vqstate { q4 = Q4a, prefetcher = undefined }); + {{value, + {msg_and_index, Msg = #basic_message { guid = MsgId }, + SeqId, IsDelivered, MsgOnDisk, IndexOnDisk}}, Q4a} -> + IndexState1 = + case IndexOnDisk andalso not IsDelivered of + true -> + rabbit_queue_index:write_delivered(SeqId, IndexState); + false -> + IndexState + end, + AckTag = case {IndexOnDisk, MsgOnDisk} of + {true, true } -> {ack_index_and_store, MsgId, SeqId}; + {false, true } -> {ack_store, MsgId}; + {false, false} -> not_on_disk + end, + {{Msg, IsDelivered, AckTag}, + State #vqstate { q4 = Q4a, out_counter = OutCount + 1, + index_state = IndexState1 }} + end. + +out_from_q3(State = #vqstate { q2 = Q2, index_state = IndexState, + gamma = {GammaSeqId, GammaCount}, q3 = Q3, + q4 = Q4 }) -> + case queue:out(Q3) of + {empty, _Q3} -> + case GammaCount of + 0 -> + undefined = GammaSeqId, %% ASSERTION + true = queue:is_empty(Q2), %% ASSERTION + {empty, State}; + _ -> + {List = [_|_], IndexState1} = + rabbit_queue_index:read_segment_entries(GammaSeqId, + IndexState), + State1 = State #vqstate { index_state = IndexState1 }, + Q3a = queue:from_list(List), + State2 = + case GammaCount - length(List) of + 0 -> + State1 #vqstate { gamma = {undefined, 0}, + q2 = queue:new(), + q3 = queue:join(Q3a, Q2) }; + N when N > 0 -> + State1 #vqstate { gamma = + {rabbit_queue_index:segment_size() + + GammaSeqId, N}, + q3 = Q3a } + end, + out_from_q3(State2) + end; + {{value, {index, MsgId, SeqId, IsPersistent, IsDelivered, IndexOnDisk}}, + Q3a} -> + {ok, Msg = #basic_message { is_persistent = IsPersistent, + guid = MsgId }} = + rabbit_msg_store:read(MsgId), + State1 = #vqstate { q1 = Q1, q4 = Q4a } = + State #vqstate { q3 = Q3a, + q4 = queue:in({msg_and_index, Msg, SeqId, + IsDelivered, true, IndexOnDisk}, + Q4) }, + State2 = case queue:is_empty(Q3a) andalso 0 == GammaCount of + true -> + true = queue:is_empty(Q2), %% ASSERTION + State1 #vqstate { q1 = queue:new(), + q4 = queue:join(Q4a, Q1) }; + false -> + State1 + end, + out(State2) + end. + maybe_start_prefetcher(State) -> %% TODO State. @@ -204,18 +293,21 @@ ensure_binary_properties(Msg = #basic_message { content = Content }) -> content = rabbit_binary_parser:clear_decoded_content( rabbit_binary_generator:ensure_content_encoded(Content)) }. -store_alpha_entry(Entry, State = #vqstate { q1 = Q1, q2 = Q2, gamma = Gamma, +store_alpha_entry(Entry, State = #vqstate { q1 = Q1, q2 = Q2, + gamma = {_GammaSeqId, GammaCount}, q3 = Q3, q4 = Q4 }) -> case queue:is_empty(Q1) andalso queue:is_empty(Q2) andalso - Gamma == 0 andalso queue:is_empty(Q3) of + GammaCount == 0 andalso queue:is_empty(Q3) of true -> State #vqstate { q4 = queue:in(Entry, Q4) }; false -> maybe_push_q1_to_betas(State #vqstate { q1 = queue:in(Entry, Q1) }) end. -store_beta_entry(Entry, State = #vqstate { q2 = Q2, gamma = Gamma, q3 = Q3 }) -> - case queue:is_empty(Q2) andalso Gamma == 0 of +store_beta_entry(Entry, State = + #vqstate { q2 = Q2, gamma = {_GammaSeqId, GammaCount}, + q3 = Q3 }) -> + case queue:is_empty(Q2) andalso GammaCount == 0 of true -> State #vqstate { q3 = queue:in(Entry, Q3) }; false -> State #vqstate { q2 = queue:in(Entry, Q2) } end. @@ -238,7 +330,7 @@ maybe_push_q1_to_betas(State = #vqstate { ram_msg_count = RamMsgCount, end, maybe_push_q1_to_betas( store_beta_entry({index, MsgId, SeqId, IsPersistent, IsDelivered, - true, IndexOnDisk}, + IndexOnDisk}, State #vqstate { ram_msg_count = RamMsgCount - 1, q1 = Q1a })) end. @@ -260,7 +352,7 @@ maybe_push_q4_to_betas(State = #vqstate { ram_msg_count = RamMsgCount, false -> maybe_write_msg_to_disk(true, Msg) end, Q3a = queue:in_r({index, MsgId, SeqId, IsPersistent, IsDelivered, - true, IndexOnDisk}, Q3), + IndexOnDisk}, Q3), maybe_push_q4_to_betas( State #vqstate { ram_msg_count = RamMsgCount - 1, q3 = Q3a, q4 = Q4a }) @@ -268,32 +360,62 @@ maybe_push_q4_to_betas(State = #vqstate { ram_msg_count = RamMsgCount, push_betas_to_gammas(State = #vqstate { q2 = Q2, gamma = Gamma, q3 = Q3, index_state = IndexState }) -> - {Len1, Q2a, IndexState1} = + %% HighSeqId is high in the sense that it must be higher than the + %% seqid in Gamma, but it's also the lowest of the betas that we + %% transfer from q2 to gamma. + {HighSeqId, Len1, Q2a, IndexState1} = push_betas_to_gammas(fun queue:out/1, undefined, Q2, IndexState), - State1 = State #vqstate { q2 = Q2a, gamma = Gamma + Len1, + Gamma1 = {Gamma1SeqId, _} = combine_gammas(Gamma, {HighSeqId, Len1}), + State1 = State #vqstate { q2 = Q2a, + gamma = Gamma1, index_state = IndexState1 }, case queue:out(Q3) of {empty, _Q3} -> State1; {{value, {index, _MsgId, SeqId, _IsPersistent, _IsDelivered, - true, _IndexOnDisk}}, _Q3a} -> - Limit = rabbit_queue_index:next_segment_boundary(SeqId) - 1, - {Len2, Q3b, IndexState2} = - push_betas_to_gammas(fun queue:out_r/1, Limit, Q3, IndexState1), - State1 #vqstate { q3 = Q3b, gamma = Gamma + Len1 + Len2, - index_state = IndexState2 } + _IndexOnDisk}}, _Q3a} -> + Limit = rabbit_queue_index:next_segment_boundary(SeqId), + case Limit == Gamma1SeqId of + true -> %% already only holding the minimum, nothing to do + State1; + false -> + %% ASSERTION + true = Gamma1SeqId == undefined orelse + Gamma1SeqId == Limit + rabbit_queue_index:segment_size(), + %% LowSeqId is low in the sense that it must be + %% lower than the seqid in Gamma1, in fact either + %% gamma1 has undefined as its seqid or its seqid + %% is LowSeqId + 1. But because we use + %% queue:out_r, LowSeqId is actually also the + %% highest seqid of the betas we transfer from q3 + %% to gammas. + {LowSeqId, Len2, Q3b, IndexState2} = + push_betas_to_gammas(fun queue:out_r/1, Limit - 1, Q3, + IndexState1), + Gamma1SeqId = LowSeqId + 1, %% ASSERTION + Gamma2 = combine_gammas({Limit, Len2}, Gamma1), + State1 #vqstate { q3 = Q3b, gamma = Gamma2, + index_state = IndexState2 } + end end. push_betas_to_gammas(Generator, Limit, Q, IndexState) -> - push_betas_to_gammas(Generator, Limit, Q, 0, IndexState). + case Generator(Q) of + {empty, Qa} -> {undefined, 0, Qa, IndexState}; + {{value, {index, _MsgId, SeqId, _IsPersistent, _IsDelivered, + _IndexOnDisk}}, _Qa} -> + {Count, Qb, IndexState1} = + push_betas_to_gammas(Generator, Limit, Q, 0, IndexState), + {SeqId, Count, Qb, IndexState1} + end. push_betas_to_gammas(Generator, Limit, Q, Count, IndexState) -> case Generator(Q) of {empty, Qa} -> {Count, Qa, IndexState}; {{value, {index, _MsgId, Limit, _IsPersistent, _IsDelivered, - _MsgOnDisk, _IndexOnDisk}}, _Qa} -> + _IndexOnDisk}}, _Qa} -> {Count, Q, IndexState}; {{value, {index, MsgId, SeqId, IsPersistent, IsDelivered, - true, IndexOnDisk}}, Qa} -> + IndexOnDisk}}, Qa} -> IndexState1 = case IndexOnDisk of true -> IndexState; @@ -307,3 +429,9 @@ push_betas_to_gammas(Generator, Limit, Q, Count, IndexState) -> push_betas_to_gammas(Generator, Limit, Qa, Count + 1, IndexState1) end. +combine_gammas({_, 0}, {_, 0}) -> {undefined, 0}; +combine_gammas({_, 0}, B ) -> B; +combine_gammas(A , {_, 0}) -> A; +combine_gammas({SeqIdLow, CountLow}, {SeqIdHigh, CountHigh}) -> + SeqIdHigh = SeqIdLow + CountLow, %% ASSERTION + {SeqIdLow, CountLow + CountHigh}. |
