diff options
| author | Matthias Radestock <matthias@rabbitmq.com> | 2014-03-04 12:09:34 +0000 |
|---|---|---|
| committer | Matthias Radestock <matthias@rabbitmq.com> | 2014-03-04 12:09:34 +0000 |
| commit | 3097000acdf5ba1fc507bbdbdb1eeb19a17edc23 (patch) | |
| tree | 0f015395b61172b4f25c1a49335cde584ad229c1 | |
| parent | 7019d0856fabeaf04dbdb6a62ed77af4d5c91251 (diff) | |
| download | rabbitmq-server-git-3097000acdf5ba1fc507bbdbdb1eeb19a17edc23.tar.gz | |
make tests work
- 'hole' generation is sensitive to IO_BATCH_SIZE
- need to handle credit bumps post publishing and prior to status
checks
| -rw-r--r-- | src/rabbit_tests.erl | 75 |
1 files changed, 42 insertions, 33 deletions
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index ce7fe45191..3b47e6988f 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -2413,18 +2413,19 @@ variable_queue_publish(IsPersistent, Count, PropFun, VQ) -> fun (_N) -> <<>> end, VQ). variable_queue_publish(IsPersistent, Start, Count, PropFun, PayloadFun, VQ) -> - lists:foldl( - fun (N, VQN) -> - rabbit_variable_queue:publish( - rabbit_basic:message( - rabbit_misc:r(<<>>, exchange, <<>>), - <<>>, #'P_basic'{delivery_mode = case IsPersistent of - true -> 2; - false -> 1 - end}, - PayloadFun(N)), - PropFun(N, #message_properties{}), false, self(), VQN) - end, VQ, lists:seq(Start, Start + Count - 1)). + variable_queue_wait_for_shuffling_end( + lists:foldl( + fun (N, VQN) -> + rabbit_variable_queue:publish( + rabbit_basic:message( + rabbit_misc:r(<<>>, exchange, <<>>), + <<>>, #'P_basic'{delivery_mode = case IsPersistent of + true -> 2; + false -> 1 + end}, + PayloadFun(N)), + PropFun(N, #message_properties{}), false, self(), VQN) + end, VQ, lists:seq(Start, Start + Count - 1))). variable_queue_fetch(Count, IsPersistent, IsDelivered, Len, VQ) -> lists:foldl(fun (N, {VQN, AckTagsAcc}) -> @@ -2436,6 +2437,10 @@ variable_queue_fetch(Count, IsPersistent, IsDelivered, Len, VQ) -> {VQM, [AckTagN | AckTagsAcc]} end, {VQ, []}, lists:seq(1, Count)). +variable_queue_set_ram_duration_target(Duration, VQ) -> + variable_queue_wait_for_shuffling_end( + rabbit_variable_queue:set_ram_duration_target(Duration, VQ)). + assert_prop(List, Prop, Value) -> Value = proplists:get_value(Prop, List). @@ -2550,10 +2555,10 @@ requeue_one_by_one(Acks, VQ) -> %% Create a vq with messages in q1, delta, and q3, and holes (in the %% form of pending acks) in the latter two. variable_queue_with_holes(VQ0) -> - Interval = 64, + Interval = 2048, %% should match vq:IO_BATCH_SIZE Count = rabbit_queue_index:next_segment_boundary(0)*2 + 2 * Interval, Seq = lists:seq(1, Count), - VQ1 = rabbit_variable_queue:set_ram_duration_target(0, VQ0), + VQ1 = variable_queue_set_ram_duration_target(0, VQ0), VQ2 = variable_queue_publish( false, 1, Count, fun (_, P) -> P end, fun erlang:term_to_binary/1, VQ1), @@ -2567,12 +2572,12 @@ variable_queue_with_holes(VQ0) -> {_MsgIds, VQ4} = rabbit_variable_queue:requeue( Acks -- (Subset1 ++ Subset2 ++ Subset3), VQ3), VQ5 = requeue_one_by_one(Subset1, VQ4), - %% by now we have some messages (and holes) in delt + %% by now we have some messages (and holes) in delta VQ6 = requeue_one_by_one(Subset2, VQ5), - VQ7 = rabbit_variable_queue:set_ram_duration_target(infinity, VQ6), + VQ7 = variable_queue_set_ram_duration_target(infinity, VQ6), %% add the q1 tail VQ8 = variable_queue_publish( - true, Count + 1, 64, + true, Count + 1, Interval, fun (_, P) -> P end, fun erlang:term_to_binary/1, VQ7), %% assertions [false = case V of @@ -2581,11 +2586,11 @@ variable_queue_with_holes(VQ0) -> _ -> false end || {K, V} <- rabbit_variable_queue:status(VQ8), lists:member(K, [q1, delta, q3])], - Depth = Count + 64, + Depth = Count + Interval, Depth = rabbit_variable_queue:depth(VQ8), Len = Depth - length(Subset3), Len = rabbit_variable_queue:len(VQ8), - {Seq3, Seq -- Seq3, lists:seq(Count + 1, Count + 64), VQ8}. + {Seq3, Seq -- Seq3, lists:seq(Count + 1, Count + Interval), VQ8}. test_variable_queue_requeue(VQ0) -> {_PendingMsgs, RequeuedMsgs, FreshMsgs, VQ1} = @@ -2608,11 +2613,11 @@ test_variable_queue_requeue(VQ0) -> %% requeue from ram_pending_ack into q3, move to delta and then empty queue test_variable_queue_requeue_ram_beta(VQ0) -> Count = rabbit_queue_index:next_segment_boundary(0)*2 + 2, - VQ1 = rabbit_tests:variable_queue_publish(false, Count, VQ0), + VQ1 = variable_queue_publish(false, Count, VQ0), {VQ2, AcksR} = variable_queue_fetch(Count, false, false, Count, VQ1), {Back, Front} = lists:split(Count div 2, AcksR), {_, VQ3} = rabbit_variable_queue:requeue(erlang:tl(Back), VQ2), - VQ4 = rabbit_variable_queue:set_ram_duration_target(0, VQ3), + VQ4 = variable_queue_set_ram_duration_target(0, VQ3), {_, VQ5} = rabbit_variable_queue:requeue([erlang:hd(Back)], VQ4), VQ6 = requeue_one_by_one(Front, VQ5), {VQ7, AcksAll} = variable_queue_fetch(Count, false, true, Count, VQ6), @@ -2655,7 +2660,7 @@ test_variable_queue_ack_limiting(VQ0) -> %% ensure all acks go to disk on 0 duration target VQ6 = check_variable_queue_status( - rabbit_variable_queue:set_ram_duration_target(0, VQ5), + variable_queue_set_ram_duration_target(0, VQ5), [{len, Len div 2}, {target_ram_count, 0}, {ram_msg_count, 0}, @@ -2738,9 +2743,9 @@ test_fetchwhile_varying_ram_duration(VQ0) -> test_dropfetchwhile_varying_ram_duration(Fun, VQ0) -> VQ1 = variable_queue_publish(false, 1, VQ0), - VQ2 = rabbit_variable_queue:set_ram_duration_target(0, VQ1), + VQ2 = variable_queue_set_ram_duration_target(0, VQ1), VQ3 = Fun(VQ2), - VQ4 = rabbit_variable_queue:set_ram_duration_target(infinity, VQ3), + VQ4 = variable_queue_set_ram_duration_target(infinity, VQ3), VQ5 = variable_queue_publish(false, 1, VQ4), VQ6 = Fun(VQ5), VQ6. @@ -2761,7 +2766,7 @@ test_variable_queue_dynamic_duration_change(VQ0) -> {_Duration, VQ5} = rabbit_variable_queue:ram_duration(VQ4), io:format("~p:~n~p~n", [Duration1, rabbit_variable_queue:status(VQ5)]), - VQ6 = rabbit_variable_queue:set_ram_duration_target( + VQ6 = variable_queue_set_ram_duration_target( Duration1, VQ5), publish_fetch_and_ack(Churn, Len, VQ6) end, VQ3, [Duration / 4, 0, Duration / 4, infinity]), @@ -2789,12 +2794,12 @@ test_variable_queue_partial_segments_delta_thing(VQ0) -> VQ1 = variable_queue_publish(true, OneAndAHalfSegment, VQ0), {_Duration, VQ2} = rabbit_variable_queue:ram_duration(VQ1), VQ3 = check_variable_queue_status( - rabbit_variable_queue:set_ram_duration_target(0, VQ2), + variable_queue_set_ram_duration_target(0, VQ2), %% one segment in q3, and half a segment in delta [{delta, {delta, SegmentSize, HalfSegment, OneAndAHalfSegment}}, {q3, SegmentSize}, {len, SegmentSize + HalfSegment}]), - VQ4 = rabbit_variable_queue:set_ram_duration_target(infinity, VQ3), + VQ4 = variable_queue_set_ram_duration_target(infinity, VQ3), VQ5 = check_variable_queue_status( variable_queue_publish(true, 1, VQ4), %% one alpha, but it's in the same segment as the deltas @@ -2826,17 +2831,21 @@ check_variable_queue_status(VQ0, Props) -> VQ1. variable_queue_wait_for_shuffling_end(VQ) -> - case rabbit_variable_queue:needs_timeout(VQ) of + case credit_flow:blocked() of false -> VQ; - _ -> variable_queue_wait_for_shuffling_end( - rabbit_variable_queue:timeout(VQ)) + true -> receive + {bump_credit, Msg} -> + credit_flow:handle_bump_msg(Msg), + variable_queue_wait_for_shuffling_end( + rabbit_variable_queue:resume(VQ)) + end end. test_variable_queue_all_the_bits_not_covered_elsewhere1(VQ0) -> Count = 2 * rabbit_queue_index:next_segment_boundary(0), VQ1 = variable_queue_publish(true, Count, VQ0), VQ2 = variable_queue_publish(false, Count, VQ1), - VQ3 = rabbit_variable_queue:set_ram_duration_target(0, VQ2), + VQ3 = variable_queue_set_ram_duration_target(0, VQ2), {VQ4, _AckTags} = variable_queue_fetch(Count, true, false, Count + Count, VQ3), {VQ5, _AckTags1} = variable_queue_fetch(Count, false, false, @@ -2846,13 +2855,13 @@ test_variable_queue_all_the_bits_not_covered_elsewhere1(VQ0) -> {{_Msg1, true, _AckTag1}, VQ8} = rabbit_variable_queue:fetch(true, VQ7), Count1 = rabbit_variable_queue:len(VQ8), VQ9 = variable_queue_publish(false, 1, VQ8), - VQ10 = rabbit_variable_queue:set_ram_duration_target(0, VQ9), + VQ10 = variable_queue_set_ram_duration_target(0, VQ9), {VQ11, _AckTags2} = variable_queue_fetch(Count1, true, true, Count, VQ10), {VQ12, _AckTags3} = variable_queue_fetch(1, false, false, 1, VQ11), VQ12. test_variable_queue_all_the_bits_not_covered_elsewhere2(VQ0) -> - VQ1 = rabbit_variable_queue:set_ram_duration_target(0, VQ0), + VQ1 = variable_queue_set_ram_duration_target(0, VQ0), VQ2 = variable_queue_publish(false, 4, VQ1), {VQ3, AckTags} = variable_queue_fetch(2, false, false, 4, VQ2), {_Guids, VQ4} = |
