diff options
| -rw-r--r-- | include/rabbit_queue.hrl | 10 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 53 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 92 |
3 files changed, 116 insertions, 39 deletions
diff --git a/include/rabbit_queue.hrl b/include/rabbit_queue.hrl index 165a7e7b99..69ad7588c3 100644 --- a/include/rabbit_queue.hrl +++ b/include/rabbit_queue.hrl @@ -46,13 +46,15 @@ }). -record(gamma, - { seq_id, - count + { start_seq_id, + count, + end_seq_id %% note the end_seq_id is always >, not >= }). -ifdef(use_specs). --type(gamma() :: #gamma { seq_id :: non_neg_integer(), - count :: non_neg_integer () }). +-type(gamma() :: #gamma { start_seq_id :: non_neg_integer(), + count :: non_neg_integer (), + end_seq_id :: non_neg_integer() }). -endif. diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index c931e0b051..f84ba70adc 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -41,6 +41,7 @@ -import(lists). -include("rabbit.hrl"). +-include("rabbit_queue.hrl"). -include_lib("kernel/include/file.hrl"). test_content_prop_roundtrip(Datum, Binary) -> @@ -1201,13 +1202,16 @@ fresh_variable_queue() -> assert_prop(S0, len, 0), assert_prop(S0, q1, 0), assert_prop(S0, q2, 0), - assert_prop(S0, gamma, {gamma, undefined, 0}), + assert_prop(S0, gamma, #gamma { start_seq_id = undefined, + count = 0, + end_seq_id = undefined }), assert_prop(S0, q3, 0), assert_prop(S0, q4, 0), VQ. test_variable_queue() -> passed = test_variable_queue_dynamic_duration_change(), + passed = test_variable_queue_partial_segments_gamma_thing(), passed. test_variable_queue_dynamic_duration_change() -> @@ -1260,3 +1264,50 @@ test_variable_queue_dynamic_duration_change_f(Len, VQ0) -> after 0 -> test_variable_queue_dynamic_duration_change_f(Len, VQ3) end. + +test_variable_queue_partial_segments_gamma_thing() -> + SegmentSize = rabbit_queue_index:segment_size(), + HalfSegment = SegmentSize div 2, + VQ0 = fresh_variable_queue(), + {_SeqIds, VQ1} = + variable_queue_publish(true, SegmentSize + HalfSegment, VQ0), + VQ2 = rabbit_variable_queue:remeasure_rates(VQ1), + VQ3 = rabbit_variable_queue:set_queue_ram_duration_target(0, VQ2), + %% one segment in q3 as betas, and half a segment in gamma + S3 = rabbit_variable_queue:status(VQ3), + io:format("~p~n", [S3]), + assert_prop(S3, gamma, #gamma { start_seq_id = SegmentSize, + count = HalfSegment, + end_seq_id = SegmentSize + HalfSegment }), + assert_prop(S3, q3, SegmentSize), + assert_prop(S3, len, SegmentSize + HalfSegment), + VQ4 = rabbit_variable_queue:set_queue_ram_duration_target(infinity, VQ3), + {[_SeqId], VQ5} = variable_queue_publish(true, 1, VQ4), + %% should have 1 alpha, but it's in the same segment as the gammas + S5 = rabbit_variable_queue:status(VQ5), + io:format("~p~n", [S5]), + assert_prop(S5, q1, 1), + assert_prop(S5, gamma, #gamma { start_seq_id = SegmentSize, + count = HalfSegment, + end_seq_id = SegmentSize + HalfSegment }), + assert_prop(S5, q3, SegmentSize), + assert_prop(S5, len, SegmentSize + HalfSegment + 1), + {VQ6, AckTags} = variable_queue_fetch(SegmentSize, true, false, + SegmentSize + HalfSegment + 1, VQ5), + %% the half segment should now be in q3 as betas + S6 = rabbit_variable_queue:status(VQ6), + io:format("~p~n", [S6]), + assert_prop(S6, gamma, #gamma { start_seq_id = undefined, + count = 0, + end_seq_id = undefined }), + assert_prop(S6, q1, 1), + assert_prop(S6, q3, HalfSegment), + assert_prop(S6, len, HalfSegment + 1), + {VQ7, AckTags1} = variable_queue_fetch(HalfSegment + 1, true, false, + HalfSegment + 1, VQ6), + VQ8 = rabbit_variable_queue:ack(AckTags ++ AckTags1, VQ7), + %% should be empty now + {empty, VQ9} = rabbit_variable_queue:fetch(VQ8), + rabbit_variable_queue:terminate(VQ9), + + passed. diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 3958216e45..6fc89cb4cf 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -134,7 +134,7 @@ -spec(publish_delivered/2 :: (basic_message(), vqstate()) -> {ack(), vqstate()}). -spec(set_queue_ram_duration_target/2 :: - (('undefined' | number()), vqstate()) -> vqstate()). + (('undefined' | 'infinity' | number()), vqstate()) -> vqstate()). -spec(remeasure_rates/1 :: (vqstate()) -> vqstate()). -spec(ram_duration/1 :: (vqstate()) -> number()). -spec(fetch/1 :: (vqstate()) -> @@ -159,6 +159,10 @@ -endif. +-define(BLANK_GAMMA, #gamma { start_seq_id = undefined, + count = 0, + end_seq_id = undefined }). + %%---------------------------------------------------------------------------- %% Public API %%---------------------------------------------------------------------------- @@ -169,8 +173,10 @@ init(QueueName) -> {GammaSeqId, NextSeqId, IndexState1} = rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id(IndexState), Gamma = case GammaCount of - 0 -> #gamma { seq_id = undefined, count = 0 }; - _ -> #gamma { seq_id = GammaSeqId, count = GammaCount } + 0 -> ?BLANK_GAMMA; + _ -> #gamma { start_seq_id = GammaSeqId, + count = GammaCount, + end_seq_id = NextSeqId } end, Now = now(), State = @@ -472,7 +478,8 @@ status(#vqstate { q1 = Q1, q2 = Q2, gamma = Gamma, q3 = Q3, q4 = Q4, target_ram_msg_count = TargetRamMsgCount, ram_msg_count = RamMsgCount, avg_egress_rate = AvgEgressRate, - avg_ingress_rate = AvgIngressRate }) -> + avg_ingress_rate = AvgIngressRate, + next_seq_id = NextSeqId }) -> [ {q1, queue:len(Q1)}, {q2, queue:len(Q2)}, {gamma, Gamma}, @@ -483,7 +490,8 @@ status(#vqstate { q1 = Q1, q2 = Q2, gamma = Gamma, q3 = Q3, q4 = Q4, {target_ram_msg_count, TargetRamMsgCount}, {ram_msg_count, RamMsgCount}, {avg_egress_rate, AvgEgressRate}, - {avg_ingress_rate, AvgIngressRate} ]. + {avg_ingress_rate, AvgIngressRate}, + {next_seq_id, NextSeqId} ]. %%---------------------------------------------------------------------------- %% Minor helpers @@ -508,12 +516,13 @@ entry_salient_details(#beta { msg_id = MsgId, seq_id = SeqId, index_on_disk = IndexOnDisk }) -> {MsgId, SeqId, IsDelivered, true, IndexOnDisk}. -betas_from_segment_entries(List) -> +betas_from_segment_entries(List, SeqIdLimit) -> queue:from_list([#beta { msg_id = MsgId, seq_id = SeqId, is_persistent = IsPersistent, is_delivered = IsDelivered, index_on_disk = true } - || {MsgId, SeqId, IsPersistent, IsDelivered} <- List]). + || {MsgId, SeqId, IsPersistent, IsDelivered} <- List, + SeqId < SeqIdLimit ]). read_index_segment(SeqId, IndexState) -> SeqId1 = SeqId + rabbit_queue_index:segment_size(), @@ -527,15 +536,18 @@ ensure_binary_properties(Msg = #basic_message { content = Content }) -> content = rabbit_binary_parser:clear_decoded_content( rabbit_binary_generator:ensure_content_encoded(Content)) }. -%% the first arg is the older gamma +%% the first arg is the older gamma combine_gammas(#gamma { count = 0 }, #gamma { count = 0 }) -> - #gamma { seq_id = undefined, count = 0 }; + ?BLANK_GAMMA; combine_gammas(#gamma { count = 0 }, #gamma { } = B) -> B; combine_gammas(#gamma { } = A, #gamma { count = 0 }) -> A; -combine_gammas(#gamma { seq_id = SeqIdLow, count = CountLow }, - #gamma { seq_id = SeqIdHigh, count = CountHigh}) -> +combine_gammas(#gamma { start_seq_id = SeqIdLow, count = CountLow}, + #gamma { start_seq_id = SeqIdHigh, count = CountHigh, + end_seq_id = SeqIdEnd }) -> true = SeqIdLow =< SeqIdHigh, %% ASSERTION - #gamma { seq_id = SeqIdLow, count = CountLow + CountHigh}. + Count = CountLow + CountHigh, + true = Count =< SeqIdEnd - SeqIdLow, %% ASSERTION + #gamma { start_seq_id = SeqIdLow, count = Count, end_seq_id = SeqIdEnd }. %%---------------------------------------------------------------------------- %% Internal major helpers for Public API @@ -549,8 +561,8 @@ delete1(NextSeqId, Count, GammaSeqId, IndexState) -> case rabbit_queue_index:read_segment_entries(GammaSeqId, IndexState) of {[], IndexState1} -> delete1(NextSeqId, Count, Gamma1SeqId, IndexState1); - {List, IndexState1} -> - Q = betas_from_segment_entries(List), + {List, IndexState1} -> + Q = betas_from_segment_entries(List, Gamma1SeqId), {QCount, IndexState2} = remove_queue_entries(Q, IndexState1), delete1(NextSeqId, Count + QCount, Gamma1SeqId, IndexState2) end. @@ -748,7 +760,8 @@ publish(neither, Msg = #basic_message { guid = MsgId, %% or equal to seq_id GammaSeqId = rabbit_queue_index:next_segment_boundary(SeqId) - rabbit_queue_index:segment_size(), - Gamma1 = #gamma { seq_id = GammaSeqId, count = 1 }, + Gamma1 = #gamma { start_seq_id = GammaSeqId, count = 1, + end_seq_id = SeqId + 1 }, State #vqstate { index_state = IndexState1, gamma = combine_gammas(Gamma, Gamma1) }. @@ -818,8 +831,10 @@ maybe_gammas_to_betas(State = #vqstate { gamma = #gamma { count = 0 } }) -> maybe_gammas_to_betas(State = #vqstate { index_state = IndexState, q2 = Q2, q3 = Q3, target_ram_msg_count = TargetRamMsgCount, - gamma = #gamma { seq_id = GammaSeqId, - count = GammaCount }}) -> + gamma = #gamma { start_seq_id = GammaSeqId, + count = GammaCount, + end_seq_id = GammaSeqIdEnd }} + ) -> case (not queue:is_empty(Q3)) andalso 0 == TargetRamMsgCount of true -> State; @@ -832,20 +847,22 @@ maybe_gammas_to_betas(State = State1 = State #vqstate { index_state = IndexState1 }, %% length(List) may be < segment_size because of acks. But %% it can't be [] - Q3a = queue:join(Q3, betas_from_segment_entries(List)), - case GammaCount - length(List) of + Q3b = betas_from_segment_entries(List, GammaSeqIdEnd), + Q3a = queue:join(Q3, Q3b), + case GammaCount - queue:len(Q3b) of 0 -> %% gamma is now empty, but it wasn't before, so %% can now join q2 onto q3 - State1 #vqstate { gamma = #gamma { seq_id = undefined, - count = 0 }, + State1 #vqstate { gamma = ?BLANK_GAMMA, q2 = queue:new(), q3 = queue:join(Q3a, Q2) }; N when N > 0 -> maybe_gammas_to_betas( - State1 #vqstate { q3 = Q3a, - gamma = #gamma { seq_id = Gamma1SeqId, - count = N } }) + State1 #vqstate { + q3 = Q3a, + gamma = #gamma { start_seq_id = Gamma1SeqId, + count = N, + end_seq_id = GammaSeqIdEnd } }) end end. @@ -897,13 +914,19 @@ push_betas_to_gammas(State = #vqstate { q2 = Q2, gamma = Gamma, q3 = Q3, %% transfer from q2 to gamma. {HighSeqId, Len1, Q2a, IndexState1} = push_betas_to_gammas(fun queue:out/1, undefined, Q2, IndexState), - Gamma1 = #gamma { seq_id = Gamma1SeqId } = - combine_gammas(Gamma, #gamma { seq_id = HighSeqId, count = Len1 }), + EndSeqId = case queue:out_r(Q2) of + {empty, _Q2} -> undefined; + {{value, #beta { seq_id = EndSeqId1 }}, _Q2} -> EndSeqId1 + 1 + end, + Gamma1 = #gamma { start_seq_id = Gamma1SeqId } = + combine_gammas(Gamma, #gamma { start_seq_id = HighSeqId, + count = Len1, + end_seq_id = EndSeqId }), State1 = State #vqstate { q2 = Q2a, gamma = Gamma1, index_state = IndexState1 }, case queue:out(Q3) of {empty, _Q3} -> State1; - {{value, #beta { seq_id = SeqId }}, _Q3a} -> + {{value, #beta { seq_id = SeqId }}, _Q3a} -> {{value, #beta { seq_id = SeqIdMax }}, _Q3b} = queue:out_r(Q3), Limit = rabbit_queue_index:next_segment_boundary(SeqId), %% ASSERTION @@ -921,21 +944,22 @@ push_betas_to_gammas(State = #vqstate { q2 = Q2, gamma = Gamma, q3 = Q3, _ -> (Gamma1SeqId - Limit) rem rabbit_queue_index:segment_size() end, - %% LowSeqId is low in the sense that it must be + %% SeqIdMax is low in the sense that it must be %% lower than the seq_id in gamma1, in fact either %% gamma1 has undefined as its seq_id or there - %% does not exist a seq_id X s.t. X > LowSeqId and + %% does not exist a seq_id X s.t. X > SeqIdMax and %% X < gamma1's seq_id (would be +1 if it wasn't %% for the possibility of gaps in the seq_ids). - %% But because we use queue:out_r, LowSeqId is + %% But because we use queue:out_r, SeqIdMax is %% actually also the highest seq_id of the betas we %% transfer from q3 to gammas. - {LowSeqId, Len2, Q3b, IndexState2} = + {SeqIdMax, Len2, Q3b, IndexState2} = push_betas_to_gammas(fun queue:out_r/1, Limit, Q3, IndexState1), - true = Gamma1SeqId > LowSeqId, %% ASSERTION - Gamma2 = combine_gammas( - #gamma { seq_id = Limit, count = Len2}, Gamma1), + Gamma2 = combine_gammas(#gamma { start_seq_id = Limit, + count = Len2, + end_seq_id = SeqIdMax+1 }, + Gamma1), State1 #vqstate { q3 = Q3b, gamma = Gamma2, index_state = IndexState2 } end |
