diff options
| author | Alvaro Videla <videlalvaro@gmail.com> | 2015-10-19 21:38:32 +0200 |
|---|---|---|
| committer | Alvaro Videla <videlalvaro@gmail.com> | 2015-10-19 21:38:32 +0200 |
| commit | 8cc4229802161d0ce4c30114a605f9e2539f413d (patch) | |
| tree | c75f6836ba97c28b45e03ba1d78565766c4e9736 /test | |
| parent | 3b1321a9082b5a574c92a11ea6cf8725588eebda (diff) | |
| download | rabbitmq-server-git-8cc4229802161d0ce4c30114a605f9e2539f413d.tar.gz | |
adds lazy queue tests
Diffstat (limited to 'test')
| -rw-r--r-- | test/src/rabbit_tests.erl | 109 |
1 files changed, 84 insertions, 25 deletions
diff --git a/test/src/rabbit_tests.erl b/test/src/rabbit_tests.erl index 2235717ebf..0c4b869fb0 100644 --- a/test/src/rabbit_tests.erl +++ b/test/src/rabbit_tests.erl @@ -2645,7 +2645,7 @@ test_amqqueue(Durable) -> (rabbit_amqqueue:pseudo_queue(test_queue(), self())) #amqqueue { durable = Durable }. -with_fresh_variable_queue(Fun) -> +with_fresh_variable_queue(Fun, Mode) -> Ref = make_ref(), Me = self(), %% Run in a separate process since rabbit_msg_store will send @@ -2659,9 +2659,10 @@ with_fresh_variable_queue(Fun) -> {delta, undefined, 0, undefined}}, {q3, 0}, {q4, 0}, {len, 0}]), + VQ1 = set_queue_mode(Mode, VQ), try _ = rabbit_variable_queue:delete_and_terminate( - shutdown, Fun(VQ)), + shutdown, Fun(VQ1)), Me ! Ref catch Type:Error -> @@ -2674,6 +2675,12 @@ with_fresh_variable_queue(Fun) -> end, passed. +set_queue_mode(Mode, VQ) -> + VQ1 = rabbit_variable_queue:set_queue_mode(Mode, VQ), + S1 = variable_queue_status(VQ1), + assert_props(S1, [{mode, Mode}]), + VQ1. + publish_and_confirm(Q, Payload, Count) -> Seqs = lists:seq(1, Count), [begin @@ -2699,25 +2706,64 @@ wait_for_confirms(Unconfirmed) -> end. test_variable_queue() -> - [passed = with_fresh_variable_queue(F) || - F <- [fun test_variable_queue_dynamic_duration_change/1, - fun test_variable_queue_partial_segments_delta_thing/1, - fun test_variable_queue_all_the_bits_not_covered_elsewhere1/1, - fun test_variable_queue_all_the_bits_not_covered_elsewhere2/1, - fun test_drop/1, - fun test_variable_queue_fold_msg_on_disk/1, - fun test_dropfetchwhile/1, - fun test_dropwhile_varying_ram_duration/1, - fun test_fetchwhile_varying_ram_duration/1, - fun test_variable_queue_ack_limiting/1, - fun test_variable_queue_purge/1, - fun test_variable_queue_requeue/1, - fun test_variable_queue_requeue_ram_beta/1, - fun test_variable_queue_fold/1, - fun test_variable_queue_batch_publish/1, - fun test_variable_queue_batch_publish_delivered/1]], + [passed = with_fresh_variable_queue(F, default) || + F <- variable_queue_test_funs()], + passed. + +test_lazy_variable_queue() -> + [passed = with_fresh_variable_queue(F, lazy) || + F <- variable_queue_test_funs() ++ [fun test_variable_queue_mode_change/1]], passed. +variable_queue_test_funs() -> + [fun test_variable_queue_dynamic_duration_change/1, + fun test_variable_queue_partial_segments_delta_thing/1, + fun test_variable_queue_all_the_bits_not_covered_elsewhere1/1, + fun test_variable_queue_all_the_bits_not_covered_elsewhere2/1, + fun test_drop/1, + fun test_variable_queue_fold_msg_on_disk/1, + fun test_dropfetchwhile/1, + fun test_dropwhile_varying_ram_duration/1, + fun test_fetchwhile_varying_ram_duration/1, + fun test_variable_queue_ack_limiting/1, + fun test_variable_queue_purge/1, + fun test_variable_queue_requeue/1, + fun test_variable_queue_requeue_ram_beta/1, + fun test_variable_queue_fold/1, + fun test_variable_queue_batch_publish/1, + fun test_variable_queue_batch_publish_delivered/1]. + +%% same as test_variable_queue_requeue_ram_beta but randomly chaning +%% the queue mode after every step. +test_variable_queue_mode_change(VQ0) -> + Count = rabbit_queue_index:next_segment_boundary(0)*2 + 2, + VQ1 = variable_queue_publish(false, Count, VQ0), + VQ2 = maybe_switch_queue_mode(VQ1), + {VQ3, AcksR} = variable_queue_fetch(Count, false, false, Count, VQ2), + VQ4 = maybe_switch_queue_mode(VQ3), + {Back, Front} = lists:split(Count div 2, AcksR), + {_, VQ5} = rabbit_variable_queue:requeue(erlang:tl(Back), VQ4), + VQ6 = maybe_switch_queue_mode(VQ5), + VQ7 = variable_queue_set_ram_duration_target(0, VQ6), + VQ8 = maybe_switch_queue_mode(VQ7), + {_, VQ9} = rabbit_variable_queue:requeue([erlang:hd(Back)], VQ8), + VQ10 = maybe_switch_queue_mode(VQ9), + VQ11 = requeue_one_by_one(Front, VQ10), + VQ12 = maybe_switch_queue_mode(VQ11), + {VQ13, AcksAll} = variable_queue_fetch(Count, false, true, Count, VQ12), + VQ14 = maybe_switch_queue_mode(VQ13), + {_, VQ15} = rabbit_variable_queue:ack(AcksAll, VQ14), + VQ16 = maybe_switch_queue_mode(VQ15), + VQ16. + +maybe_switch_queue_mode(VQ) -> + Mode = random_queue_mode(), + set_queue_mode(Mode, VQ). + +random_queue_mode() -> + Modes = [lazy, default], + lists:nth(random:uniform(length(Modes)), Modes). + test_variable_queue_batch_publish(VQ) -> Count = 10, VQ1 = variable_queue_batch_publish(true, Count, VQ), @@ -2795,18 +2841,30 @@ variable_queue_with_holes(VQ0) -> true, Count + 1, Interval, fun (_, P) -> P end, fun erlang:term_to_binary/1, VQ7), %% assertions - [false = case V of - {delta, _, 0, _} -> true; - 0 -> true; - _ -> false - end || {K, V} <- variable_queue_status(VQ8), - lists:member(K, [q1, delta, q3])], + Status = variable_queue_status(VQ8), + vq_with_holes_assertions(VQ8, proplists:get_value(mode, Status)), Depth = Count + Interval, Depth = rabbit_variable_queue:depth(VQ8), Len = Depth - length(Subset3), Len = rabbit_variable_queue:len(VQ8), {Seq3, Seq -- Seq3, lists:seq(Count + 1, Count + Interval), VQ8}. +vq_with_holes_assertions(VQ, default) -> + [false = + case V of + {delta, _, 0, _} -> true; + 0 -> true; + _ -> false + end || {K, V} <- variable_queue_status(VQ), + lists:member(K, [q1, delta, q3])]; +vq_with_holes_assertions(VQ, lazy) -> + [false = + case V of + {delta, _, 0, _} -> true; + _ -> false + end || {K, V} <- variable_queue_status(VQ), + lists:member(K, [delta])]. + test_variable_queue_requeue(VQ0) -> {_PendingMsgs, RequeuedMsgs, FreshMsgs, VQ1} = variable_queue_with_holes(VQ0), @@ -2974,6 +3032,7 @@ test_variable_queue_dynamic_duration_change(VQ0) -> %% start by sending in a couple of segments worth Len = 2*SegmentSize, VQ1 = variable_queue_publish(false, Len, VQ0), + %% squeeze and relax queue Churn = Len div 32, VQ2 = publish_fetch_and_ack(Churn, Len, VQ1), |
