summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2009-11-27 13:44:47 +0000
committerMatthew Sackman <matthew@lshift.net>2009-11-27 13:44:47 +0000
commit86f859cb22f7181826779903120178883bbdf85e (patch)
tree8a39940874941f0b8a27c1e476008e3ac48ff8c8
parent0d4dafc9d4f28eccc03bbdb4927697510fd294b4 (diff)
downloadrabbitmq-server-git-86f859cb22f7181826779903120178883bbdf85e.tar.gz
There was a bug. Now it has gone away. It arose when γ has a partial segment, and then memory is made available, and the next msg is persistent. It will go into the partial segment in qi, but will also be in q1 in vq. This lead to the msg being duplicated. Solution is to track the max seq id beyond the end of the γs, and thus drop anything being returned in the segment from qi with a seq_id above this max seq id.
-rw-r--r--include/rabbit_queue.hrl10
-rw-r--r--src/rabbit_tests.erl53
-rw-r--r--src/rabbit_variable_queue.erl92
3 files changed, 116 insertions, 39 deletions
diff --git a/include/rabbit_queue.hrl b/include/rabbit_queue.hrl
index 165a7e7b99..69ad7588c3 100644
--- a/include/rabbit_queue.hrl
+++ b/include/rabbit_queue.hrl
@@ -46,13 +46,15 @@
}).
-record(gamma,
- { seq_id,
- count
+ { start_seq_id,
+ count,
+ end_seq_id %% note the end_seq_id is always >, not >=
}).
-ifdef(use_specs).
--type(gamma() :: #gamma { seq_id :: non_neg_integer(),
- count :: non_neg_integer () }).
+-type(gamma() :: #gamma { start_seq_id :: non_neg_integer(),
+ count :: non_neg_integer (),
+ end_seq_id :: non_neg_integer() }).
-endif.
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index c931e0b051..f84ba70adc 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -41,6 +41,7 @@
-import(lists).
-include("rabbit.hrl").
+-include("rabbit_queue.hrl").
-include_lib("kernel/include/file.hrl").
test_content_prop_roundtrip(Datum, Binary) ->
@@ -1201,13 +1202,16 @@ fresh_variable_queue() ->
assert_prop(S0, len, 0),
assert_prop(S0, q1, 0),
assert_prop(S0, q2, 0),
- assert_prop(S0, gamma, {gamma, undefined, 0}),
+ assert_prop(S0, gamma, #gamma { start_seq_id = undefined,
+ count = 0,
+ end_seq_id = undefined }),
assert_prop(S0, q3, 0),
assert_prop(S0, q4, 0),
VQ.
test_variable_queue() ->
passed = test_variable_queue_dynamic_duration_change(),
+ passed = test_variable_queue_partial_segments_gamma_thing(),
passed.
test_variable_queue_dynamic_duration_change() ->
@@ -1260,3 +1264,50 @@ test_variable_queue_dynamic_duration_change_f(Len, VQ0) ->
after 0 ->
test_variable_queue_dynamic_duration_change_f(Len, VQ3)
end.
+
+test_variable_queue_partial_segments_gamma_thing() ->
+ SegmentSize = rabbit_queue_index:segment_size(),
+ HalfSegment = SegmentSize div 2,
+ VQ0 = fresh_variable_queue(),
+ {_SeqIds, VQ1} =
+ variable_queue_publish(true, SegmentSize + HalfSegment, VQ0),
+ VQ2 = rabbit_variable_queue:remeasure_rates(VQ1),
+ VQ3 = rabbit_variable_queue:set_queue_ram_duration_target(0, VQ2),
+ %% one segment in q3 as betas, and half a segment in gamma
+ S3 = rabbit_variable_queue:status(VQ3),
+ io:format("~p~n", [S3]),
+ assert_prop(S3, gamma, #gamma { start_seq_id = SegmentSize,
+ count = HalfSegment,
+ end_seq_id = SegmentSize + HalfSegment }),
+ assert_prop(S3, q3, SegmentSize),
+ assert_prop(S3, len, SegmentSize + HalfSegment),
+ VQ4 = rabbit_variable_queue:set_queue_ram_duration_target(infinity, VQ3),
+ {[_SeqId], VQ5} = variable_queue_publish(true, 1, VQ4),
+ %% should have 1 alpha, but it's in the same segment as the gammas
+ S5 = rabbit_variable_queue:status(VQ5),
+ io:format("~p~n", [S5]),
+ assert_prop(S5, q1, 1),
+ assert_prop(S5, gamma, #gamma { start_seq_id = SegmentSize,
+ count = HalfSegment,
+ end_seq_id = SegmentSize + HalfSegment }),
+ assert_prop(S5, q3, SegmentSize),
+ assert_prop(S5, len, SegmentSize + HalfSegment + 1),
+ {VQ6, AckTags} = variable_queue_fetch(SegmentSize, true, false,
+ SegmentSize + HalfSegment + 1, VQ5),
+ %% the half segment should now be in q3 as betas
+ S6 = rabbit_variable_queue:status(VQ6),
+ io:format("~p~n", [S6]),
+ assert_prop(S6, gamma, #gamma { start_seq_id = undefined,
+ count = 0,
+ end_seq_id = undefined }),
+ assert_prop(S6, q1, 1),
+ assert_prop(S6, q3, HalfSegment),
+ assert_prop(S6, len, HalfSegment + 1),
+ {VQ7, AckTags1} = variable_queue_fetch(HalfSegment + 1, true, false,
+ HalfSegment + 1, VQ6),
+ VQ8 = rabbit_variable_queue:ack(AckTags ++ AckTags1, VQ7),
+ %% should be empty now
+ {empty, VQ9} = rabbit_variable_queue:fetch(VQ8),
+ rabbit_variable_queue:terminate(VQ9),
+
+ passed.
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 3958216e45..6fc89cb4cf 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -134,7 +134,7 @@
-spec(publish_delivered/2 :: (basic_message(), vqstate()) ->
{ack(), vqstate()}).
-spec(set_queue_ram_duration_target/2 ::
- (('undefined' | number()), vqstate()) -> vqstate()).
+ (('undefined' | 'infinity' | number()), vqstate()) -> vqstate()).
-spec(remeasure_rates/1 :: (vqstate()) -> vqstate()).
-spec(ram_duration/1 :: (vqstate()) -> number()).
-spec(fetch/1 :: (vqstate()) ->
@@ -159,6 +159,10 @@
-endif.
+-define(BLANK_GAMMA, #gamma { start_seq_id = undefined,
+ count = 0,
+ end_seq_id = undefined }).
+
%%----------------------------------------------------------------------------
%% Public API
%%----------------------------------------------------------------------------
@@ -169,8 +173,10 @@ init(QueueName) ->
{GammaSeqId, NextSeqId, IndexState1} =
rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id(IndexState),
Gamma = case GammaCount of
- 0 -> #gamma { seq_id = undefined, count = 0 };
- _ -> #gamma { seq_id = GammaSeqId, count = GammaCount }
+ 0 -> ?BLANK_GAMMA;
+ _ -> #gamma { start_seq_id = GammaSeqId,
+ count = GammaCount,
+ end_seq_id = NextSeqId }
end,
Now = now(),
State =
@@ -472,7 +478,8 @@ status(#vqstate { q1 = Q1, q2 = Q2, gamma = Gamma, q3 = Q3, q4 = Q4,
target_ram_msg_count = TargetRamMsgCount,
ram_msg_count = RamMsgCount,
avg_egress_rate = AvgEgressRate,
- avg_ingress_rate = AvgIngressRate }) ->
+ avg_ingress_rate = AvgIngressRate,
+ next_seq_id = NextSeqId }) ->
[ {q1, queue:len(Q1)},
{q2, queue:len(Q2)},
{gamma, Gamma},
@@ -483,7 +490,8 @@ status(#vqstate { q1 = Q1, q2 = Q2, gamma = Gamma, q3 = Q3, q4 = Q4,
{target_ram_msg_count, TargetRamMsgCount},
{ram_msg_count, RamMsgCount},
{avg_egress_rate, AvgEgressRate},
- {avg_ingress_rate, AvgIngressRate} ].
+ {avg_ingress_rate, AvgIngressRate},
+ {next_seq_id, NextSeqId} ].
%%----------------------------------------------------------------------------
%% Minor helpers
@@ -508,12 +516,13 @@ entry_salient_details(#beta { msg_id = MsgId, seq_id = SeqId,
index_on_disk = IndexOnDisk }) ->
{MsgId, SeqId, IsDelivered, true, IndexOnDisk}.
-betas_from_segment_entries(List) ->
+betas_from_segment_entries(List, SeqIdLimit) ->
queue:from_list([#beta { msg_id = MsgId, seq_id = SeqId,
is_persistent = IsPersistent,
is_delivered = IsDelivered,
index_on_disk = true }
- || {MsgId, SeqId, IsPersistent, IsDelivered} <- List]).
+ || {MsgId, SeqId, IsPersistent, IsDelivered} <- List,
+ SeqId < SeqIdLimit ]).
read_index_segment(SeqId, IndexState) ->
SeqId1 = SeqId + rabbit_queue_index:segment_size(),
@@ -527,15 +536,18 @@ ensure_binary_properties(Msg = #basic_message { content = Content }) ->
content = rabbit_binary_parser:clear_decoded_content(
rabbit_binary_generator:ensure_content_encoded(Content)) }.
-%% the first arg is the older gamma
+%% the first arg is the older gamma
combine_gammas(#gamma { count = 0 }, #gamma { count = 0 }) ->
- #gamma { seq_id = undefined, count = 0 };
+ ?BLANK_GAMMA;
combine_gammas(#gamma { count = 0 }, #gamma { } = B) -> B;
combine_gammas(#gamma { } = A, #gamma { count = 0 }) -> A;
-combine_gammas(#gamma { seq_id = SeqIdLow, count = CountLow },
- #gamma { seq_id = SeqIdHigh, count = CountHigh}) ->
+combine_gammas(#gamma { start_seq_id = SeqIdLow, count = CountLow},
+ #gamma { start_seq_id = SeqIdHigh, count = CountHigh,
+ end_seq_id = SeqIdEnd }) ->
true = SeqIdLow =< SeqIdHigh, %% ASSERTION
- #gamma { seq_id = SeqIdLow, count = CountLow + CountHigh}.
+ Count = CountLow + CountHigh,
+ true = Count =< SeqIdEnd - SeqIdLow, %% ASSERTION
+ #gamma { start_seq_id = SeqIdLow, count = Count, end_seq_id = SeqIdEnd }.
%%----------------------------------------------------------------------------
%% Internal major helpers for Public API
@@ -549,8 +561,8 @@ delete1(NextSeqId, Count, GammaSeqId, IndexState) ->
case rabbit_queue_index:read_segment_entries(GammaSeqId, IndexState) of
{[], IndexState1} ->
delete1(NextSeqId, Count, Gamma1SeqId, IndexState1);
- {List, IndexState1} ->
- Q = betas_from_segment_entries(List),
+ {List, IndexState1} ->
+ Q = betas_from_segment_entries(List, Gamma1SeqId),
{QCount, IndexState2} = remove_queue_entries(Q, IndexState1),
delete1(NextSeqId, Count + QCount, Gamma1SeqId, IndexState2)
end.
@@ -748,7 +760,8 @@ publish(neither, Msg = #basic_message { guid = MsgId,
%% or equal to seq_id
GammaSeqId = rabbit_queue_index:next_segment_boundary(SeqId) -
rabbit_queue_index:segment_size(),
- Gamma1 = #gamma { seq_id = GammaSeqId, count = 1 },
+ Gamma1 = #gamma { start_seq_id = GammaSeqId, count = 1,
+ end_seq_id = SeqId + 1 },
State #vqstate { index_state = IndexState1,
gamma = combine_gammas(Gamma, Gamma1) }.
@@ -818,8 +831,10 @@ maybe_gammas_to_betas(State = #vqstate { gamma = #gamma { count = 0 } }) ->
maybe_gammas_to_betas(State =
#vqstate { index_state = IndexState, q2 = Q2, q3 = Q3,
target_ram_msg_count = TargetRamMsgCount,
- gamma = #gamma { seq_id = GammaSeqId,
- count = GammaCount }}) ->
+ gamma = #gamma { start_seq_id = GammaSeqId,
+ count = GammaCount,
+ end_seq_id = GammaSeqIdEnd }}
+ ) ->
case (not queue:is_empty(Q3)) andalso 0 == TargetRamMsgCount of
true ->
State;
@@ -832,20 +847,22 @@ maybe_gammas_to_betas(State =
State1 = State #vqstate { index_state = IndexState1 },
%% length(List) may be < segment_size because of acks. But
%% it can't be []
- Q3a = queue:join(Q3, betas_from_segment_entries(List)),
- case GammaCount - length(List) of
+ Q3b = betas_from_segment_entries(List, GammaSeqIdEnd),
+ Q3a = queue:join(Q3, Q3b),
+ case GammaCount - queue:len(Q3b) of
0 ->
%% gamma is now empty, but it wasn't before, so
%% can now join q2 onto q3
- State1 #vqstate { gamma = #gamma { seq_id = undefined,
- count = 0 },
+ State1 #vqstate { gamma = ?BLANK_GAMMA,
q2 = queue:new(),
q3 = queue:join(Q3a, Q2) };
N when N > 0 ->
maybe_gammas_to_betas(
- State1 #vqstate { q3 = Q3a,
- gamma = #gamma { seq_id = Gamma1SeqId,
- count = N } })
+ State1 #vqstate {
+ q3 = Q3a,
+ gamma = #gamma { start_seq_id = Gamma1SeqId,
+ count = N,
+ end_seq_id = GammaSeqIdEnd } })
end
end.
@@ -897,13 +914,19 @@ push_betas_to_gammas(State = #vqstate { q2 = Q2, gamma = Gamma, q3 = Q3,
%% transfer from q2 to gamma.
{HighSeqId, Len1, Q2a, IndexState1} =
push_betas_to_gammas(fun queue:out/1, undefined, Q2, IndexState),
- Gamma1 = #gamma { seq_id = Gamma1SeqId } =
- combine_gammas(Gamma, #gamma { seq_id = HighSeqId, count = Len1 }),
+ EndSeqId = case queue:out_r(Q2) of
+ {empty, _Q2} -> undefined;
+ {{value, #beta { seq_id = EndSeqId1 }}, _Q2} -> EndSeqId1 + 1
+ end,
+ Gamma1 = #gamma { start_seq_id = Gamma1SeqId } =
+ combine_gammas(Gamma, #gamma { start_seq_id = HighSeqId,
+ count = Len1,
+ end_seq_id = EndSeqId }),
State1 = State #vqstate { q2 = Q2a, gamma = Gamma1,
index_state = IndexState1 },
case queue:out(Q3) of
{empty, _Q3} -> State1;
- {{value, #beta { seq_id = SeqId }}, _Q3a} ->
+ {{value, #beta { seq_id = SeqId }}, _Q3a} ->
{{value, #beta { seq_id = SeqIdMax }}, _Q3b} = queue:out_r(Q3),
Limit = rabbit_queue_index:next_segment_boundary(SeqId),
%% ASSERTION
@@ -921,21 +944,22 @@ push_betas_to_gammas(State = #vqstate { q2 = Q2, gamma = Gamma, q3 = Q3,
_ -> (Gamma1SeqId - Limit) rem
rabbit_queue_index:segment_size()
end,
- %% LowSeqId is low in the sense that it must be
+ %% SeqIdMax is low in the sense that it must be
%% lower than the seq_id in gamma1, in fact either
%% gamma1 has undefined as its seq_id or there
- %% does not exist a seq_id X s.t. X > LowSeqId and
+ %% does not exist a seq_id X s.t. X > SeqIdMax and
%% X < gamma1's seq_id (would be +1 if it wasn't
%% for the possibility of gaps in the seq_ids).
- %% But because we use queue:out_r, LowSeqId is
+ %% But because we use queue:out_r, SeqIdMax is
%% actually also the highest seq_id of the betas we
%% transfer from q3 to gammas.
- {LowSeqId, Len2, Q3b, IndexState2} =
+ {SeqIdMax, Len2, Q3b, IndexState2} =
push_betas_to_gammas(fun queue:out_r/1, Limit, Q3,
IndexState1),
- true = Gamma1SeqId > LowSeqId, %% ASSERTION
- Gamma2 = combine_gammas(
- #gamma { seq_id = Limit, count = Len2}, Gamma1),
+ Gamma2 = combine_gammas(#gamma { start_seq_id = Limit,
+ count = Len2,
+ end_seq_id = SeqIdMax+1 },
+ Gamma1),
State1 #vqstate { q3 = Q3b, gamma = Gamma2,
index_state = IndexState2 }
end