diff options
| author | Matthew Sackman <matthew@lshift.net> | 2010-01-12 18:33:37 +0000 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2010-01-12 18:33:37 +0000 |
| commit | 1e4ecb8f788103cb2d52cbf551e16a3fe0d829f2 (patch) | |
| tree | 141f082be8b4ed28003231d422d90bd71c83e900 /src | |
| parent | 872a13632237b4c9440320a819afd413e9231685 (diff) | |
| download | rabbitmq-server-git-1e4ecb8f788103cb2d52cbf551e16a3fe0d829f2.tar.gz | |
Right, getting there - some major reworkings to vq which have fixed bugs. It doesn't quite do everything I want it to do it - in particular, on memory reduction, it needs to ensure that the inner queues nearest δ have sufficient non-ram-index msgs, but all thet tests pass and adding that feature shouldn't be too painful.
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_tests.erl | 18 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 789 |
2 files changed, 465 insertions, 342 deletions
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 856a8c4647..16332f325c 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1225,7 +1225,7 @@ fresh_variable_queue() -> assert_prop(S0, len, 0), assert_prop(S0, q1, 0), assert_prop(S0, q2, 0), - assert_prop(S0, gamma, #gamma { start_seq_id = undefined, + assert_prop(S0, delta, #delta { start_seq_id = undefined, count = 0, end_seq_id = undefined }), assert_prop(S0, q3, 0), @@ -1234,7 +1234,7 @@ fresh_variable_queue() -> test_variable_queue() -> passed = test_variable_queue_dynamic_duration_change(), - passed = test_variable_queue_partial_segments_gamma_thing(), + passed = test_variable_queue_partial_segments_delta_thing(), passed. test_variable_queue_dynamic_duration_change() -> @@ -1253,7 +1253,7 @@ test_variable_queue_dynamic_duration_change() -> %% just publish and fetch some persistent msgs, this hits the the %% partial segment path in queue_index due to the period when - %% duration was 0 and the entire queue was gamma. + %% duration was 0 and the entire queue was delta. {_SeqIds1, VQ7} = variable_queue_publish(true, 20, VQ6), {VQ8, AckTags1} = variable_queue_fetch(20, true, false, 20, VQ7), VQ9 = rabbit_variable_queue:ack(AckTags1, VQ8), @@ -1288,7 +1288,7 @@ test_variable_queue_dynamic_duration_change_f(Len, VQ0) -> test_variable_queue_dynamic_duration_change_f(Len, VQ3) end. -test_variable_queue_partial_segments_gamma_thing() -> +test_variable_queue_partial_segments_delta_thing() -> SegmentSize = rabbit_queue_index:segment_size(), HalfSegment = SegmentSize div 2, VQ0 = fresh_variable_queue(), @@ -1296,21 +1296,21 @@ test_variable_queue_partial_segments_gamma_thing() -> variable_queue_publish(true, SegmentSize + HalfSegment, VQ0), VQ2 = rabbit_variable_queue:remeasure_rates(VQ1), VQ3 = rabbit_variable_queue:set_queue_ram_duration_target(0, VQ2), - %% one segment in q3 as betas, and half a segment in gamma + %% one segment in q3 as betas, and half a segment in delta S3 = rabbit_variable_queue:status(VQ3), io:format("~p~n", [S3]), - assert_prop(S3, gamma, #gamma { start_seq_id = SegmentSize, + assert_prop(S3, delta, #delta { start_seq_id = SegmentSize, count = HalfSegment, end_seq_id = SegmentSize + HalfSegment }), assert_prop(S3, q3, SegmentSize), assert_prop(S3, len, SegmentSize + HalfSegment), VQ4 = rabbit_variable_queue:set_queue_ram_duration_target(infinity, VQ3), {[_SeqId], VQ5} = variable_queue_publish(true, 1, VQ4), - %% should have 1 alpha, but it's in the same segment as the gammas + %% should have 1 alpha, but it's in the same segment as the deltas S5 = rabbit_variable_queue:status(VQ5), io:format("~p~n", [S5]), assert_prop(S5, q1, 1), - assert_prop(S5, gamma, #gamma { start_seq_id = SegmentSize, + assert_prop(S5, delta, #delta { start_seq_id = SegmentSize, count = HalfSegment, end_seq_id = SegmentSize + HalfSegment }), assert_prop(S5, q3, SegmentSize), @@ -1320,7 +1320,7 @@ test_variable_queue_partial_segments_gamma_thing() -> %% the half segment should now be in q3 as betas S6 = rabbit_variable_queue:status(VQ6), io:format("~p~n", [S6]), - assert_prop(S6, gamma, #gamma { start_seq_id = undefined, + assert_prop(S6, delta, #delta { start_seq_id = undefined, count = 0, end_seq_id = undefined }), assert_prop(S6, q1, 1), diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 7c1ef6875d..6c7fad1212 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -43,14 +43,14 @@ -record(vqstate, { q1, q2, - gamma, + delta, q3, q4, duration_target, target_ram_msg_count, ram_msg_count, ram_msg_count_prev, - queue, + ram_index_count, index_state, next_seq_id, out_counter, @@ -68,32 +68,46 @@ -include("rabbit.hrl"). -include("rabbit_queue.hrl"). +-record(msg_status, + { msg, + msg_id, + seq_id, + is_persistent, + is_delivered, + msg_on_disk, + index_on_disk + }). + +-define(RAM_INDEX_TARGET_RATIO, 32768). + %%---------------------------------------------------------------------------- -%% Basic premise is that msgs move from q1 -> q2 -> gamma -> q3 -> q4 +%% WRONG - UPDATE ME! + +%% Basic premise is that msgs move from q1 -> q2 -> delta -> q3 -> q4 %% but they can only do so in the right form. q1 and q4 only hold %% alphas (msgs in ram), q2 and q3 only hold betas (msg on disk, index -%% in ram), and gamma is just a count of the number of index entries +%% in ram), and delta is just a count of the number of index entries %% on disk at that stage (msg on disk, index on disk). %% %% When a msg arrives, we decide in which form it should be. It is %% then added to the right-most appropriate queue, maintaining %% order. Thus if the msg is to be an alpha, it will be added to q1, -%% unless all of q2, gamma and q3 are empty, in which case it will go -%% to q4. If it is to be a beta, it will be added to q2 unless gamma +%% unless all of q2, delta and q3 are empty, in which case it will go +%% to q4. If it is to be a beta, it will be added to q2 unless delta %% is empty, in which case it will go to q3. %% %% The major invariant is that if the msg is to be a beta, q1 will be -%% empty, and if it is to be a gamma then both q1 and q2 will be empty. +%% empty, and if it is to be a delta then both q1 and q2 will be empty. %% %% When taking msgs out of the queue, if q4 is empty then we read -%% directly from q3, or gamma, if q3 is empty. If q3 and gamma are +%% directly from q3, or delta, if q3 is empty. If q3 and delta are %% empty then we have an invariant that q2 must be empty because q2 -%% can only grow if gamma is non empty. +%% can only grow if delta is non empty. %% %% A further invariant is that if the queue is non empty, either q4 or -%% q3 contains at least one entry. I.e. we never allow gamma to -%% contain all msgs in the queue. Also, if q4 is non empty and gamma +%% q3 contains at least one entry. I.e. we never allow delta to +%% contain all msgs in the queue. Also, if q4 is non empty and delta %% is non empty then q3 must be non empty. %%---------------------------------------------------------------------------- @@ -106,15 +120,15 @@ | 'ack_not_on_disk'). -type(vqstate() :: #vqstate { q1 :: queue(), - q2 :: queue(), - gamma :: gamma(), - q3 :: queue(), + q2 :: {non_neg_integer(), queue()}, + delta :: delta(), + q3 :: {non_neg_integer(), queue()}, q4 :: queue(), duration_target :: non_neg_integer(), target_ram_msg_count :: non_neg_integer(), ram_msg_count :: non_neg_integer(), ram_msg_count_prev :: non_neg_integer(), - queue :: queue_name(), + ram_index_count :: non_neg_integer(), index_state :: any(), next_seq_id :: seq_id(), out_counter :: non_neg_integer(), @@ -161,7 +175,7 @@ -endif. --define(BLANK_GAMMA, #gamma { start_seq_id = undefined, +-define(BLANK_DELTA, #delta { start_seq_id = undefined, count = 0, end_seq_id = undefined }). @@ -170,40 +184,40 @@ %%---------------------------------------------------------------------------- init(QueueName) -> - {GammaCount, IndexState} = + {DeltaCount, IndexState} = rabbit_queue_index:init(QueueName), - {GammaSeqId, NextSeqId, IndexState1} = + {DeltaSeqId, NextSeqId, IndexState1} = rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id(IndexState), - Gamma = case GammaCount of - 0 -> ?BLANK_GAMMA; - _ -> #gamma { start_seq_id = GammaSeqId, - count = GammaCount, + Delta = case DeltaCount of + 0 -> ?BLANK_DELTA; + _ -> #delta { start_seq_id = DeltaSeqId, + count = DeltaCount, end_seq_id = NextSeqId } end, Now = now(), State = - #vqstate { q1 = queue:new(), q2 = queue:new(), - gamma = Gamma, - q3 = queue:new(), q4 = queue:new(), + #vqstate { q1 = queue:new(), q2 = {0, queue:new()}, + delta = Delta, + q3 = {0, queue:new()}, q4 = queue:new(), duration_target = undefined, target_ram_msg_count = undefined, ram_msg_count = 0, ram_msg_count_prev = 0, - queue = QueueName, + ram_index_count = 0, index_state = IndexState1, next_seq_id = NextSeqId, out_counter = 0, in_counter = 0, egress_rate = {Now, 0}, avg_egress_rate = 0, - ingress_rate = {Now, GammaCount}, + ingress_rate = {Now, DeltaCount}, avg_ingress_rate = 0, rate_timestamp = Now, - len = GammaCount, + len = DeltaCount, on_sync = {[], [], []}, msg_store_read_state = rabbit_msg_store:client_init() }, - maybe_gammas_to_betas(State). + maybe_deltas_to_betas(State). terminate(State = #vqstate { index_state = IndexState, msg_store_read_state = MSCState }) -> @@ -221,11 +235,14 @@ publish_delivered(Msg = #basic_message { guid = MsgId, in_counter = InCount}) -> State1 = State #vqstate { out_counter = OutCount + 1, in_counter = InCount + 1 }, - case maybe_write_msg_to_disk(false, false, Msg) of + MsgStatus = #msg_status { + msg = Msg, msg_id = MsgId, seq_id = SeqId, is_persistent = IsPersistent, + is_delivered = false, msg_on_disk = false, index_on_disk = false }, + MsgStatus1 = maybe_write_msg_to_disk(false, MsgStatus), + case MsgStatus1 #msg_status.msg_on_disk of true -> - {true, IndexState1} = - maybe_write_index_to_disk(false, IsPersistent, MsgId, SeqId, - true, IndexState), + {#msg_status { index_on_disk = true }, IndexState1} = + maybe_write_index_to_disk(false, MsgStatus1, IndexState), {{ack_index_and_store, MsgId, SeqId}, State1 #vqstate { index_state = IndexState1, next_seq_id = SeqId + 1 }}; @@ -289,12 +306,11 @@ fetch(State = index_state = IndexState, len = Len }) -> case queue:out(Q4) of {empty, _Q4} -> - fetch_from_q3_or_gamma(State); - {{value, - #alpha { msg = Msg = #basic_message { guid = MsgId, - is_persistent = IsPersistent }, - seq_id = SeqId, is_delivered = IsDelivered, - msg_on_disk = MsgOnDisk, index_on_disk = IndexOnDisk }}, + fetch_from_q3_or_delta(State); + {{value, #msg_status { + msg = Msg, msg_id = MsgId, seq_id = SeqId, + is_persistent = IsPersistent, is_delivered = IsDelivered, + msg_on_disk = MsgOnDisk, index_on_disk = IndexOnDisk }}, Q4a} -> {IndexState1, IndexOnDisk1} = case IndexOnDisk of @@ -313,20 +329,17 @@ fetch(State = false -> {IndexState, false} end, - _MsgOnDisk1 = IndexOnDisk1 = + AckTag = case IndexOnDisk1 of true -> true = IsPersistent, %% ASSERTION - true = MsgOnDisk; %% ASSERTION + true = MsgOnDisk, %% ASSERTION + {ack_index_and_store, MsgId, SeqId}; false -> ok = case MsgOnDisk andalso not IsPersistent of true -> rabbit_msg_store:remove([MsgId]); false -> ok end, - false + ack_not_on_disk end, - AckTag = case IndexOnDisk1 of - true -> {ack_index_and_store, MsgId, SeqId}; - false -> ack_not_on_disk - end, Len1 = Len - 1, {{Msg, IsDelivered, AckTag, Len1}, State #vqstate { q4 = Q4a, out_counter = OutCount + 1, @@ -362,7 +375,7 @@ purge(State = #vqstate { q4 = Q4, index_state = IndexState, len = Len }) -> {Len, State1} = purge1(Q4Count, State #vqstate { index_state = IndexState1, q4 = queue:new() }), - {Len, State1 #vqstate { len = 0 }}. + {Len, State1 #vqstate { len = 0, ram_msg_count = 0, ram_index_count = 0 }}. %% the only difference between purge and delete is that delete also %% needs to delete everything that's been delivered and not ack'd. @@ -373,9 +386,9 @@ delete(State) -> IndexState) of {N, N, IndexState2} -> IndexState2; - {GammaSeqId, NextSeqId, IndexState2} -> + {DeltaSeqId, NextSeqId, IndexState2} -> {_DeleteCount, IndexState3} = - delete1(NextSeqId, 0, GammaSeqId, IndexState2), + delete1(NextSeqId, 0, DeltaSeqId, IndexState2), IndexState3 end, IndexState4 = rabbit_queue_index:terminate_and_erase(IndexState1), @@ -383,27 +396,26 @@ delete(State) -> %% [{Msg, AckTag}] %% We guarantee that after fetch, only persistent msgs are left on -%% disk. This means that in a requeue, we set -%% PersistentMsgsAlreadyOnDisk to true, thus avoiding calls to -%% msg_store:write for persistent msgs. It also means that we don't -%% need to worry about calling msg_store:remove (as ack would do) -%% because transient msgs won't be on disk anyway, thus they won't -%% need to be removed. However, we do call msg_store:release so that -%% the cache isn't held full of msgs which are now at the tail of the -%% queue. +%% disk. This means that in a requeue, we set MsgOnDisk to true, thus +%% avoiding calls to msg_store:write for persistent msgs. It also +%% means that we don't need to worry about calling msg_store:remove +%% (as ack would do) because transient msgs won't be on disk anyway, +%% thus they won't need to be removed. However, we do call +%% msg_store:release so that the cache isn't held full of msgs which +%% are now at the tail of the queue. requeue(MsgsWithAckTags, State) -> {SeqIds, MsgIds, State1 = #vqstate { index_state = IndexState }} = lists:foldl( fun ({Msg = #basic_message { guid = MsgId }, AckTag}, {SeqIdsAcc, MsgIdsAcc, StateN}) -> - {_SeqId, StateN1} = publish(Msg, true, true, StateN), - {SeqIdsAcc1, MsgIdsAcc1} = + {SeqIdsAcc1, MsgIdsAcc1, MsgOnDisk} = case AckTag of ack_not_on_disk -> - {SeqIdsAcc, MsgIdsAcc}; + {SeqIdsAcc, MsgIdsAcc, false}; {ack_index_and_store, MsgId, SeqId} -> - {[SeqId | SeqIdsAcc], [MsgId | MsgIdsAcc]} + {[SeqId | SeqIdsAcc], [MsgId | MsgIdsAcc], true} end, + {_SeqId, StateN1} = publish(Msg, true, MsgOnDisk, StateN), {SeqIdsAcc1, MsgIdsAcc1, StateN1} end, {[], [], State}, MsgsWithAckTags), IndexState1 = case SeqIds of @@ -416,8 +428,13 @@ requeue(MsgsWithAckTags, State) -> end, State1 #vqstate { index_state = IndexState1 }. -tx_publish(Msg = #basic_message { is_persistent = true }, State) -> - true = maybe_write_msg_to_disk(true, false, Msg), +tx_publish(Msg = #basic_message { is_persistent = true, guid = MsgId }, + State) -> + MsgStatus = #msg_status { + msg = Msg, msg_id = MsgId, seq_id = undefined, is_persistent = true, + is_delivered = false, msg_on_disk = false, index_on_disk = false }, + #msg_status { msg_on_disk = true } = + maybe_write_msg_to_disk(false, MsgStatus), State; tx_publish(_Msg, State) -> State. @@ -457,7 +474,7 @@ tx_commit_from_vq(State = #vqstate { on_sync = {SAcks, SPubs, SFroms} }) -> lists:foldl( fun (Msg = #basic_message { is_persistent = IsPersistent }, {SeqIdsAcc, StateN}) -> - {SeqId, StateN1} = publish(Msg, false, true, StateN), + {SeqId, StateN1} = publish(Msg, false, IsPersistent, StateN), SeqIdsAcc1 = case IsPersistent of true -> [SeqId | SeqIdsAcc]; false -> SeqIdsAcc @@ -478,22 +495,25 @@ flush_journal(State = #vqstate { index_state = IndexState }) -> State #vqstate { index_state = rabbit_queue_index:flush_journal(IndexState) }. -status(#vqstate { q1 = Q1, q2 = Q2, gamma = Gamma, q3 = Q3, q4 = Q4, +status(#vqstate { q1 = Q1, q2 = {Q2Len, _Q2}, + delta = Delta, q3 = {Q3Len, _Q3}, q4 = Q4, len = Len, on_sync = {_, _, From}, target_ram_msg_count = TargetRamMsgCount, ram_msg_count = RamMsgCount, + ram_index_count = RamIndexCount, avg_egress_rate = AvgEgressRate, avg_ingress_rate = AvgIngressRate, next_seq_id = NextSeqId }) -> [ {q1, queue:len(Q1)}, - {q2, queue:len(Q2)}, - {gamma, Gamma}, - {q3, queue:len(Q3)}, + {q2, Q2Len}, + {delta, Delta}, + {q3, Q3Len}, {q4, queue:len(Q4)}, {len, Len}, {outstanding_txns, length(From)}, {target_ram_msg_count, TargetRamMsgCount}, {ram_msg_count, RamMsgCount}, + {ram_index_count, RamIndexCount}, {avg_egress_rate, AvgEgressRate}, {avg_ingress_rate, AvgIngressRate}, {next_seq_id, NextSeqId} ]. @@ -511,23 +531,45 @@ persistent_msg_ids(Pubs) -> [MsgId || Obj = #basic_message { guid = MsgId } <- Pubs, Obj #basic_message.is_persistent]. -entry_salient_details(#alpha { msg = #basic_message { guid = MsgId }, - seq_id = SeqId, is_delivered = IsDelivered, - msg_on_disk = MsgOnDisk, - index_on_disk = IndexOnDisk }) -> - {MsgId, SeqId, IsDelivered, MsgOnDisk, IndexOnDisk}; -entry_salient_details(#beta { msg_id = MsgId, seq_id = SeqId, - is_delivered = IsDelivered, - index_on_disk = IndexOnDisk }) -> - {MsgId, SeqId, IsDelivered, true, IndexOnDisk}. - betas_from_segment_entries(List, SeqIdLimit) -> - queue:from_list([#beta { msg_id = MsgId, seq_id = SeqId, - is_persistent = IsPersistent, - is_delivered = IsDelivered, - index_on_disk = true } - || {MsgId, SeqId, IsPersistent, IsDelivered} <- List, - SeqId < SeqIdLimit ]). + List1 = [#msg_status { msg = undefined, + msg_id = MsgId, + seq_id = SeqId, + is_persistent = IsPersistent, + is_delivered = IsDelivered, + msg_on_disk = true, + index_on_disk = true + } + || {MsgId, SeqId, IsPersistent, IsDelivered} <- List, + SeqId < SeqIdLimit ], + {length(List1), queue:from_list([{true, queue:from_list(List1)}])}. + +join_betas({HeadLen, Head}, {TailLen, Tail}) -> + {HeadLen + TailLen, join_betas1(Head, Tail)}. + +join_betas1(Head, Tail) -> + case {queue:out_r(Head), queue:out(Tail)} of + {{empty, _Head}, _} -> + Tail; + {_, {empty, _Tail}} -> + Head; + {{{value, {IndexOnDisk, InnerQHead}}, Head1}, + {{value, {IndexOnDisk, InnerQTail}}, Tail1}} -> + queue:join( + queue:in({IndexOnDisk, + queue:join(InnerQHead, InnerQTail)}, Head1), + Tail1); + {_, _} -> queue:join(Head, Tail) + end. + +grab_beta(Gen, Q) -> + case Gen(Q) of + {empty, _Q} -> + empty; + {{value, {_IndexOnDisk, InnerQ}}, _Q} -> + {{value, MsgStatus}, _InnerQ} = Gen(InnerQ), + MsgStatus + end. read_index_segment(SeqId, IndexState) -> SeqId1 = SeqId + rabbit_queue_index:segment_size(), @@ -541,39 +583,39 @@ ensure_binary_properties(Msg = #basic_message { content = Content }) -> content = rabbit_binary_parser:clear_decoded_content( rabbit_binary_generator:ensure_content_encoded(Content)) }. -%% the first arg is the older gamma -combine_gammas(#gamma { count = 0 }, #gamma { count = 0 }) -> - ?BLANK_GAMMA; -combine_gammas(#gamma { count = 0 }, #gamma { } = B) -> B; -combine_gammas(#gamma { } = A, #gamma { count = 0 }) -> A; -combine_gammas(#gamma { start_seq_id = SeqIdLow, count = CountLow}, - #gamma { start_seq_id = SeqIdHigh, count = CountHigh, +%% the first arg is the older delta +combine_deltas(#delta { count = 0 }, #delta { count = 0 }) -> + ?BLANK_DELTA; +combine_deltas(#delta { count = 0 }, #delta { } = B) -> B; +combine_deltas(#delta { } = A, #delta { count = 0 }) -> A; +combine_deltas(#delta { start_seq_id = SeqIdLow, count = CountLow}, + #delta { start_seq_id = SeqIdHigh, count = CountHigh, end_seq_id = SeqIdEnd }) -> true = SeqIdLow =< SeqIdHigh, %% ASSERTION Count = CountLow + CountHigh, true = Count =< SeqIdEnd - SeqIdLow, %% ASSERTION - #gamma { start_seq_id = SeqIdLow, count = Count, end_seq_id = SeqIdEnd }. + #delta { start_seq_id = SeqIdLow, count = Count, end_seq_id = SeqIdEnd }. %%---------------------------------------------------------------------------- %% Internal major helpers for Public API %%---------------------------------------------------------------------------- -delete1(NextSeqId, Count, GammaSeqId, IndexState) - when GammaSeqId >= NextSeqId -> +delete1(NextSeqId, Count, DeltaSeqId, IndexState) + when DeltaSeqId >= NextSeqId -> {Count, IndexState}; -delete1(NextSeqId, Count, GammaSeqId, IndexState) -> - Gamma1SeqId = GammaSeqId + rabbit_queue_index:segment_size(), - case rabbit_queue_index:read_segment_entries(GammaSeqId, IndexState) of +delete1(NextSeqId, Count, DeltaSeqId, IndexState) -> + Delta1SeqId = DeltaSeqId + rabbit_queue_index:segment_size(), + case rabbit_queue_index:read_segment_entries(DeltaSeqId, IndexState) of {[], IndexState1} -> - delete1(NextSeqId, Count, Gamma1SeqId, IndexState1); + delete1(NextSeqId, Count, Delta1SeqId, IndexState1); {List, IndexState1} -> - Q = betas_from_segment_entries(List, Gamma1SeqId), + {QCount, Q} = betas_from_segment_entries(List, Delta1SeqId), {QCount, IndexState2} = remove_queue_entries(Q, IndexState1), - delete1(NextSeqId, Count + QCount, Gamma1SeqId, IndexState2) + delete1(NextSeqId, Count + QCount, Delta1SeqId, IndexState2) end. -purge1(Count, State = #vqstate { q3 = Q3, index_state = IndexState }) -> - case queue:is_empty(Q3) of +purge1(Count, State = #vqstate { q3 = {Q3Len, Q3}, index_state = IndexState }) -> + case 0 == Q3Len of true -> {Q1Count, IndexState1} = remove_queue_entries(State #vqstate.q1, IndexState), @@ -582,30 +624,31 @@ purge1(Count, State = #vqstate { q3 = Q3, index_state = IndexState }) -> false -> {Q3Count, IndexState1} = remove_queue_entries(Q3, IndexState), purge1(Count + Q3Count, - maybe_gammas_to_betas( + maybe_deltas_to_betas( State #vqstate { index_state = IndexState1, - q3 = queue:new() })) + q3 = {0, queue:new()} })) end. remove_queue_entries(Q, IndexState) -> {Count, MsgIds, SeqIds, IndexState1} = lists:foldl( - fun (Entry, {CountN, MsgIdsAcc, SeqIdsAcc, IndexStateN}) -> - {MsgId, SeqId, IsDelivered, MsgOnDisk, IndexOnDisk} = - entry_salient_details(Entry), + fun (#msg_status { msg_id = MsgId, seq_id = SeqId, + is_delivered = IsDelivered, msg_on_disk = MsgOnDisk, + index_on_disk = IndexOnDisk }, + {CountN, MsgIdsAcc, SeqIdsAcc, IndexStateN}) -> + MsgIdsAcc1 = case MsgOnDisk of + true -> [MsgId | MsgIdsAcc]; + false -> MsgIdsAcc + end, + SeqIdsAcc1 = case IndexOnDisk of + true -> [SeqId | SeqIdsAcc]; + false -> SeqIdsAcc + end, IndexStateN1 = case IndexOnDisk andalso not IsDelivered of true -> rabbit_queue_index:write_delivered( SeqId, IndexStateN); false -> IndexStateN end, - SeqIdsAcc1 = case IndexOnDisk of - true -> [SeqId | SeqIdsAcc]; - false -> SeqIdsAcc - end, - MsgIdsAcc1 = case MsgOnDisk of - true -> [MsgId | MsgIdsAcc]; - false -> MsgIdsAcc - end, {CountN + 1, MsgIdsAcc1, SeqIdsAcc1, IndexStateN1} %% we need to write the delivered records in order otherwise %% we upset the qi. So don't reverse. @@ -621,45 +664,56 @@ remove_queue_entries(Q, IndexState) -> end, {Count, IndexState2}. -fetch_from_q3_or_gamma(State = #vqstate { - q1 = Q1, q2 = Q2, gamma = #gamma { count = GammaCount }, - q3 = Q3, q4 = Q4, ram_msg_count = RamMsgCount, +fetch_from_q3_or_delta(State = #vqstate { + q1 = Q1, q2 = {Q2Len, _Q2}, delta = #delta { count = DeltaCount }, + q3 = {Q3Len, Q3}, q4 = Q4, ram_msg_count = RamMsgCount, + ram_index_count = RamIndexCount, msg_store_read_state = MSCState }) -> case queue:out(Q3) of {empty, _Q3} -> - 0 = GammaCount, %% ASSERTION - true = queue:is_empty(Q2), %% ASSERTION + 0 = DeltaCount, %% ASSERTION + 0 = Q2Len, %% ASSERTION + 0 = Q3Len, %% ASSERTION true = queue:is_empty(Q1), %% ASSERTION {empty, State}; - {{value, - #beta { msg_id = MsgId, seq_id = SeqId, is_delivered = IsDelivered, - is_persistent = IsPersistent, index_on_disk = IndexOnDisk }}, - Q3a} -> + {{value, {IndexOnDisk, InnerQ}}, Q3a} -> + {{value, MsgStatus = #msg_status { + msg = undefined, msg_id = MsgId, + is_persistent = IsPersistent + }}, InnerQ1} = queue:out(InnerQ), + Q3LenB = Q3Len - 1, + Q3b = {Q3LenB, case queue:is_empty(InnerQ1) of + true -> Q3a; + false -> queue:in_r({IndexOnDisk, InnerQ1}, Q3a) + end}, {{ok, Msg = #basic_message { is_persistent = IsPersistent, - guid = MsgId }}, MSCState1} = + guid = MsgId }}, MSCState1} = rabbit_msg_store:read(MsgId, MSCState), - Q4a = queue:in( - #alpha { msg = Msg, seq_id = SeqId, - is_delivered = IsDelivered, msg_on_disk = true, - index_on_disk = IndexOnDisk }, Q4), - State1 = State #vqstate { q3 = Q3a, q4 = Q4a, + Q4a = queue:in(MsgStatus #msg_status { msg = Msg }, Q4), + RamIndexCount1 = case IndexOnDisk of + true -> RamIndexCount; + false -> RamIndexCount - 1 + end, + true = RamIndexCount1 >= 0, %% ASSERTION + State1 = State #vqstate { q3 = Q3b, q4 = Q4a, ram_msg_count = RamMsgCount + 1, + ram_index_count = RamIndexCount1, msg_store_read_state = MSCState1 }, State2 = - case {queue:is_empty(Q3a), 0 == GammaCount} of + case {0 == Q3LenB, 0 == DeltaCount} of {true, true} -> - %% q3 is now empty, it wasn't before; gamma is + %% q3 is now empty, it wasn't before; delta is %% still empty. So q2 must be empty, and q1 %% can now be joined onto q4 - true = queue:is_empty(Q2), %% ASSERTION + 0 = Q2Len, %% ASSERTION State1 #vqstate { q1 = queue:new(), q4 = queue:join(Q4a, Q1) }; {true, false} -> - maybe_gammas_to_betas(State1); + maybe_deltas_to_betas(State1); {false, _} -> %% q3 still isn't empty, we've not touched - %% gamma, so the invariants between q1, q2, - %% gamma and q3 are maintained + %% delta, so the invariants between q1, q2, + %% delta and q3 are maintained State1 end, fetch(State2) @@ -673,7 +727,7 @@ reduce_memory_use(State = #vqstate { target_ram_msg_count = TargetRamMsgCount }) -> State1 = maybe_push_q4_to_betas(maybe_push_q1_to_betas(State)), case TargetRamMsgCount of - 0 -> push_betas_to_gammas(State1); + 0 -> push_betas_to_deltas(State1); _ -> State1 end. @@ -683,7 +737,7 @@ reduce_memory_use(State = test_keep_msg_in_ram(SeqId, #vqstate { target_ram_msg_count = TargetRamMsgCount, ram_msg_count = RamMsgCount, - q1 = Q1, q3 = Q3 }) -> + q1 = Q1, q3 = {_Q3Len, Q3} }) -> case TargetRamMsgCount of undefined -> msg; @@ -691,17 +745,19 @@ test_keep_msg_in_ram(SeqId, #vqstate { target_ram_msg_count = TargetRamMsgCount, case queue:out(Q3) of {empty, _Q3} -> %% if TargetRamMsgCount == 0, we know we have no - %% alphas. If q3 is empty then gamma must be empty + %% alphas. If q3 is empty then delta must be empty %% too, so create a beta, which should end up in %% q3 index; - {{value, #beta { seq_id = OldSeqId }}, _Q3a} -> - %% Don't look at the current gamma as it may be + {{value, {_IndexOnDisk, InnerQ}}, _Q3a} -> + {{value, #msg_status { seq_id = OldSeqId }}, _InnerQ} = + queue:out(InnerQ), + %% Don't look at the current delta as it may be %% empty. If the SeqId is still within the current %% segment, it'll be a beta, else it'll go into - %% gamma + %% delta case SeqId >= rabbit_queue_index:next_segment_boundary(OldSeqId) of - true -> neither; + true -> neither; false -> index end end; @@ -716,178 +772,198 @@ test_keep_msg_in_ram(SeqId, #vqstate { target_ram_msg_count = TargetRamMsgCount, end end. -publish(Msg, IsDelivered, PersistentMsgsAlreadyOnDisk, - State = #vqstate { next_seq_id = SeqId, len = Len, - in_counter = InCount }) -> - {SeqId, publish(test_keep_msg_in_ram(SeqId, State), Msg, SeqId, IsDelivered, - PersistentMsgsAlreadyOnDisk, +publish(Msg = #basic_message { is_persistent = IsPersistent, guid = MsgId }, + IsDelivered, MsgOnDisk, State = + #vqstate { next_seq_id = SeqId, len = Len, in_counter = InCount }) -> + MsgStatus = #msg_status { + msg = Msg, msg_id = MsgId, seq_id = SeqId, is_persistent = IsPersistent, + is_delivered = IsDelivered, msg_on_disk = MsgOnDisk, + index_on_disk = false }, + {SeqId, publish(test_keep_msg_in_ram(SeqId, State), MsgStatus, State #vqstate { next_seq_id = SeqId + 1, len = Len + 1, in_counter = InCount + 1 })}. -publish(msg, Msg = #basic_message { guid = MsgId, - is_persistent = IsPersistent }, - SeqId, IsDelivered, PersistentMsgsAlreadyOnDisk, - State = #vqstate { index_state = IndexState, - ram_msg_count = RamMsgCount }) -> - MsgOnDisk = - maybe_write_msg_to_disk(false, PersistentMsgsAlreadyOnDisk, Msg), - {IndexOnDisk, IndexState1} = - maybe_write_index_to_disk(false, IsPersistent, MsgId, SeqId, - IsDelivered, IndexState), - Entry = #alpha { msg = Msg, seq_id = SeqId, is_delivered = IsDelivered, - msg_on_disk = MsgOnDisk, index_on_disk = IndexOnDisk }, +publish(msg, MsgStatus, State = #vqstate { index_state = IndexState, + ram_msg_count = RamMsgCount }) -> + MsgStatus1 = maybe_write_msg_to_disk(false, MsgStatus), + {MsgStatus2, IndexState1} = + maybe_write_index_to_disk(false, MsgStatus1, IndexState), State1 = State #vqstate { ram_msg_count = RamMsgCount + 1, index_state = IndexState1 }, - store_alpha_entry(Entry, State1); - -publish(index, Msg = #basic_message { guid = MsgId, - is_persistent = IsPersistent }, - SeqId, IsDelivered, PersistentMsgsAlreadyOnDisk, - State = #vqstate { index_state = IndexState, q1 = Q1 }) -> - true = maybe_write_msg_to_disk(true, PersistentMsgsAlreadyOnDisk, Msg), - {IndexOnDisk, IndexState1} = - maybe_write_index_to_disk(false, IsPersistent, MsgId, SeqId, - IsDelivered, IndexState), - Entry = #beta { msg_id = MsgId, seq_id = SeqId, is_delivered = IsDelivered, - is_persistent = IsPersistent, index_on_disk = IndexOnDisk }, - State1 = State #vqstate { index_state = IndexState1 }, + store_alpha_entry(MsgStatus2, State1); + +publish(index, MsgStatus, State = + #vqstate { index_state = IndexState, q1 = Q1, + ram_index_count = RamIndexCount, + target_ram_msg_count = TargetRamMsgCount }) -> + MsgStatus1 = #msg_status { msg_on_disk = true } = + maybe_write_msg_to_disk(true, MsgStatus), + ForceIndex = case TargetRamMsgCount of + undefined -> + false; + _ -> + RamIndexCount >= (?RAM_INDEX_TARGET_RATIO * + TargetRamMsgCount) + end, + {MsgStatus2, IndexState1} = + maybe_write_index_to_disk(ForceIndex, MsgStatus1, IndexState), + RamIndexCount1 = case MsgStatus2 #msg_status.index_on_disk of + true -> RamIndexCount; + false -> RamIndexCount + 1 + end, + State1 = State #vqstate { index_state = IndexState1, + ram_index_count = RamIndexCount1 }, true = queue:is_empty(Q1), %% ASSERTION - store_beta_entry(Entry, State1); - -publish(neither, Msg = #basic_message { guid = MsgId, - is_persistent = IsPersistent }, - SeqId, IsDelivered, PersistentMsgsAlreadyOnDisk, - State = #vqstate { index_state = IndexState, q1 = Q1, q2 = Q2, - gamma = Gamma }) -> - true = maybe_write_msg_to_disk(true, PersistentMsgsAlreadyOnDisk, Msg), - {true, IndexState1} = - maybe_write_index_to_disk(true, IsPersistent, MsgId, SeqId, - IsDelivered, IndexState), - true = queue:is_empty(Q1) andalso queue:is_empty(Q2), %% ASSERTION - %% gamma may be empty, seq_id > next_segment_boundary from q3 + store_beta_entry(MsgStatus2, State1); + +publish(neither, MsgStatus = #msg_status { seq_id = SeqId }, State = + #vqstate { index_state = IndexState, q1 = Q1, q2 = {Q2Len, _Q2}, + delta = Delta }) -> + MsgStatus1 = #msg_status { msg_on_disk = true } = + maybe_write_msg_to_disk(true, MsgStatus), + {#msg_status { index_on_disk = true }, IndexState1} = + maybe_write_index_to_disk(true, MsgStatus1, IndexState), + true = queue:is_empty(Q1) andalso 0 == Q2Len, %% ASSERTION + %% delta may be empty, seq_id > next_segment_boundary from q3 %% head, so we need to find where the segment boundary is before %% or equal to seq_id - GammaSeqId = rabbit_queue_index:next_segment_boundary(SeqId) - + DeltaSeqId = rabbit_queue_index:next_segment_boundary(SeqId) - rabbit_queue_index:segment_size(), - Gamma1 = #gamma { start_seq_id = GammaSeqId, count = 1, + Delta1 = #delta { start_seq_id = DeltaSeqId, count = 1, end_seq_id = SeqId + 1 }, State #vqstate { index_state = IndexState1, - gamma = combine_gammas(Gamma, Gamma1) }. - -store_alpha_entry(Entry = #alpha {}, State = - #vqstate { q1 = Q1, q2 = Q2, - gamma = #gamma { count = GammaCount }, - q3 = Q3, q4 = Q4 }) -> - case queue:is_empty(Q2) andalso GammaCount == 0 andalso - queue:is_empty(Q3) of - true -> - State #vqstate { q4 = queue:in(Entry, Q4) }; - false -> - maybe_push_q1_to_betas(State #vqstate { q1 = queue:in(Entry, Q1) }) + delta = combine_deltas(Delta, Delta1) }. + +store_alpha_entry(MsgStatus, State = + #vqstate { q1 = Q1, q2 = {Q2Len, _Q2}, + delta = #delta { count = DeltaCount }, + q3 = {Q3Len, _Q3}, q4 = Q4 }) -> + case 0 == Q2Len andalso 0 == DeltaCount andalso 0 == Q3Len of + true -> true = queue:is_empty(Q1), %% ASSERTION + State #vqstate { q4 = queue:in(MsgStatus, Q4) }; + false -> maybe_push_q1_to_betas( + State #vqstate { q1 = queue:in(MsgStatus, Q1) }) end. -store_beta_entry(Entry = #beta {}, State = - #vqstate { q2 = Q2, gamma = #gamma { count = GammaCount }, - q3 = Q3 }) -> - case GammaCount == 0 of - true -> State #vqstate { q3 = queue:in(Entry, Q3) }; - false -> State #vqstate { q2 = queue:in(Entry, Q2) } +store_beta_entry(MsgStatus = #msg_status { msg_on_disk = true }, + State = #vqstate { q2 = {Q2Len, Q2}, + delta = #delta { count = DeltaCount }, + q3 = {Q3Len, Q3} }) -> + MsgStatus1 = MsgStatus #msg_status { msg = undefined }, + case DeltaCount == 0 of + true -> State #vqstate { q3 = {Q3Len + 1, + store_beta_entry1( + fun queue:out_r/1, fun queue:in/2, + MsgStatus1, Q3)} }; + false -> State #vqstate { q2 = {Q2Len + 1, + store_beta_entry1( + fun queue:out_r/1, fun queue:in/2, + MsgStatus1, Q2)} } end. -%% Bool IsPersistent PersistentMsgsAlreadyOnDisk | WriteToDisk? -%% -----------------------------------------------+------------- -%% false false false | false 1 -%% false true false | true 2 -%% false false true | false 3 -%% false true true | false 4 -%% true false false | true 5 -%% true true false | true 6 -%% true false true | true 7 -%% true true true | false 8 - -%% (Bool and not (IsPersistent and PersistentMsgsAlreadyOnDisk)) or | 5 6 7 -%% (IsPersistent and (not PersistentMsgsAlreadyOnDisk)) | 2 6 -maybe_write_msg_to_disk(Bool, PersistentMsgsAlreadyOnDisk, - Msg = #basic_message { guid = MsgId, - is_persistent = IsPersistent }) - when (Bool andalso not (IsPersistent andalso PersistentMsgsAlreadyOnDisk)) - orelse (IsPersistent andalso not PersistentMsgsAlreadyOnDisk) -> +store_beta_entry1(Gen, Cons, MsgStatus = + #msg_status { index_on_disk = IndexOnDisk }, Q) -> + case Gen(Q) of + {{value, {IndexOnDisk, InnerQ}}, QTail} -> + Cons({IndexOnDisk, Cons(MsgStatus, InnerQ)}, QTail); + {_EmptyOrNotIndexOnDisk, _QTail} -> + Cons({IndexOnDisk, Cons(MsgStatus, queue:new())}, Q) + end. + +maybe_write_msg_to_disk(_Force, MsgStatus = + #msg_status { msg_on_disk = true }) -> + MsgStatus; +maybe_write_msg_to_disk(Force, MsgStatus = #msg_status { + msg = Msg, msg_id = MsgId, + is_persistent = IsPersistent }) + when Force orelse IsPersistent -> ok = rabbit_msg_store:write(MsgId, ensure_binary_properties(Msg)), - true; -maybe_write_msg_to_disk(_Bool, true, #basic_message { is_persistent = true }) -> - true; -maybe_write_msg_to_disk(_Bool, _PersistentMsgsAlreadyOnDisk, _Msg) -> - false. - -maybe_write_index_to_disk(Bool, IsPersistent, MsgId, SeqId, IsDelivered, - IndexState) when Bool orelse IsPersistent -> + MsgStatus #msg_status { msg_on_disk = true }; +maybe_write_msg_to_disk(_Force, MsgStatus) -> + MsgStatus. + +maybe_write_index_to_disk(_Force, MsgStatus = + #msg_status { index_on_disk = true }, IndexState) -> + true = MsgStatus #msg_status.msg_on_disk, %% ASSERTION + {MsgStatus, IndexState}; +maybe_write_index_to_disk(Force, MsgStatus = #msg_status { + msg_id = MsgId, seq_id = SeqId, + is_persistent = IsPersistent, + is_delivered = IsDelivered }, IndexState) + when Force orelse IsPersistent -> + true = MsgStatus #msg_status.msg_on_disk, %% ASSERTION IndexState1 = rabbit_queue_index:write_published( MsgId, SeqId, IsPersistent, IndexState), - {true, case IsDelivered of - true -> rabbit_queue_index:write_delivered(SeqId, IndexState1); - false -> IndexState1 - end}; -maybe_write_index_to_disk(_Bool, _IsPersistent, _MsgId, _SeqId, _IsDelivered, - IndexState) -> - {false, IndexState}. + {MsgStatus #msg_status { index_on_disk = true }, + case IsDelivered of + true -> rabbit_queue_index:write_delivered(SeqId, IndexState1); + false -> IndexState1 + end}; +maybe_write_index_to_disk(_Force, MsgStatus, IndexState) -> + {MsgStatus, IndexState}. %%---------------------------------------------------------------------------- %% Phase changes %%---------------------------------------------------------------------------- -maybe_gammas_to_betas(State = #vqstate { gamma = #gamma { count = 0 } }) -> +maybe_deltas_to_betas(State = #vqstate { delta = #delta { count = 0 } }) -> State; -maybe_gammas_to_betas(State = - #vqstate { index_state = IndexState, q2 = Q2, q3 = Q3, - target_ram_msg_count = TargetRamMsgCount, - gamma = #gamma { start_seq_id = GammaSeqId, - count = GammaCount, - end_seq_id = GammaSeqIdEnd }} - ) -> - case (not queue:is_empty(Q3)) andalso 0 == TargetRamMsgCount of +maybe_deltas_to_betas( + State = #vqstate { index_state = IndexState, + q2 = Q2All, q3 = {Q3Len, _Q3} = Q3All, + target_ram_msg_count = TargetRamMsgCount, + delta = #delta { start_seq_id = DeltaSeqId, + count = DeltaCount, + end_seq_id = DeltaSeqIdEnd }}) -> + case (0 < Q3Len) andalso (0 == TargetRamMsgCount) of true -> State; false -> %% either q3 is empty, in which case we load at least one %% segment, or TargetRamMsgCount > 0, meaning we should %% really be holding all the betas in memory. - {List, IndexState1, Gamma1SeqId} = - read_index_segment(GammaSeqId, IndexState), + {List, IndexState1, Delta1SeqId} = + read_index_segment(DeltaSeqId, IndexState), State1 = State #vqstate { index_state = IndexState1 }, %% length(List) may be < segment_size because of acks. But %% it can't be [] - Q3b = betas_from_segment_entries(List, GammaSeqIdEnd), - Q3a = queue:join(Q3, Q3b), - case GammaCount - queue:len(Q3b) of + Q3bAll = {Q3bLen, _Q3b} = + betas_from_segment_entries(List, DeltaSeqIdEnd), + Q3a = join_betas(Q3All, Q3bAll), + case DeltaCount - Q3bLen of 0 -> - %% gamma is now empty, but it wasn't before, so + %% delta is now empty, but it wasn't before, so %% can now join q2 onto q3 - State1 #vqstate { gamma = ?BLANK_GAMMA, - q2 = queue:new(), - q3 = queue:join(Q3a, Q2) }; + State1 #vqstate { delta = ?BLANK_DELTA, + q2 = {0, queue:new()}, + q3 = join_betas(Q3a, Q2All) }; N when N > 0 -> State1 #vqstate { q3 = Q3a, - gamma = #gamma { start_seq_id = Gamma1SeqId, + delta = #delta { start_seq_id = Delta1SeqId, count = N, - end_seq_id = GammaSeqIdEnd } } + end_seq_id = DeltaSeqIdEnd } } end end. maybe_push_q1_to_betas(State = #vqstate { q1 = Q1 }) -> maybe_push_alphas_to_betas( fun queue:out/1, - fun (Beta, Q1a, State1) -> - %% these could legally go to q3 if gamma and q2 are empty - store_beta_entry(Beta, State1 #vqstate { q1 = Q1a }) + fun (MsgStatus, Q1a, State1) -> + %% these could legally go to q3 if delta and q2 are empty + store_beta_entry(MsgStatus, State1 #vqstate { q1 = Q1a }) end, Q1, State). maybe_push_q4_to_betas(State = #vqstate { q4 = Q4 }) -> maybe_push_alphas_to_betas( fun queue:out_r/1, - fun (Beta, Q4a, State1 = #vqstate { q3 = Q3 }) -> + fun (MsgStatus, Q4a, State1 = #vqstate { q3 = {Q3Len, Q3} }) -> + MsgStatus1 = MsgStatus #msg_status { msg = undefined }, %% these must go to q3 - State1 #vqstate { q3 = queue:in_r(Beta, Q3), q4 = Q4a } + State1 #vqstate { q3 = {Q3Len + 1, + store_beta_entry1( + fun queue:out/1, fun queue:in_r/2, + MsgStatus1, Q3)}, q4 = Q4a } end, Q4, State). maybe_push_alphas_to_betas(_Generator, _Consumer, _Q, State = @@ -895,112 +971,159 @@ maybe_push_alphas_to_betas(_Generator, _Consumer, _Q, State = target_ram_msg_count = TargetRamMsgCount }) when TargetRamMsgCount == undefined orelse TargetRamMsgCount >= RamMsgCount -> State; -maybe_push_alphas_to_betas(Generator, Consumer, Q, State = - #vqstate { ram_msg_count = RamMsgCount }) -> +maybe_push_alphas_to_betas( + Generator, Consumer, Q, State = + #vqstate { ram_msg_count = RamMsgCount, ram_index_count = RamIndexCount, + target_ram_msg_count = TargetRamMsgCount, + index_state = IndexState }) -> case Generator(Q) of {empty, _Q} -> State; - {{value, - #alpha { msg = Msg = #basic_message { guid = MsgId, - is_persistent = IsPersistent }, - seq_id = SeqId, is_delivered = IsDelivered, - index_on_disk = IndexOnDisk }}, - Qa} -> - true = maybe_write_msg_to_disk(true, true, Msg), - Beta = #beta { msg_id = MsgId, seq_id = SeqId, - is_persistent = IsPersistent, - is_delivered = IsDelivered, - index_on_disk = IndexOnDisk }, - State1 = State #vqstate { ram_msg_count = RamMsgCount - 1 }, + {{value, MsgStatus}, Qa} -> + MsgStatus1 = maybe_write_msg_to_disk(true, MsgStatus), + ForceIndex = case TargetRamMsgCount of + undefined -> + false; + _ -> + RamIndexCount >= (?RAM_INDEX_TARGET_RATIO * + TargetRamMsgCount) + end, + {MsgStatus2, IndexState1} = + maybe_write_index_to_disk(ForceIndex, MsgStatus1, IndexState), + RamIndexCount1 = case MsgStatus2 #msg_status.index_on_disk of + true -> RamIndexCount; + false -> RamIndexCount + 1 + end, + State1 = State #vqstate { ram_msg_count = RamMsgCount - 1, + ram_index_count = RamIndexCount1, + index_state = IndexState1 }, maybe_push_alphas_to_betas(Generator, Consumer, Qa, - Consumer(Beta, Qa, State1)) + Consumer(MsgStatus2, Qa, State1)) end. -push_betas_to_gammas(State = #vqstate { q2 = Q2, gamma = Gamma, q3 = Q3, +push_betas_to_deltas(State = #vqstate { q2 = {Q2Len, Q2}, delta = Delta, + q3 = {Q3Len, Q3}, + ram_index_count = RamIndexCount, index_state = IndexState }) -> %% HighSeqId is high in the sense that it must be higher than the - %% seq_id in Gamma, but it's also the lowest of the betas that we - %% transfer from q2 to gamma. - {HighSeqId, Len1, Q2a, IndexState1} = - push_betas_to_gammas(fun queue:out/1, undefined, Q2, IndexState), + %% seq_id in Delta, but it's also the lowest of the betas that we + %% transfer from q2 to delta. + {HighSeqId, Q2Len, Q2a, RamIndexCount1, IndexState1} = + push_betas_to_deltas( + fun queue:out/1, + fun (IndexOnDisk, InnerQ, Q) -> + join_betas1(queue:from_list([{IndexOnDisk, InnerQ}]), Q) + end, undefined, Q2, RamIndexCount, IndexState), + true = queue:is_empty(Q2a), %% ASSERTION EndSeqId = case queue:out_r(Q2) of - {empty, _Q2} -> undefined; - {{value, #beta { seq_id = EndSeqId1 }}, _Q2} -> EndSeqId1 + 1 + {empty, _Q2} -> + undefined; + {{value, {_IndexOnDisk, InnerQ}}, _Q2} -> + {{value, #msg_status { seq_id = EndSeqId1 }}, _InnerQ} = + queue:out_r(InnerQ), + EndSeqId1 + 1 end, - Gamma1 = #gamma { start_seq_id = Gamma1SeqId } = - combine_gammas(Gamma, #gamma { start_seq_id = HighSeqId, - count = Len1, + Delta1 = #delta { start_seq_id = Delta1SeqId } = + combine_deltas(Delta, #delta { start_seq_id = HighSeqId, + count = Q2Len, end_seq_id = EndSeqId }), - State1 = State #vqstate { q2 = Q2a, gamma = Gamma1, - index_state = IndexState1 }, + State1 = State #vqstate { q2 = {0, Q2a}, delta = Delta1, + index_state = IndexState1, + ram_index_count = RamIndexCount1 }, case queue:out(Q3) of - {empty, _Q3} -> State1; - {{value, #beta { seq_id = SeqId }}, _Q3a} -> - {{value, #beta { seq_id = SeqIdMax }}, _Q3b} = queue:out_r(Q3), + {empty, _Q3} -> + State1; + {{value, {_IndexOnDisk1, InnerQ1}}, _Q3} -> + {{value, #msg_status { seq_id = SeqId }}, _InnerQ1} = + queue:out(InnerQ1), + #msg_status { seq_id = SeqIdMax } = + grab_beta(fun queue:out_r/1, Q3), Limit = rabbit_queue_index:next_segment_boundary(SeqId), %% ASSERTION - true = Gamma1SeqId == undefined orelse Gamma1SeqId > SeqIdMax, + true = Delta1SeqId == undefined orelse Delta1SeqId > SeqIdMax, case SeqIdMax < Limit of true -> %% already only holding LTE one segment indices in q3 State1; false -> - %% ASSERTION (sadly large!) - %% This says that if Gamma1SeqId /= undefined then - %% the gap from Limit to Gamma1SeqId is an integer + %% ASSERTION + %% This says that if Delta1SeqId /= undefined then + %% the gap from Limit to Delta1SeqId is an integer %% multiple of segment_size - 0 = case Gamma1SeqId of + 0 = case Delta1SeqId of undefined -> 0; - _ -> (Gamma1SeqId - Limit) rem + _ -> (Delta1SeqId - Limit) rem rabbit_queue_index:segment_size() end, %% SeqIdMax is low in the sense that it must be - %% lower than the seq_id in gamma1, in fact either - %% gamma1 has undefined as its seq_id or there + %% lower than the seq_id in delta1, in fact either + %% delta1 has undefined as its seq_id or there %% does not exist a seq_id X s.t. X > SeqIdMax and - %% X < gamma1's seq_id (would be +1 if it wasn't + %% X < delta1's seq_id (would be +1 if it wasn't %% for the possibility of gaps in the seq_ids). %% But because we use queue:out_r, SeqIdMax is %% actually also the highest seq_id of the betas we - %% transfer from q3 to gammas. - {SeqIdMax, Len2, Q3b, IndexState2} = - push_betas_to_gammas(fun queue:out_r/1, Limit, Q3, - IndexState1), - Gamma2 = combine_gammas(#gamma { start_seq_id = Limit, + %% transfer from q3 to deltas. + {SeqIdMax, Len2, Q3b, RamIndexCount2, IndexState2} = + push_betas_to_deltas( + fun queue:out_r/1, + fun (IndexOnDisk, InnerQ, Q) -> + join_betas1(Q, queue:from_list( + [{IndexOnDisk, InnerQ}])) + end, Limit, Q3, RamIndexCount1, IndexState1), + Delta2 = combine_deltas(#delta { start_seq_id = Limit, count = Len2, end_seq_id = SeqIdMax+1 }, - Gamma1), - State1 #vqstate { q3 = Q3b, gamma = Gamma2, - index_state = IndexState2 } + Delta1), + State1 #vqstate { q3 = {Q3Len - Len2, Q3b}, delta = Delta2, + index_state = IndexState2, + ram_index_count = RamIndexCount2 } end end. -push_betas_to_gammas(Generator, Limit, Q, IndexState) -> +push_betas_to_deltas( + Generator, Consumer, Limit, Q, RamIndexCount, IndexState) -> case Generator(Q) of - {empty, Qa} -> {undefined, 0, Qa, IndexState}; - {{value, #beta { seq_id = SeqId }}, _Qa} -> - {Count, Qb, IndexState1} = - push_betas_to_gammas(Generator, Limit, Q, 0, IndexState), - {SeqId, Count, Qb, IndexState1} + {empty, Qa} -> {undefined, 0, Qa, RamIndexCount, IndexState}; + {{value, {IndexOnDisk, InnerQ}}, Qa} -> + {{value, #msg_status { seq_id = SeqId }}, _Qb} = Generator(InnerQ), + {Count, Qb, RamIndexCount1, IndexState1} = + push_betas_to_deltas( + Generator, Consumer, Limit, IndexOnDisk, InnerQ, Qa, 0, + RamIndexCount, IndexState), + {SeqId, Count, Qb, RamIndexCount1, IndexState1} end. -push_betas_to_gammas(Generator, Limit, Q, Count, IndexState) -> +push_betas_to_deltas( + Generator, Consumer, Limit, Q, Count, RamIndexCount, IndexState) -> case Generator(Q) of - {empty, Qa} -> {Count, Qa, IndexState}; - {{value, #beta { seq_id = SeqId }}, _Qa} + {empty, Qa} -> + {Count, Qa, RamIndexCount, IndexState}; + {{value, {IndexOnDisk, InnerQ}}, Qa} -> + push_betas_to_deltas( + Generator, Consumer, Limit, IndexOnDisk, InnerQ, Qa, Count, + RamIndexCount, IndexState) + end. + +push_betas_to_deltas(Generator, Consumer, Limit, IndexOnDisk, InnerQ, Q, + Count, RamIndexCount, IndexState) -> + case Generator(InnerQ) of + {empty, _InnerQ} -> + push_betas_to_deltas(Generator, Consumer, Limit, Q, Count, + RamIndexCount, IndexState); + {{value, #msg_status { seq_id = SeqId }}, _InnerQ} when Limit /= undefined andalso SeqId < Limit -> - {Count, Q, IndexState}; - {{value, #beta { msg_id = MsgId, seq_id = SeqId, - is_persistent = IsPersistent, - is_delivered = IsDelivered, - index_on_disk = IndexOnDisk}}, Qa} -> - IndexState1 = + {Count, Consumer(IndexOnDisk, InnerQ, Q), RamIndexCount, + IndexState}; + {{value, MsgStatus}, InnerQa} -> + {RamIndexCount1, IndexState1} = case IndexOnDisk of - true -> IndexState; + true -> {RamIndexCount, IndexState}; false -> - {true, IndexState2} = - maybe_write_index_to_disk( - true, IsPersistent, MsgId, - SeqId, IsDelivered, IndexState), - IndexState2 + {#msg_status { index_on_disk = true }, IndexState2} = + maybe_write_index_to_disk(true, MsgStatus, + IndexState), + {RamIndexCount - 1, IndexState2} end, - push_betas_to_gammas(Generator, Limit, Qa, Count + 1, IndexState1) + push_betas_to_deltas( + Generator, Consumer, Limit, IndexOnDisk, InnerQa, Q, Count + 1, + RamIndexCount1, IndexState1) end. |
