summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlvaro Videla <videlalvaro@gmail.com>2015-10-19 21:38:32 +0200
committerAlvaro Videla <videlalvaro@gmail.com>2015-10-19 21:38:32 +0200
commit8cc4229802161d0ce4c30114a605f9e2539f413d (patch)
treec75f6836ba97c28b45e03ba1d78565766c4e9736
parent3b1321a9082b5a574c92a11ea6cf8725588eebda (diff)
downloadrabbitmq-server-git-8cc4229802161d0ce4c30114a605f9e2539f413d.tar.gz
adds lazy queue tests
-rw-r--r--Makefile12
-rw-r--r--test/src/rabbit_tests.erl109
2 files changed, 96 insertions, 25 deletions
diff --git a/Makefile b/Makefile
index a5cfe034a5..168ee7675d 100644
--- a/Makefile
+++ b/Makefile
@@ -241,6 +241,18 @@ run-tests: all
OUT=$$(echo "rabbit_tests:all_tests()." | $(ERL_CALL)) ; \
echo $$OUT ; echo $$OUT | grep '^{ok, passed}$$' > /dev/null
+run-vq-tests: all
+ echo 'code:add_path("$(TEST_EBIN_DIR)").' | $(ERL_CALL)
+ echo 'code:add_path("$(TEST_EBIN_DIR)").' | $(ERL_CALL) -n hare || true
+ OUT=$$(echo "rabbit_tests:test_backing_queue()." | $(ERL_CALL)) ; \
+ echo $$OUT ; echo $$OUT | grep '^{ok, passed}$$' > /dev/null
+
+run-lazy-vq-tests: all
+ echo 'code:add_path("$(TEST_EBIN_DIR)").' | $(ERL_CALL)
+ echo 'code:add_path("$(TEST_EBIN_DIR)").' | $(ERL_CALL) -n hare || true
+ OUT=$$(echo "rabbit_tests:test_lazy_variable_queue()." | $(ERL_CALL)) ; \
+ echo $$OUT ; echo $$OUT | grep '^{ok, passed}$$' > /dev/null
+
run-qc: all
echo 'code:add_path("$(TEST_EBIN_DIR)").' | $(ERL_CALL)
./quickcheck $(RABBITMQ_NODENAME) rabbit_backing_queue_qc 100 40
diff --git a/test/src/rabbit_tests.erl b/test/src/rabbit_tests.erl
index 2235717ebf..0c4b869fb0 100644
--- a/test/src/rabbit_tests.erl
+++ b/test/src/rabbit_tests.erl
@@ -2645,7 +2645,7 @@ test_amqqueue(Durable) ->
(rabbit_amqqueue:pseudo_queue(test_queue(), self()))
#amqqueue { durable = Durable }.
-with_fresh_variable_queue(Fun) ->
+with_fresh_variable_queue(Fun, Mode) ->
Ref = make_ref(),
Me = self(),
%% Run in a separate process since rabbit_msg_store will send
@@ -2659,9 +2659,10 @@ with_fresh_variable_queue(Fun) ->
{delta, undefined, 0, undefined}},
{q3, 0}, {q4, 0},
{len, 0}]),
+ VQ1 = set_queue_mode(Mode, VQ),
try
_ = rabbit_variable_queue:delete_and_terminate(
- shutdown, Fun(VQ)),
+ shutdown, Fun(VQ1)),
Me ! Ref
catch
Type:Error ->
@@ -2674,6 +2675,12 @@ with_fresh_variable_queue(Fun) ->
end,
passed.
+set_queue_mode(Mode, VQ) ->
+ VQ1 = rabbit_variable_queue:set_queue_mode(Mode, VQ),
+ S1 = variable_queue_status(VQ1),
+ assert_props(S1, [{mode, Mode}]),
+ VQ1.
+
publish_and_confirm(Q, Payload, Count) ->
Seqs = lists:seq(1, Count),
[begin
@@ -2699,25 +2706,64 @@ wait_for_confirms(Unconfirmed) ->
end.
test_variable_queue() ->
- [passed = with_fresh_variable_queue(F) ||
- F <- [fun test_variable_queue_dynamic_duration_change/1,
- fun test_variable_queue_partial_segments_delta_thing/1,
- fun test_variable_queue_all_the_bits_not_covered_elsewhere1/1,
- fun test_variable_queue_all_the_bits_not_covered_elsewhere2/1,
- fun test_drop/1,
- fun test_variable_queue_fold_msg_on_disk/1,
- fun test_dropfetchwhile/1,
- fun test_dropwhile_varying_ram_duration/1,
- fun test_fetchwhile_varying_ram_duration/1,
- fun test_variable_queue_ack_limiting/1,
- fun test_variable_queue_purge/1,
- fun test_variable_queue_requeue/1,
- fun test_variable_queue_requeue_ram_beta/1,
- fun test_variable_queue_fold/1,
- fun test_variable_queue_batch_publish/1,
- fun test_variable_queue_batch_publish_delivered/1]],
+ [passed = with_fresh_variable_queue(F, default) ||
+ F <- variable_queue_test_funs()],
+ passed.
+
+test_lazy_variable_queue() ->
+ [passed = with_fresh_variable_queue(F, lazy) ||
+ F <- variable_queue_test_funs() ++ [fun test_variable_queue_mode_change/1]],
passed.
+variable_queue_test_funs() ->
+ [fun test_variable_queue_dynamic_duration_change/1,
+ fun test_variable_queue_partial_segments_delta_thing/1,
+ fun test_variable_queue_all_the_bits_not_covered_elsewhere1/1,
+ fun test_variable_queue_all_the_bits_not_covered_elsewhere2/1,
+ fun test_drop/1,
+ fun test_variable_queue_fold_msg_on_disk/1,
+ fun test_dropfetchwhile/1,
+ fun test_dropwhile_varying_ram_duration/1,
+ fun test_fetchwhile_varying_ram_duration/1,
+ fun test_variable_queue_ack_limiting/1,
+ fun test_variable_queue_purge/1,
+ fun test_variable_queue_requeue/1,
+ fun test_variable_queue_requeue_ram_beta/1,
+ fun test_variable_queue_fold/1,
+ fun test_variable_queue_batch_publish/1,
+ fun test_variable_queue_batch_publish_delivered/1].
+
+%% same as test_variable_queue_requeue_ram_beta but randomly chaning
+%% the queue mode after every step.
+test_variable_queue_mode_change(VQ0) ->
+ Count = rabbit_queue_index:next_segment_boundary(0)*2 + 2,
+ VQ1 = variable_queue_publish(false, Count, VQ0),
+ VQ2 = maybe_switch_queue_mode(VQ1),
+ {VQ3, AcksR} = variable_queue_fetch(Count, false, false, Count, VQ2),
+ VQ4 = maybe_switch_queue_mode(VQ3),
+ {Back, Front} = lists:split(Count div 2, AcksR),
+ {_, VQ5} = rabbit_variable_queue:requeue(erlang:tl(Back), VQ4),
+ VQ6 = maybe_switch_queue_mode(VQ5),
+ VQ7 = variable_queue_set_ram_duration_target(0, VQ6),
+ VQ8 = maybe_switch_queue_mode(VQ7),
+ {_, VQ9} = rabbit_variable_queue:requeue([erlang:hd(Back)], VQ8),
+ VQ10 = maybe_switch_queue_mode(VQ9),
+ VQ11 = requeue_one_by_one(Front, VQ10),
+ VQ12 = maybe_switch_queue_mode(VQ11),
+ {VQ13, AcksAll} = variable_queue_fetch(Count, false, true, Count, VQ12),
+ VQ14 = maybe_switch_queue_mode(VQ13),
+ {_, VQ15} = rabbit_variable_queue:ack(AcksAll, VQ14),
+ VQ16 = maybe_switch_queue_mode(VQ15),
+ VQ16.
+
+maybe_switch_queue_mode(VQ) ->
+ Mode = random_queue_mode(),
+ set_queue_mode(Mode, VQ).
+
+random_queue_mode() ->
+ Modes = [lazy, default],
+ lists:nth(random:uniform(length(Modes)), Modes).
+
test_variable_queue_batch_publish(VQ) ->
Count = 10,
VQ1 = variable_queue_batch_publish(true, Count, VQ),
@@ -2795,18 +2841,30 @@ variable_queue_with_holes(VQ0) ->
true, Count + 1, Interval,
fun (_, P) -> P end, fun erlang:term_to_binary/1, VQ7),
%% assertions
- [false = case V of
- {delta, _, 0, _} -> true;
- 0 -> true;
- _ -> false
- end || {K, V} <- variable_queue_status(VQ8),
- lists:member(K, [q1, delta, q3])],
+ Status = variable_queue_status(VQ8),
+ vq_with_holes_assertions(VQ8, proplists:get_value(mode, Status)),
Depth = Count + Interval,
Depth = rabbit_variable_queue:depth(VQ8),
Len = Depth - length(Subset3),
Len = rabbit_variable_queue:len(VQ8),
{Seq3, Seq -- Seq3, lists:seq(Count + 1, Count + Interval), VQ8}.
+vq_with_holes_assertions(VQ, default) ->
+ [false =
+ case V of
+ {delta, _, 0, _} -> true;
+ 0 -> true;
+ _ -> false
+ end || {K, V} <- variable_queue_status(VQ),
+ lists:member(K, [q1, delta, q3])];
+vq_with_holes_assertions(VQ, lazy) ->
+ [false =
+ case V of
+ {delta, _, 0, _} -> true;
+ _ -> false
+ end || {K, V} <- variable_queue_status(VQ),
+ lists:member(K, [delta])].
+
test_variable_queue_requeue(VQ0) ->
{_PendingMsgs, RequeuedMsgs, FreshMsgs, VQ1} =
variable_queue_with_holes(VQ0),
@@ -2974,6 +3032,7 @@ test_variable_queue_dynamic_duration_change(VQ0) ->
%% start by sending in a couple of segments worth
Len = 2*SegmentSize,
VQ1 = variable_queue_publish(false, Len, VQ0),
+
%% squeeze and relax queue
Churn = Len div 32,
VQ2 = publish_fetch_and_ack(Churn, Len, VQ1),