summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--include/rabbit_queue.hrl10
-rw-r--r--src/rabbit_tests.erl53
-rw-r--r--src/rabbit_variable_queue.erl92
3 files changed, 116 insertions, 39 deletions
diff --git a/include/rabbit_queue.hrl b/include/rabbit_queue.hrl
index 165a7e7b99..69ad7588c3 100644
--- a/include/rabbit_queue.hrl
+++ b/include/rabbit_queue.hrl
@@ -46,13 +46,15 @@
}).
-record(gamma,
- { seq_id,
- count
+ { start_seq_id,
+ count,
+ end_seq_id %% note the end_seq_id is always >, not >=
}).
-ifdef(use_specs).
--type(gamma() :: #gamma { seq_id :: non_neg_integer(),
- count :: non_neg_integer () }).
+-type(gamma() :: #gamma { start_seq_id :: non_neg_integer(),
+ count :: non_neg_integer (),
+ end_seq_id :: non_neg_integer() }).
-endif.
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index c931e0b051..f84ba70adc 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -41,6 +41,7 @@
-import(lists).
-include("rabbit.hrl").
+-include("rabbit_queue.hrl").
-include_lib("kernel/include/file.hrl").
test_content_prop_roundtrip(Datum, Binary) ->
@@ -1201,13 +1202,16 @@ fresh_variable_queue() ->
assert_prop(S0, len, 0),
assert_prop(S0, q1, 0),
assert_prop(S0, q2, 0),
- assert_prop(S0, gamma, {gamma, undefined, 0}),
+ assert_prop(S0, gamma, #gamma { start_seq_id = undefined,
+ count = 0,
+ end_seq_id = undefined }),
assert_prop(S0, q3, 0),
assert_prop(S0, q4, 0),
VQ.
test_variable_queue() ->
passed = test_variable_queue_dynamic_duration_change(),
+ passed = test_variable_queue_partial_segments_gamma_thing(),
passed.
test_variable_queue_dynamic_duration_change() ->
@@ -1260,3 +1264,50 @@ test_variable_queue_dynamic_duration_change_f(Len, VQ0) ->
after 0 ->
test_variable_queue_dynamic_duration_change_f(Len, VQ3)
end.
+
+test_variable_queue_partial_segments_gamma_thing() ->
+ SegmentSize = rabbit_queue_index:segment_size(),
+ HalfSegment = SegmentSize div 2,
+ VQ0 = fresh_variable_queue(),
+ {_SeqIds, VQ1} =
+ variable_queue_publish(true, SegmentSize + HalfSegment, VQ0),
+ VQ2 = rabbit_variable_queue:remeasure_rates(VQ1),
+ VQ3 = rabbit_variable_queue:set_queue_ram_duration_target(0, VQ2),
+ %% one segment in q3 as betas, and half a segment in gamma
+ S3 = rabbit_variable_queue:status(VQ3),
+ io:format("~p~n", [S3]),
+ assert_prop(S3, gamma, #gamma { start_seq_id = SegmentSize,
+ count = HalfSegment,
+ end_seq_id = SegmentSize + HalfSegment }),
+ assert_prop(S3, q3, SegmentSize),
+ assert_prop(S3, len, SegmentSize + HalfSegment),
+ VQ4 = rabbit_variable_queue:set_queue_ram_duration_target(infinity, VQ3),
+ {[_SeqId], VQ5} = variable_queue_publish(true, 1, VQ4),
+ %% should have 1 alpha, but it's in the same segment as the gammas
+ S5 = rabbit_variable_queue:status(VQ5),
+ io:format("~p~n", [S5]),
+ assert_prop(S5, q1, 1),
+ assert_prop(S5, gamma, #gamma { start_seq_id = SegmentSize,
+ count = HalfSegment,
+ end_seq_id = SegmentSize + HalfSegment }),
+ assert_prop(S5, q3, SegmentSize),
+ assert_prop(S5, len, SegmentSize + HalfSegment + 1),
+ {VQ6, AckTags} = variable_queue_fetch(SegmentSize, true, false,
+ SegmentSize + HalfSegment + 1, VQ5),
+ %% the half segment should now be in q3 as betas
+ S6 = rabbit_variable_queue:status(VQ6),
+ io:format("~p~n", [S6]),
+ assert_prop(S6, gamma, #gamma { start_seq_id = undefined,
+ count = 0,
+ end_seq_id = undefined }),
+ assert_prop(S6, q1, 1),
+ assert_prop(S6, q3, HalfSegment),
+ assert_prop(S6, len, HalfSegment + 1),
+ {VQ7, AckTags1} = variable_queue_fetch(HalfSegment + 1, true, false,
+ HalfSegment + 1, VQ6),
+ VQ8 = rabbit_variable_queue:ack(AckTags ++ AckTags1, VQ7),
+ %% should be empty now
+ {empty, VQ9} = rabbit_variable_queue:fetch(VQ8),
+ rabbit_variable_queue:terminate(VQ9),
+
+ passed.
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 3958216e45..6fc89cb4cf 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -134,7 +134,7 @@
-spec(publish_delivered/2 :: (basic_message(), vqstate()) ->
{ack(), vqstate()}).
-spec(set_queue_ram_duration_target/2 ::
- (('undefined' | number()), vqstate()) -> vqstate()).
+ (('undefined' | 'infinity' | number()), vqstate()) -> vqstate()).
-spec(remeasure_rates/1 :: (vqstate()) -> vqstate()).
-spec(ram_duration/1 :: (vqstate()) -> number()).
-spec(fetch/1 :: (vqstate()) ->
@@ -159,6 +159,10 @@
-endif.
+-define(BLANK_GAMMA, #gamma { start_seq_id = undefined,
+ count = 0,
+ end_seq_id = undefined }).
+
%%----------------------------------------------------------------------------
%% Public API
%%----------------------------------------------------------------------------
@@ -169,8 +173,10 @@ init(QueueName) ->
{GammaSeqId, NextSeqId, IndexState1} =
rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id(IndexState),
Gamma = case GammaCount of
- 0 -> #gamma { seq_id = undefined, count = 0 };
- _ -> #gamma { seq_id = GammaSeqId, count = GammaCount }
+ 0 -> ?BLANK_GAMMA;
+ _ -> #gamma { start_seq_id = GammaSeqId,
+ count = GammaCount,
+ end_seq_id = NextSeqId }
end,
Now = now(),
State =
@@ -472,7 +478,8 @@ status(#vqstate { q1 = Q1, q2 = Q2, gamma = Gamma, q3 = Q3, q4 = Q4,
target_ram_msg_count = TargetRamMsgCount,
ram_msg_count = RamMsgCount,
avg_egress_rate = AvgEgressRate,
- avg_ingress_rate = AvgIngressRate }) ->
+ avg_ingress_rate = AvgIngressRate,
+ next_seq_id = NextSeqId }) ->
[ {q1, queue:len(Q1)},
{q2, queue:len(Q2)},
{gamma, Gamma},
@@ -483,7 +490,8 @@ status(#vqstate { q1 = Q1, q2 = Q2, gamma = Gamma, q3 = Q3, q4 = Q4,
{target_ram_msg_count, TargetRamMsgCount},
{ram_msg_count, RamMsgCount},
{avg_egress_rate, AvgEgressRate},
- {avg_ingress_rate, AvgIngressRate} ].
+ {avg_ingress_rate, AvgIngressRate},
+ {next_seq_id, NextSeqId} ].
%%----------------------------------------------------------------------------
%% Minor helpers
@@ -508,12 +516,13 @@ entry_salient_details(#beta { msg_id = MsgId, seq_id = SeqId,
index_on_disk = IndexOnDisk }) ->
{MsgId, SeqId, IsDelivered, true, IndexOnDisk}.
-betas_from_segment_entries(List) ->
+betas_from_segment_entries(List, SeqIdLimit) ->
queue:from_list([#beta { msg_id = MsgId, seq_id = SeqId,
is_persistent = IsPersistent,
is_delivered = IsDelivered,
index_on_disk = true }
- || {MsgId, SeqId, IsPersistent, IsDelivered} <- List]).
+ || {MsgId, SeqId, IsPersistent, IsDelivered} <- List,
+ SeqId < SeqIdLimit ]).
read_index_segment(SeqId, IndexState) ->
SeqId1 = SeqId + rabbit_queue_index:segment_size(),
@@ -527,15 +536,18 @@ ensure_binary_properties(Msg = #basic_message { content = Content }) ->
content = rabbit_binary_parser:clear_decoded_content(
rabbit_binary_generator:ensure_content_encoded(Content)) }.
-%% the first arg is the older gamma
+%% the first arg is the older gamma
combine_gammas(#gamma { count = 0 }, #gamma { count = 0 }) ->
- #gamma { seq_id = undefined, count = 0 };
+ ?BLANK_GAMMA;
combine_gammas(#gamma { count = 0 }, #gamma { } = B) -> B;
combine_gammas(#gamma { } = A, #gamma { count = 0 }) -> A;
-combine_gammas(#gamma { seq_id = SeqIdLow, count = CountLow },
- #gamma { seq_id = SeqIdHigh, count = CountHigh}) ->
+combine_gammas(#gamma { start_seq_id = SeqIdLow, count = CountLow},
+ #gamma { start_seq_id = SeqIdHigh, count = CountHigh,
+ end_seq_id = SeqIdEnd }) ->
true = SeqIdLow =< SeqIdHigh, %% ASSERTION
- #gamma { seq_id = SeqIdLow, count = CountLow + CountHigh}.
+ Count = CountLow + CountHigh,
+ true = Count =< SeqIdEnd - SeqIdLow, %% ASSERTION
+ #gamma { start_seq_id = SeqIdLow, count = Count, end_seq_id = SeqIdEnd }.
%%----------------------------------------------------------------------------
%% Internal major helpers for Public API
@@ -549,8 +561,8 @@ delete1(NextSeqId, Count, GammaSeqId, IndexState) ->
case rabbit_queue_index:read_segment_entries(GammaSeqId, IndexState) of
{[], IndexState1} ->
delete1(NextSeqId, Count, Gamma1SeqId, IndexState1);
- {List, IndexState1} ->
- Q = betas_from_segment_entries(List),
+ {List, IndexState1} ->
+ Q = betas_from_segment_entries(List, Gamma1SeqId),
{QCount, IndexState2} = remove_queue_entries(Q, IndexState1),
delete1(NextSeqId, Count + QCount, Gamma1SeqId, IndexState2)
end.
@@ -748,7 +760,8 @@ publish(neither, Msg = #basic_message { guid = MsgId,
%% or equal to seq_id
GammaSeqId = rabbit_queue_index:next_segment_boundary(SeqId) -
rabbit_queue_index:segment_size(),
- Gamma1 = #gamma { seq_id = GammaSeqId, count = 1 },
+ Gamma1 = #gamma { start_seq_id = GammaSeqId, count = 1,
+ end_seq_id = SeqId + 1 },
State #vqstate { index_state = IndexState1,
gamma = combine_gammas(Gamma, Gamma1) }.
@@ -818,8 +831,10 @@ maybe_gammas_to_betas(State = #vqstate { gamma = #gamma { count = 0 } }) ->
maybe_gammas_to_betas(State =
#vqstate { index_state = IndexState, q2 = Q2, q3 = Q3,
target_ram_msg_count = TargetRamMsgCount,
- gamma = #gamma { seq_id = GammaSeqId,
- count = GammaCount }}) ->
+ gamma = #gamma { start_seq_id = GammaSeqId,
+ count = GammaCount,
+ end_seq_id = GammaSeqIdEnd }}
+ ) ->
case (not queue:is_empty(Q3)) andalso 0 == TargetRamMsgCount of
true ->
State;
@@ -832,20 +847,22 @@ maybe_gammas_to_betas(State =
State1 = State #vqstate { index_state = IndexState1 },
%% length(List) may be < segment_size because of acks. But
%% it can't be []
- Q3a = queue:join(Q3, betas_from_segment_entries(List)),
- case GammaCount - length(List) of
+ Q3b = betas_from_segment_entries(List, GammaSeqIdEnd),
+ Q3a = queue:join(Q3, Q3b),
+ case GammaCount - queue:len(Q3b) of
0 ->
%% gamma is now empty, but it wasn't before, so
%% can now join q2 onto q3
- State1 #vqstate { gamma = #gamma { seq_id = undefined,
- count = 0 },
+ State1 #vqstate { gamma = ?BLANK_GAMMA,
q2 = queue:new(),
q3 = queue:join(Q3a, Q2) };
N when N > 0 ->
maybe_gammas_to_betas(
- State1 #vqstate { q3 = Q3a,
- gamma = #gamma { seq_id = Gamma1SeqId,
- count = N } })
+ State1 #vqstate {
+ q3 = Q3a,
+ gamma = #gamma { start_seq_id = Gamma1SeqId,
+ count = N,
+ end_seq_id = GammaSeqIdEnd } })
end
end.
@@ -897,13 +914,19 @@ push_betas_to_gammas(State = #vqstate { q2 = Q2, gamma = Gamma, q3 = Q3,
%% transfer from q2 to gamma.
{HighSeqId, Len1, Q2a, IndexState1} =
push_betas_to_gammas(fun queue:out/1, undefined, Q2, IndexState),
- Gamma1 = #gamma { seq_id = Gamma1SeqId } =
- combine_gammas(Gamma, #gamma { seq_id = HighSeqId, count = Len1 }),
+ EndSeqId = case queue:out_r(Q2) of
+ {empty, _Q2} -> undefined;
+ {{value, #beta { seq_id = EndSeqId1 }}, _Q2} -> EndSeqId1 + 1
+ end,
+ Gamma1 = #gamma { start_seq_id = Gamma1SeqId } =
+ combine_gammas(Gamma, #gamma { start_seq_id = HighSeqId,
+ count = Len1,
+ end_seq_id = EndSeqId }),
State1 = State #vqstate { q2 = Q2a, gamma = Gamma1,
index_state = IndexState1 },
case queue:out(Q3) of
{empty, _Q3} -> State1;
- {{value, #beta { seq_id = SeqId }}, _Q3a} ->
+ {{value, #beta { seq_id = SeqId }}, _Q3a} ->
{{value, #beta { seq_id = SeqIdMax }}, _Q3b} = queue:out_r(Q3),
Limit = rabbit_queue_index:next_segment_boundary(SeqId),
%% ASSERTION
@@ -921,21 +944,22 @@ push_betas_to_gammas(State = #vqstate { q2 = Q2, gamma = Gamma, q3 = Q3,
_ -> (Gamma1SeqId - Limit) rem
rabbit_queue_index:segment_size()
end,
- %% LowSeqId is low in the sense that it must be
+ %% SeqIdMax is low in the sense that it must be
%% lower than the seq_id in gamma1, in fact either
%% gamma1 has undefined as its seq_id or there
- %% does not exist a seq_id X s.t. X > LowSeqId and
+ %% does not exist a seq_id X s.t. X > SeqIdMax and
%% X < gamma1's seq_id (would be +1 if it wasn't
%% for the possibility of gaps in the seq_ids).
- %% But because we use queue:out_r, LowSeqId is
+ %% But because we use queue:out_r, SeqIdMax is
%% actually also the highest seq_id of the betas we
%% transfer from q3 to gammas.
- {LowSeqId, Len2, Q3b, IndexState2} =
+ {SeqIdMax, Len2, Q3b, IndexState2} =
push_betas_to_gammas(fun queue:out_r/1, Limit, Q3,
IndexState1),
- true = Gamma1SeqId > LowSeqId, %% ASSERTION
- Gamma2 = combine_gammas(
- #gamma { seq_id = Limit, count = Len2}, Gamma1),
+ Gamma2 = combine_gammas(#gamma { start_seq_id = Limit,
+ count = Len2,
+ end_seq_id = SeqIdMax+1 },
+ Gamma1),
State1 #vqstate { q3 = Q3b, gamma = Gamma2,
index_state = IndexState2 }
end