summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_tests.erl75
1 files changed, 42 insertions, 33 deletions
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index ce7fe45191..3b47e6988f 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -2413,18 +2413,19 @@ variable_queue_publish(IsPersistent, Count, PropFun, VQ) ->
fun (_N) -> <<>> end, VQ).
variable_queue_publish(IsPersistent, Start, Count, PropFun, PayloadFun, VQ) ->
- lists:foldl(
- fun (N, VQN) ->
- rabbit_variable_queue:publish(
- rabbit_basic:message(
- rabbit_misc:r(<<>>, exchange, <<>>),
- <<>>, #'P_basic'{delivery_mode = case IsPersistent of
- true -> 2;
- false -> 1
- end},
- PayloadFun(N)),
- PropFun(N, #message_properties{}), false, self(), VQN)
- end, VQ, lists:seq(Start, Start + Count - 1)).
+ variable_queue_wait_for_shuffling_end(
+ lists:foldl(
+ fun (N, VQN) ->
+ rabbit_variable_queue:publish(
+ rabbit_basic:message(
+ rabbit_misc:r(<<>>, exchange, <<>>),
+ <<>>, #'P_basic'{delivery_mode = case IsPersistent of
+ true -> 2;
+ false -> 1
+ end},
+ PayloadFun(N)),
+ PropFun(N, #message_properties{}), false, self(), VQN)
+ end, VQ, lists:seq(Start, Start + Count - 1))).
variable_queue_fetch(Count, IsPersistent, IsDelivered, Len, VQ) ->
lists:foldl(fun (N, {VQN, AckTagsAcc}) ->
@@ -2436,6 +2437,10 @@ variable_queue_fetch(Count, IsPersistent, IsDelivered, Len, VQ) ->
{VQM, [AckTagN | AckTagsAcc]}
end, {VQ, []}, lists:seq(1, Count)).
+variable_queue_set_ram_duration_target(Duration, VQ) ->
+ variable_queue_wait_for_shuffling_end(
+ rabbit_variable_queue:set_ram_duration_target(Duration, VQ)).
+
assert_prop(List, Prop, Value) ->
Value = proplists:get_value(Prop, List).
@@ -2550,10 +2555,10 @@ requeue_one_by_one(Acks, VQ) ->
%% Create a vq with messages in q1, delta, and q3, and holes (in the
%% form of pending acks) in the latter two.
variable_queue_with_holes(VQ0) ->
- Interval = 64,
+ Interval = 2048, %% should match vq:IO_BATCH_SIZE
Count = rabbit_queue_index:next_segment_boundary(0)*2 + 2 * Interval,
Seq = lists:seq(1, Count),
- VQ1 = rabbit_variable_queue:set_ram_duration_target(0, VQ0),
+ VQ1 = variable_queue_set_ram_duration_target(0, VQ0),
VQ2 = variable_queue_publish(
false, 1, Count,
fun (_, P) -> P end, fun erlang:term_to_binary/1, VQ1),
@@ -2567,12 +2572,12 @@ variable_queue_with_holes(VQ0) ->
{_MsgIds, VQ4} = rabbit_variable_queue:requeue(
Acks -- (Subset1 ++ Subset2 ++ Subset3), VQ3),
VQ5 = requeue_one_by_one(Subset1, VQ4),
- %% by now we have some messages (and holes) in delt
+ %% by now we have some messages (and holes) in delta
VQ6 = requeue_one_by_one(Subset2, VQ5),
- VQ7 = rabbit_variable_queue:set_ram_duration_target(infinity, VQ6),
+ VQ7 = variable_queue_set_ram_duration_target(infinity, VQ6),
%% add the q1 tail
VQ8 = variable_queue_publish(
- true, Count + 1, 64,
+ true, Count + 1, Interval,
fun (_, P) -> P end, fun erlang:term_to_binary/1, VQ7),
%% assertions
[false = case V of
@@ -2581,11 +2586,11 @@ variable_queue_with_holes(VQ0) ->
_ -> false
end || {K, V} <- rabbit_variable_queue:status(VQ8),
lists:member(K, [q1, delta, q3])],
- Depth = Count + 64,
+ Depth = Count + Interval,
Depth = rabbit_variable_queue:depth(VQ8),
Len = Depth - length(Subset3),
Len = rabbit_variable_queue:len(VQ8),
- {Seq3, Seq -- Seq3, lists:seq(Count + 1, Count + 64), VQ8}.
+ {Seq3, Seq -- Seq3, lists:seq(Count + 1, Count + Interval), VQ8}.
test_variable_queue_requeue(VQ0) ->
{_PendingMsgs, RequeuedMsgs, FreshMsgs, VQ1} =
@@ -2608,11 +2613,11 @@ test_variable_queue_requeue(VQ0) ->
%% requeue from ram_pending_ack into q3, move to delta and then empty queue
test_variable_queue_requeue_ram_beta(VQ0) ->
Count = rabbit_queue_index:next_segment_boundary(0)*2 + 2,
- VQ1 = rabbit_tests:variable_queue_publish(false, Count, VQ0),
+ VQ1 = variable_queue_publish(false, Count, VQ0),
{VQ2, AcksR} = variable_queue_fetch(Count, false, false, Count, VQ1),
{Back, Front} = lists:split(Count div 2, AcksR),
{_, VQ3} = rabbit_variable_queue:requeue(erlang:tl(Back), VQ2),
- VQ4 = rabbit_variable_queue:set_ram_duration_target(0, VQ3),
+ VQ4 = variable_queue_set_ram_duration_target(0, VQ3),
{_, VQ5} = rabbit_variable_queue:requeue([erlang:hd(Back)], VQ4),
VQ6 = requeue_one_by_one(Front, VQ5),
{VQ7, AcksAll} = variable_queue_fetch(Count, false, true, Count, VQ6),
@@ -2655,7 +2660,7 @@ test_variable_queue_ack_limiting(VQ0) ->
%% ensure all acks go to disk on 0 duration target
VQ6 = check_variable_queue_status(
- rabbit_variable_queue:set_ram_duration_target(0, VQ5),
+ variable_queue_set_ram_duration_target(0, VQ5),
[{len, Len div 2},
{target_ram_count, 0},
{ram_msg_count, 0},
@@ -2738,9 +2743,9 @@ test_fetchwhile_varying_ram_duration(VQ0) ->
test_dropfetchwhile_varying_ram_duration(Fun, VQ0) ->
VQ1 = variable_queue_publish(false, 1, VQ0),
- VQ2 = rabbit_variable_queue:set_ram_duration_target(0, VQ1),
+ VQ2 = variable_queue_set_ram_duration_target(0, VQ1),
VQ3 = Fun(VQ2),
- VQ4 = rabbit_variable_queue:set_ram_duration_target(infinity, VQ3),
+ VQ4 = variable_queue_set_ram_duration_target(infinity, VQ3),
VQ5 = variable_queue_publish(false, 1, VQ4),
VQ6 = Fun(VQ5),
VQ6.
@@ -2761,7 +2766,7 @@ test_variable_queue_dynamic_duration_change(VQ0) ->
{_Duration, VQ5} = rabbit_variable_queue:ram_duration(VQ4),
io:format("~p:~n~p~n",
[Duration1, rabbit_variable_queue:status(VQ5)]),
- VQ6 = rabbit_variable_queue:set_ram_duration_target(
+ VQ6 = variable_queue_set_ram_duration_target(
Duration1, VQ5),
publish_fetch_and_ack(Churn, Len, VQ6)
end, VQ3, [Duration / 4, 0, Duration / 4, infinity]),
@@ -2789,12 +2794,12 @@ test_variable_queue_partial_segments_delta_thing(VQ0) ->
VQ1 = variable_queue_publish(true, OneAndAHalfSegment, VQ0),
{_Duration, VQ2} = rabbit_variable_queue:ram_duration(VQ1),
VQ3 = check_variable_queue_status(
- rabbit_variable_queue:set_ram_duration_target(0, VQ2),
+ variable_queue_set_ram_duration_target(0, VQ2),
%% one segment in q3, and half a segment in delta
[{delta, {delta, SegmentSize, HalfSegment, OneAndAHalfSegment}},
{q3, SegmentSize},
{len, SegmentSize + HalfSegment}]),
- VQ4 = rabbit_variable_queue:set_ram_duration_target(infinity, VQ3),
+ VQ4 = variable_queue_set_ram_duration_target(infinity, VQ3),
VQ5 = check_variable_queue_status(
variable_queue_publish(true, 1, VQ4),
%% one alpha, but it's in the same segment as the deltas
@@ -2826,17 +2831,21 @@ check_variable_queue_status(VQ0, Props) ->
VQ1.
variable_queue_wait_for_shuffling_end(VQ) ->
- case rabbit_variable_queue:needs_timeout(VQ) of
+ case credit_flow:blocked() of
false -> VQ;
- _ -> variable_queue_wait_for_shuffling_end(
- rabbit_variable_queue:timeout(VQ))
+ true -> receive
+ {bump_credit, Msg} ->
+ credit_flow:handle_bump_msg(Msg),
+ variable_queue_wait_for_shuffling_end(
+ rabbit_variable_queue:resume(VQ))
+ end
end.
test_variable_queue_all_the_bits_not_covered_elsewhere1(VQ0) ->
Count = 2 * rabbit_queue_index:next_segment_boundary(0),
VQ1 = variable_queue_publish(true, Count, VQ0),
VQ2 = variable_queue_publish(false, Count, VQ1),
- VQ3 = rabbit_variable_queue:set_ram_duration_target(0, VQ2),
+ VQ3 = variable_queue_set_ram_duration_target(0, VQ2),
{VQ4, _AckTags} = variable_queue_fetch(Count, true, false,
Count + Count, VQ3),
{VQ5, _AckTags1} = variable_queue_fetch(Count, false, false,
@@ -2846,13 +2855,13 @@ test_variable_queue_all_the_bits_not_covered_elsewhere1(VQ0) ->
{{_Msg1, true, _AckTag1}, VQ8} = rabbit_variable_queue:fetch(true, VQ7),
Count1 = rabbit_variable_queue:len(VQ8),
VQ9 = variable_queue_publish(false, 1, VQ8),
- VQ10 = rabbit_variable_queue:set_ram_duration_target(0, VQ9),
+ VQ10 = variable_queue_set_ram_duration_target(0, VQ9),
{VQ11, _AckTags2} = variable_queue_fetch(Count1, true, true, Count, VQ10),
{VQ12, _AckTags3} = variable_queue_fetch(1, false, false, 1, VQ11),
VQ12.
test_variable_queue_all_the_bits_not_covered_elsewhere2(VQ0) ->
- VQ1 = rabbit_variable_queue:set_ram_duration_target(0, VQ0),
+ VQ1 = variable_queue_set_ram_duration_target(0, VQ0),
VQ2 = variable_queue_publish(false, 4, VQ1),
{VQ3, AckTags} = variable_queue_fetch(2, false, false, 4, VQ2),
{_Guids, VQ4} =