diff options
| author | Matthew Sackman <matthew@lshift.net> | 2009-11-27 13:44:47 +0000 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2009-11-27 13:44:47 +0000 |
| commit | 86f859cb22f7181826779903120178883bbdf85e (patch) | |
| tree | 8a39940874941f0b8a27c1e476008e3ac48ff8c8 | |
| parent | 0d4dafc9d4f28eccc03bbdb4927697510fd294b4 (diff) | |
| download | rabbitmq-server-git-86f859cb22f7181826779903120178883bbdf85e.tar.gz | |
There was a bug. Now it has gone away. It arose when γ has a partial segment, and then memory is made available, and the next msg is persistent. It will go into the partial segment in qi, but will also be in q1 in vq. This lead to the msg being duplicated. Solution is to track the max seq id beyond the end of the γs, and thus drop anything being returned in the segment from qi with a seq_id above this max seq id.
| -rw-r--r-- | include/rabbit_queue.hrl | 10 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 53 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 92 |
3 files changed, 116 insertions, 39 deletions
diff --git a/include/rabbit_queue.hrl b/include/rabbit_queue.hrl index 165a7e7b99..69ad7588c3 100644 --- a/include/rabbit_queue.hrl +++ b/include/rabbit_queue.hrl @@ -46,13 +46,15 @@ }). -record(gamma, - { seq_id, - count + { start_seq_id, + count, + end_seq_id %% note the end_seq_id is always >, not >= }). -ifdef(use_specs). --type(gamma() :: #gamma { seq_id :: non_neg_integer(), - count :: non_neg_integer () }). +-type(gamma() :: #gamma { start_seq_id :: non_neg_integer(), + count :: non_neg_integer (), + end_seq_id :: non_neg_integer() }). -endif. diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index c931e0b051..f84ba70adc 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -41,6 +41,7 @@ -import(lists). -include("rabbit.hrl"). +-include("rabbit_queue.hrl"). -include_lib("kernel/include/file.hrl"). test_content_prop_roundtrip(Datum, Binary) -> @@ -1201,13 +1202,16 @@ fresh_variable_queue() -> assert_prop(S0, len, 0), assert_prop(S0, q1, 0), assert_prop(S0, q2, 0), - assert_prop(S0, gamma, {gamma, undefined, 0}), + assert_prop(S0, gamma, #gamma { start_seq_id = undefined, + count = 0, + end_seq_id = undefined }), assert_prop(S0, q3, 0), assert_prop(S0, q4, 0), VQ. test_variable_queue() -> passed = test_variable_queue_dynamic_duration_change(), + passed = test_variable_queue_partial_segments_gamma_thing(), passed. test_variable_queue_dynamic_duration_change() -> @@ -1260,3 +1264,50 @@ test_variable_queue_dynamic_duration_change_f(Len, VQ0) -> after 0 -> test_variable_queue_dynamic_duration_change_f(Len, VQ3) end. + +test_variable_queue_partial_segments_gamma_thing() -> + SegmentSize = rabbit_queue_index:segment_size(), + HalfSegment = SegmentSize div 2, + VQ0 = fresh_variable_queue(), + {_SeqIds, VQ1} = + variable_queue_publish(true, SegmentSize + HalfSegment, VQ0), + VQ2 = rabbit_variable_queue:remeasure_rates(VQ1), + VQ3 = rabbit_variable_queue:set_queue_ram_duration_target(0, VQ2), + %% one segment in q3 as betas, and half a segment in gamma + S3 = rabbit_variable_queue:status(VQ3), + io:format("~p~n", [S3]), + assert_prop(S3, gamma, #gamma { start_seq_id = SegmentSize, + count = HalfSegment, + end_seq_id = SegmentSize + HalfSegment }), + assert_prop(S3, q3, SegmentSize), + assert_prop(S3, len, SegmentSize + HalfSegment), + VQ4 = rabbit_variable_queue:set_queue_ram_duration_target(infinity, VQ3), + {[_SeqId], VQ5} = variable_queue_publish(true, 1, VQ4), + %% should have 1 alpha, but it's in the same segment as the gammas + S5 = rabbit_variable_queue:status(VQ5), + io:format("~p~n", [S5]), + assert_prop(S5, q1, 1), + assert_prop(S5, gamma, #gamma { start_seq_id = SegmentSize, + count = HalfSegment, + end_seq_id = SegmentSize + HalfSegment }), + assert_prop(S5, q3, SegmentSize), + assert_prop(S5, len, SegmentSize + HalfSegment + 1), + {VQ6, AckTags} = variable_queue_fetch(SegmentSize, true, false, + SegmentSize + HalfSegment + 1, VQ5), + %% the half segment should now be in q3 as betas + S6 = rabbit_variable_queue:status(VQ6), + io:format("~p~n", [S6]), + assert_prop(S6, gamma, #gamma { start_seq_id = undefined, + count = 0, + end_seq_id = undefined }), + assert_prop(S6, q1, 1), + assert_prop(S6, q3, HalfSegment), + assert_prop(S6, len, HalfSegment + 1), + {VQ7, AckTags1} = variable_queue_fetch(HalfSegment + 1, true, false, + HalfSegment + 1, VQ6), + VQ8 = rabbit_variable_queue:ack(AckTags ++ AckTags1, VQ7), + %% should be empty now + {empty, VQ9} = rabbit_variable_queue:fetch(VQ8), + rabbit_variable_queue:terminate(VQ9), + + passed. diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 3958216e45..6fc89cb4cf 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -134,7 +134,7 @@ -spec(publish_delivered/2 :: (basic_message(), vqstate()) -> {ack(), vqstate()}). -spec(set_queue_ram_duration_target/2 :: - (('undefined' | number()), vqstate()) -> vqstate()). + (('undefined' | 'infinity' | number()), vqstate()) -> vqstate()). -spec(remeasure_rates/1 :: (vqstate()) -> vqstate()). -spec(ram_duration/1 :: (vqstate()) -> number()). -spec(fetch/1 :: (vqstate()) -> @@ -159,6 +159,10 @@ -endif. +-define(BLANK_GAMMA, #gamma { start_seq_id = undefined, + count = 0, + end_seq_id = undefined }). + %%---------------------------------------------------------------------------- %% Public API %%---------------------------------------------------------------------------- @@ -169,8 +173,10 @@ init(QueueName) -> {GammaSeqId, NextSeqId, IndexState1} = rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id(IndexState), Gamma = case GammaCount of - 0 -> #gamma { seq_id = undefined, count = 0 }; - _ -> #gamma { seq_id = GammaSeqId, count = GammaCount } + 0 -> ?BLANK_GAMMA; + _ -> #gamma { start_seq_id = GammaSeqId, + count = GammaCount, + end_seq_id = NextSeqId } end, Now = now(), State = @@ -472,7 +478,8 @@ status(#vqstate { q1 = Q1, q2 = Q2, gamma = Gamma, q3 = Q3, q4 = Q4, target_ram_msg_count = TargetRamMsgCount, ram_msg_count = RamMsgCount, avg_egress_rate = AvgEgressRate, - avg_ingress_rate = AvgIngressRate }) -> + avg_ingress_rate = AvgIngressRate, + next_seq_id = NextSeqId }) -> [ {q1, queue:len(Q1)}, {q2, queue:len(Q2)}, {gamma, Gamma}, @@ -483,7 +490,8 @@ status(#vqstate { q1 = Q1, q2 = Q2, gamma = Gamma, q3 = Q3, q4 = Q4, {target_ram_msg_count, TargetRamMsgCount}, {ram_msg_count, RamMsgCount}, {avg_egress_rate, AvgEgressRate}, - {avg_ingress_rate, AvgIngressRate} ]. + {avg_ingress_rate, AvgIngressRate}, + {next_seq_id, NextSeqId} ]. %%---------------------------------------------------------------------------- %% Minor helpers @@ -508,12 +516,13 @@ entry_salient_details(#beta { msg_id = MsgId, seq_id = SeqId, index_on_disk = IndexOnDisk }) -> {MsgId, SeqId, IsDelivered, true, IndexOnDisk}. -betas_from_segment_entries(List) -> +betas_from_segment_entries(List, SeqIdLimit) -> queue:from_list([#beta { msg_id = MsgId, seq_id = SeqId, is_persistent = IsPersistent, is_delivered = IsDelivered, index_on_disk = true } - || {MsgId, SeqId, IsPersistent, IsDelivered} <- List]). + || {MsgId, SeqId, IsPersistent, IsDelivered} <- List, + SeqId < SeqIdLimit ]). read_index_segment(SeqId, IndexState) -> SeqId1 = SeqId + rabbit_queue_index:segment_size(), @@ -527,15 +536,18 @@ ensure_binary_properties(Msg = #basic_message { content = Content }) -> content = rabbit_binary_parser:clear_decoded_content( rabbit_binary_generator:ensure_content_encoded(Content)) }. -%% the first arg is the older gamma +%% the first arg is the older gamma combine_gammas(#gamma { count = 0 }, #gamma { count = 0 }) -> - #gamma { seq_id = undefined, count = 0 }; + ?BLANK_GAMMA; combine_gammas(#gamma { count = 0 }, #gamma { } = B) -> B; combine_gammas(#gamma { } = A, #gamma { count = 0 }) -> A; -combine_gammas(#gamma { seq_id = SeqIdLow, count = CountLow }, - #gamma { seq_id = SeqIdHigh, count = CountHigh}) -> +combine_gammas(#gamma { start_seq_id = SeqIdLow, count = CountLow}, + #gamma { start_seq_id = SeqIdHigh, count = CountHigh, + end_seq_id = SeqIdEnd }) -> true = SeqIdLow =< SeqIdHigh, %% ASSERTION - #gamma { seq_id = SeqIdLow, count = CountLow + CountHigh}. + Count = CountLow + CountHigh, + true = Count =< SeqIdEnd - SeqIdLow, %% ASSERTION + #gamma { start_seq_id = SeqIdLow, count = Count, end_seq_id = SeqIdEnd }. %%---------------------------------------------------------------------------- %% Internal major helpers for Public API @@ -549,8 +561,8 @@ delete1(NextSeqId, Count, GammaSeqId, IndexState) -> case rabbit_queue_index:read_segment_entries(GammaSeqId, IndexState) of {[], IndexState1} -> delete1(NextSeqId, Count, Gamma1SeqId, IndexState1); - {List, IndexState1} -> - Q = betas_from_segment_entries(List), + {List, IndexState1} -> + Q = betas_from_segment_entries(List, Gamma1SeqId), {QCount, IndexState2} = remove_queue_entries(Q, IndexState1), delete1(NextSeqId, Count + QCount, Gamma1SeqId, IndexState2) end. @@ -748,7 +760,8 @@ publish(neither, Msg = #basic_message { guid = MsgId, %% or equal to seq_id GammaSeqId = rabbit_queue_index:next_segment_boundary(SeqId) - rabbit_queue_index:segment_size(), - Gamma1 = #gamma { seq_id = GammaSeqId, count = 1 }, + Gamma1 = #gamma { start_seq_id = GammaSeqId, count = 1, + end_seq_id = SeqId + 1 }, State #vqstate { index_state = IndexState1, gamma = combine_gammas(Gamma, Gamma1) }. @@ -818,8 +831,10 @@ maybe_gammas_to_betas(State = #vqstate { gamma = #gamma { count = 0 } }) -> maybe_gammas_to_betas(State = #vqstate { index_state = IndexState, q2 = Q2, q3 = Q3, target_ram_msg_count = TargetRamMsgCount, - gamma = #gamma { seq_id = GammaSeqId, - count = GammaCount }}) -> + gamma = #gamma { start_seq_id = GammaSeqId, + count = GammaCount, + end_seq_id = GammaSeqIdEnd }} + ) -> case (not queue:is_empty(Q3)) andalso 0 == TargetRamMsgCount of true -> State; @@ -832,20 +847,22 @@ maybe_gammas_to_betas(State = State1 = State #vqstate { index_state = IndexState1 }, %% length(List) may be < segment_size because of acks. But %% it can't be [] - Q3a = queue:join(Q3, betas_from_segment_entries(List)), - case GammaCount - length(List) of + Q3b = betas_from_segment_entries(List, GammaSeqIdEnd), + Q3a = queue:join(Q3, Q3b), + case GammaCount - queue:len(Q3b) of 0 -> %% gamma is now empty, but it wasn't before, so %% can now join q2 onto q3 - State1 #vqstate { gamma = #gamma { seq_id = undefined, - count = 0 }, + State1 #vqstate { gamma = ?BLANK_GAMMA, q2 = queue:new(), q3 = queue:join(Q3a, Q2) }; N when N > 0 -> maybe_gammas_to_betas( - State1 #vqstate { q3 = Q3a, - gamma = #gamma { seq_id = Gamma1SeqId, - count = N } }) + State1 #vqstate { + q3 = Q3a, + gamma = #gamma { start_seq_id = Gamma1SeqId, + count = N, + end_seq_id = GammaSeqIdEnd } }) end end. @@ -897,13 +914,19 @@ push_betas_to_gammas(State = #vqstate { q2 = Q2, gamma = Gamma, q3 = Q3, %% transfer from q2 to gamma. {HighSeqId, Len1, Q2a, IndexState1} = push_betas_to_gammas(fun queue:out/1, undefined, Q2, IndexState), - Gamma1 = #gamma { seq_id = Gamma1SeqId } = - combine_gammas(Gamma, #gamma { seq_id = HighSeqId, count = Len1 }), + EndSeqId = case queue:out_r(Q2) of + {empty, _Q2} -> undefined; + {{value, #beta { seq_id = EndSeqId1 }}, _Q2} -> EndSeqId1 + 1 + end, + Gamma1 = #gamma { start_seq_id = Gamma1SeqId } = + combine_gammas(Gamma, #gamma { start_seq_id = HighSeqId, + count = Len1, + end_seq_id = EndSeqId }), State1 = State #vqstate { q2 = Q2a, gamma = Gamma1, index_state = IndexState1 }, case queue:out(Q3) of {empty, _Q3} -> State1; - {{value, #beta { seq_id = SeqId }}, _Q3a} -> + {{value, #beta { seq_id = SeqId }}, _Q3a} -> {{value, #beta { seq_id = SeqIdMax }}, _Q3b} = queue:out_r(Q3), Limit = rabbit_queue_index:next_segment_boundary(SeqId), %% ASSERTION @@ -921,21 +944,22 @@ push_betas_to_gammas(State = #vqstate { q2 = Q2, gamma = Gamma, q3 = Q3, _ -> (Gamma1SeqId - Limit) rem rabbit_queue_index:segment_size() end, - %% LowSeqId is low in the sense that it must be + %% SeqIdMax is low in the sense that it must be %% lower than the seq_id in gamma1, in fact either %% gamma1 has undefined as its seq_id or there - %% does not exist a seq_id X s.t. X > LowSeqId and + %% does not exist a seq_id X s.t. X > SeqIdMax and %% X < gamma1's seq_id (would be +1 if it wasn't %% for the possibility of gaps in the seq_ids). - %% But because we use queue:out_r, LowSeqId is + %% But because we use queue:out_r, SeqIdMax is %% actually also the highest seq_id of the betas we %% transfer from q3 to gammas. - {LowSeqId, Len2, Q3b, IndexState2} = + {SeqIdMax, Len2, Q3b, IndexState2} = push_betas_to_gammas(fun queue:out_r/1, Limit, Q3, IndexState1), - true = Gamma1SeqId > LowSeqId, %% ASSERTION - Gamma2 = combine_gammas( - #gamma { seq_id = Limit, count = Len2}, Gamma1), + Gamma2 = combine_gammas(#gamma { start_seq_id = Limit, + count = Len2, + end_seq_id = SeqIdMax+1 }, + Gamma1), State1 #vqstate { q3 = Q3b, gamma = Gamma2, index_state = IndexState2 } end |
