diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-10-07 18:46:12 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-10-07 18:46:12 +0100 |
| commit | 2ecdaa38b99b2878325cbf5597537162f9f8684e (patch) | |
| tree | c3339735f4e84797d061ae533dcd63583ede968b /src | |
| parent | 686c68c85696a2916f65de4127956a806ce4318f (diff) | |
| download | rabbitmq-server-git-2ecdaa38b99b2878325cbf5597537162f9f8684e.tar.gz | |
implemented out. This is getting pretty disgusting, needs some refactoring, marginally more useful variable names, and more API, in particular proper support for the prefetcher. Also, totally untested.
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_queue_index.erl | 17 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 176 |
2 files changed, 164 insertions, 29 deletions
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index c59b12dd29..27952af161 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -32,7 +32,8 @@ -module(rabbit_queue_index). -export([init/1, write_published/4, write_delivered/2, write_acks/2, - flush_journal/1, read_segment_entries/2, next_segment_boundary/1]). + flush_journal/1, read_segment_entries/2, next_segment_boundary/1, + segment_size/0]). %%---------------------------------------------------------------------------- %% The queue disk index @@ -139,8 +140,10 @@ -spec(write_acks/2 :: ([seq_id()], qistate()) -> qistate()). -spec(flush_journal/1 :: (qistate()) -> {boolean(), qistate()}). -spec(read_segment_entries/2 :: (seq_id(), qistate()) -> - {[{'index_entry', seq_id(), msg_id(), boolean(), boolean(), - 'on_disk'}], qistate()}). + {( [{'index', msg_id(), seq_id(), boolean(), boolean()}] + | 'not_found'), qistate()}). +-spec(next_segment_boundary/1 :: (seq_id()) -> seq_id()). +-spec(segment_size/0 :: () -> non_neg_integer()). -endif. @@ -237,8 +240,9 @@ read_segment_entries(InitSeqId, State = {lists:foldl(fun (RelSeq, Acc) -> {MsgId, IsDelivered, IsPersistent} = dict:fetch(RelSeq, SDict), - [{index_entry, reconstruct_seq_id(SegNum, RelSeq), - MsgId, IsDelivered, IsPersistent, on_disk} | Acc] + [ {index, MsgId, + reconstruct_seq_id(SegNum, RelSeq), + IsPersistent, IsDelivered, true} | Acc] end, [], RelSeqs), State}. @@ -246,6 +250,9 @@ next_segment_boundary(SeqId) -> {SegNum, _RelSeq} = seq_id_to_seg_and_rel_seq_id(SeqId), reconstruct_seq_id(SegNum + 1, 0). +segment_size() -> + ?SEGMENT_ENTRIES_COUNT. + %%---------------------------------------------------------------------------- %% Minor Helpers %%---------------------------------------------------------------------------- diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 73c3c3395b..f041f4783e 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -31,7 +31,8 @@ -module(rabbit_variable_queue). --export([init/1, in/3, set_queue_ram_duration_target/2, remeasure_egress_rate/1]). +-export([init/1, in/3, set_queue_ram_duration_target/2, remeasure_egress_rate/1, + out/1]). -record(vqstate, { q1, @@ -48,7 +49,8 @@ egress_rate, old_egress_rate, avg_egress_rate, - egress_rate_timestamp + egress_rate_timestamp, + prefetcher }). -include("rabbit.hrl"). @@ -56,7 +58,7 @@ init(QueueName) -> {NextSeqId, IndexState} = rabbit_queue_index:init(QueueName), #vqstate { q1 = queue:new(), q2 = queue:new(), - gamma = 0, + gamma = {undefined, 0}, q3 = queue:new(), q4 = queue:new(), target_ram_msg_count = undefined, ram_msg_count = 0, @@ -67,7 +69,8 @@ init(QueueName) -> egress_rate = 0, old_egress_rate = 0, avg_egress_rate = 0, - egress_rate_timestamp = now() + egress_rate_timestamp = now(), + prefetcher = undefined }. in(Msg, IsDelivered, State) -> @@ -98,7 +101,7 @@ in(just_index, Msg = #basic_message { guid = MsgId, {IndexOnDisk, IndexState1} = maybe_write_index_to_disk(false, IsPersistent, MsgId, SeqId, IsDelivered, IndexState), - Entry = {index, MsgId, SeqId, IsPersistent, IsDelivered, true, IndexOnDisk}, + Entry = {index, MsgId, SeqId, IsPersistent, IsDelivered, IndexOnDisk}, State1 = State #vqstate { next_seq_id = SeqId + 1, index_state = IndexState1 }, true = queue:is_empty(Q1), %% ASSERTION @@ -108,7 +111,8 @@ in(neither, Msg = #basic_message { guid = MsgId, is_persistent = IsPersistent }, IsDelivered, State = #vqstate { index_state = IndexState, next_seq_id = SeqId, - q1 = Q1, q2 = Q2, gamma = Gamma }) -> + q1 = Q1, q2 = Q2, + gamma = {GammaSeqId, GammaCount} }) -> true = maybe_write_msg_to_disk(true, Msg), {true, IndexState1} = maybe_write_index_to_disk(true, IsPersistent, MsgId, SeqId, @@ -116,7 +120,7 @@ in(neither, Msg = #basic_message { guid = MsgId, true = queue:is_empty(Q1) andalso queue:is_empty(Q2), %% ASSERTION State #vqstate { next_seq_id = SeqId + 1, index_state = IndexState1, - gamma = Gamma + 1 }. + gamma = {GammaSeqId, GammaCount + 1} }. set_queue_ram_duration_target( DurationTarget, State = #vqstate { avg_egress_rate = EgressRate, @@ -144,6 +148,91 @@ remeasure_egress_rate(State = #vqstate { egress_rate = OldEgressRate, egress_rate_timestamp = Now, out_counter = 0 }. +out(State = + #vqstate { q4 = Q4, + out_counter = OutCount, prefetcher = Prefetcher, + index_state = IndexState }) -> + case queue:out(Q4) of + {empty, _Q4} when Prefetcher == undefined -> + out_from_q3(State); + {empty, _Q4} -> + Q4a = + case rabbit_queue_prefetcher:drain_and_stop(Prefetcher) of + empty -> Q4; + Q4b -> Q4b + end, + out(State #vqstate { q4 = Q4a, prefetcher = undefined }); + {{value, + {msg_and_index, Msg = #basic_message { guid = MsgId }, + SeqId, IsDelivered, MsgOnDisk, IndexOnDisk}}, Q4a} -> + IndexState1 = + case IndexOnDisk andalso not IsDelivered of + true -> + rabbit_queue_index:write_delivered(SeqId, IndexState); + false -> + IndexState + end, + AckTag = case {IndexOnDisk, MsgOnDisk} of + {true, true } -> {ack_index_and_store, MsgId, SeqId}; + {false, true } -> {ack_store, MsgId}; + {false, false} -> not_on_disk + end, + {{Msg, IsDelivered, AckTag}, + State #vqstate { q4 = Q4a, out_counter = OutCount + 1, + index_state = IndexState1 }} + end. + +out_from_q3(State = #vqstate { q2 = Q2, index_state = IndexState, + gamma = {GammaSeqId, GammaCount}, q3 = Q3, + q4 = Q4 }) -> + case queue:out(Q3) of + {empty, _Q3} -> + case GammaCount of + 0 -> + undefined = GammaSeqId, %% ASSERTION + true = queue:is_empty(Q2), %% ASSERTION + {empty, State}; + _ -> + {List = [_|_], IndexState1} = + rabbit_queue_index:read_segment_entries(GammaSeqId, + IndexState), + State1 = State #vqstate { index_state = IndexState1 }, + Q3a = queue:from_list(List), + State2 = + case GammaCount - length(List) of + 0 -> + State1 #vqstate { gamma = {undefined, 0}, + q2 = queue:new(), + q3 = queue:join(Q3a, Q2) }; + N when N > 0 -> + State1 #vqstate { gamma = + {rabbit_queue_index:segment_size() + + GammaSeqId, N}, + q3 = Q3a } + end, + out_from_q3(State2) + end; + {{value, {index, MsgId, SeqId, IsPersistent, IsDelivered, IndexOnDisk}}, + Q3a} -> + {ok, Msg = #basic_message { is_persistent = IsPersistent, + guid = MsgId }} = + rabbit_msg_store:read(MsgId), + State1 = #vqstate { q1 = Q1, q4 = Q4a } = + State #vqstate { q3 = Q3a, + q4 = queue:in({msg_and_index, Msg, SeqId, + IsDelivered, true, IndexOnDisk}, + Q4) }, + State2 = case queue:is_empty(Q3a) andalso 0 == GammaCount of + true -> + true = queue:is_empty(Q2), %% ASSERTION + State1 #vqstate { q1 = queue:new(), + q4 = queue:join(Q4a, Q1) }; + false -> + State1 + end, + out(State2) + end. + maybe_start_prefetcher(State) -> %% TODO State. @@ -204,18 +293,21 @@ ensure_binary_properties(Msg = #basic_message { content = Content }) -> content = rabbit_binary_parser:clear_decoded_content( rabbit_binary_generator:ensure_content_encoded(Content)) }. -store_alpha_entry(Entry, State = #vqstate { q1 = Q1, q2 = Q2, gamma = Gamma, +store_alpha_entry(Entry, State = #vqstate { q1 = Q1, q2 = Q2, + gamma = {_GammaSeqId, GammaCount}, q3 = Q3, q4 = Q4 }) -> case queue:is_empty(Q1) andalso queue:is_empty(Q2) andalso - Gamma == 0 andalso queue:is_empty(Q3) of + GammaCount == 0 andalso queue:is_empty(Q3) of true -> State #vqstate { q4 = queue:in(Entry, Q4) }; false -> maybe_push_q1_to_betas(State #vqstate { q1 = queue:in(Entry, Q1) }) end. -store_beta_entry(Entry, State = #vqstate { q2 = Q2, gamma = Gamma, q3 = Q3 }) -> - case queue:is_empty(Q2) andalso Gamma == 0 of +store_beta_entry(Entry, State = + #vqstate { q2 = Q2, gamma = {_GammaSeqId, GammaCount}, + q3 = Q3 }) -> + case queue:is_empty(Q2) andalso GammaCount == 0 of true -> State #vqstate { q3 = queue:in(Entry, Q3) }; false -> State #vqstate { q2 = queue:in(Entry, Q2) } end. @@ -238,7 +330,7 @@ maybe_push_q1_to_betas(State = #vqstate { ram_msg_count = RamMsgCount, end, maybe_push_q1_to_betas( store_beta_entry({index, MsgId, SeqId, IsPersistent, IsDelivered, - true, IndexOnDisk}, + IndexOnDisk}, State #vqstate { ram_msg_count = RamMsgCount - 1, q1 = Q1a })) end. @@ -260,7 +352,7 @@ maybe_push_q4_to_betas(State = #vqstate { ram_msg_count = RamMsgCount, false -> maybe_write_msg_to_disk(true, Msg) end, Q3a = queue:in_r({index, MsgId, SeqId, IsPersistent, IsDelivered, - true, IndexOnDisk}, Q3), + IndexOnDisk}, Q3), maybe_push_q4_to_betas( State #vqstate { ram_msg_count = RamMsgCount - 1, q3 = Q3a, q4 = Q4a }) @@ -268,32 +360,62 @@ maybe_push_q4_to_betas(State = #vqstate { ram_msg_count = RamMsgCount, push_betas_to_gammas(State = #vqstate { q2 = Q2, gamma = Gamma, q3 = Q3, index_state = IndexState }) -> - {Len1, Q2a, IndexState1} = + %% HighSeqId is high in the sense that it must be higher than the + %% seqid in Gamma, but it's also the lowest of the betas that we + %% transfer from q2 to gamma. + {HighSeqId, Len1, Q2a, IndexState1} = push_betas_to_gammas(fun queue:out/1, undefined, Q2, IndexState), - State1 = State #vqstate { q2 = Q2a, gamma = Gamma + Len1, + Gamma1 = {Gamma1SeqId, _} = combine_gammas(Gamma, {HighSeqId, Len1}), + State1 = State #vqstate { q2 = Q2a, + gamma = Gamma1, index_state = IndexState1 }, case queue:out(Q3) of {empty, _Q3} -> State1; {{value, {index, _MsgId, SeqId, _IsPersistent, _IsDelivered, - true, _IndexOnDisk}}, _Q3a} -> - Limit = rabbit_queue_index:next_segment_boundary(SeqId) - 1, - {Len2, Q3b, IndexState2} = - push_betas_to_gammas(fun queue:out_r/1, Limit, Q3, IndexState1), - State1 #vqstate { q3 = Q3b, gamma = Gamma + Len1 + Len2, - index_state = IndexState2 } + _IndexOnDisk}}, _Q3a} -> + Limit = rabbit_queue_index:next_segment_boundary(SeqId), + case Limit == Gamma1SeqId of + true -> %% already only holding the minimum, nothing to do + State1; + false -> + %% ASSERTION + true = Gamma1SeqId == undefined orelse + Gamma1SeqId == Limit + rabbit_queue_index:segment_size(), + %% LowSeqId is low in the sense that it must be + %% lower than the seqid in Gamma1, in fact either + %% gamma1 has undefined as its seqid or its seqid + %% is LowSeqId + 1. But because we use + %% queue:out_r, LowSeqId is actually also the + %% highest seqid of the betas we transfer from q3 + %% to gammas. + {LowSeqId, Len2, Q3b, IndexState2} = + push_betas_to_gammas(fun queue:out_r/1, Limit - 1, Q3, + IndexState1), + Gamma1SeqId = LowSeqId + 1, %% ASSERTION + Gamma2 = combine_gammas({Limit, Len2}, Gamma1), + State1 #vqstate { q3 = Q3b, gamma = Gamma2, + index_state = IndexState2 } + end end. push_betas_to_gammas(Generator, Limit, Q, IndexState) -> - push_betas_to_gammas(Generator, Limit, Q, 0, IndexState). + case Generator(Q) of + {empty, Qa} -> {undefined, 0, Qa, IndexState}; + {{value, {index, _MsgId, SeqId, _IsPersistent, _IsDelivered, + _IndexOnDisk}}, _Qa} -> + {Count, Qb, IndexState1} = + push_betas_to_gammas(Generator, Limit, Q, 0, IndexState), + {SeqId, Count, Qb, IndexState1} + end. push_betas_to_gammas(Generator, Limit, Q, Count, IndexState) -> case Generator(Q) of {empty, Qa} -> {Count, Qa, IndexState}; {{value, {index, _MsgId, Limit, _IsPersistent, _IsDelivered, - _MsgOnDisk, _IndexOnDisk}}, _Qa} -> + _IndexOnDisk}}, _Qa} -> {Count, Q, IndexState}; {{value, {index, MsgId, SeqId, IsPersistent, IsDelivered, - true, IndexOnDisk}}, Qa} -> + IndexOnDisk}}, Qa} -> IndexState1 = case IndexOnDisk of true -> IndexState; @@ -307,3 +429,9 @@ push_betas_to_gammas(Generator, Limit, Q, Count, IndexState) -> push_betas_to_gammas(Generator, Limit, Qa, Count + 1, IndexState1) end. +combine_gammas({_, 0}, {_, 0}) -> {undefined, 0}; +combine_gammas({_, 0}, B ) -> B; +combine_gammas(A , {_, 0}) -> A; +combine_gammas({SeqIdLow, CountLow}, {SeqIdHigh, CountHigh}) -> + SeqIdHigh = SeqIdLow + CountLow, %% ASSERTION + {SeqIdLow, CountLow + CountHigh}. |
