summaryrefslogtreecommitdiff
path: root/test
diff options
context:
space:
mode:
authorKarl Nilsson <kjnilsson@gmail.com>2019-04-11 17:05:19 +0100
committerGitHub <noreply@github.com>2019-04-11 17:05:19 +0100
commit96b9fb0f5cf6e563203e4b71af3a636cb3e95bfb (patch)
tree31471f84fa456235647628b708e28c8edbdbdc7e /test
parent46211bfa4de1aa8c532d16709823971127a9c552 (diff)
parent2ad83292a53a7d86ad56c7adff245d28989e9b32 (diff)
downloadrabbitmq-server-git-96b9fb0f5cf6e563203e4b71af3a636cb3e95bfb.tar.gz
Merge pull request #1970 from rabbitmq/in-memory-limits
In memory limits for quorum queues
Diffstat (limited to 'test')
-rw-r--r--test/queue_parallel_SUITE.erl26
-rw-r--r--test/quorum_queue_SUITE.erl331
-rw-r--r--test/rabbit_fifo_prop_SUITE.erl166
3 files changed, 501 insertions, 22 deletions
diff --git a/test/queue_parallel_SUITE.erl b/test/queue_parallel_SUITE.erl
index d57b3c3021..eba0965608 100644
--- a/test/queue_parallel_SUITE.erl
+++ b/test/queue_parallel_SUITE.erl
@@ -62,7 +62,9 @@ groups() ->
[
{classic_queue, [parallel], AllTests ++ [delete_immediately_by_pid_succeeds]},
{mirrored_queue, [parallel], AllTests ++ [delete_immediately_by_pid_succeeds]},
- {quorum_queue, [parallel], AllTests ++ [delete_immediately_by_pid_fails]}
+ {quorum_queue, [parallel], AllTests ++ [delete_immediately_by_pid_fails]},
+ {quorum_queue_in_memory_limit, [parallel], AllTests ++ [delete_immediately_by_pid_fails]},
+ {quorum_queue_in_memory_bytes, [parallel], AllTests ++ [delete_immediately_by_pid_fails]}
]}
].
@@ -97,6 +99,28 @@ init_per_group(quorum_queue, Config) ->
Skip ->
Skip
end;
+init_per_group(quorum_queue_in_memory_limit, Config) ->
+ case rabbit_ct_broker_helpers:enable_feature_flag(Config, quorum_queue) of
+ ok ->
+ rabbit_ct_helpers:set_config(
+ Config,
+ [{queue_args, [{<<"x-queue-type">>, longstr, <<"quorum">>},
+ {<<"x-max-in-memory-length">>, long, 1}]},
+ {queue_durable, true}]);
+ Skip ->
+ Skip
+ end;
+init_per_group(quorum_queue_in_memory_bytes, Config) ->
+ case rabbit_ct_broker_helpers:enable_feature_flag(Config, quorum_queue) of
+ ok ->
+ rabbit_ct_helpers:set_config(
+ Config,
+ [{queue_args, [{<<"x-queue-type">>, longstr, <<"quorum">>},
+ {<<"x-max-in-memory-bytes">>, long, 1}]},
+ {queue_durable, true}]);
+ Skip ->
+ Skip
+ end;
init_per_group(mirrored_queue, Config) ->
rabbit_ct_broker_helpers:set_ha_policy(Config, 0, <<"^max_length.*queue">>,
<<"all">>, [{<<"ha-sync-mode">>, <<"automatic">>}]),
diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl
index 1aa5247b57..61f9328855 100644
--- a/test/quorum_queue_SUITE.erl
+++ b/test/quorum_queue_SUITE.erl
@@ -115,7 +115,16 @@ all_tests() ->
queue_length_limit_drop_head,
subscribe_redelivery_limit,
subscribe_redelivery_policy,
- subscribe_redelivery_limit_with_dead_letter
+ subscribe_redelivery_limit_with_dead_letter,
+ queue_length_in_memory_limit_basic_get,
+ queue_length_in_memory_limit_subscribe,
+ queue_length_in_memory_limit,
+ queue_length_in_memory_limit_returns,
+ queue_length_in_memory_bytes_limit_basic_get,
+ queue_length_in_memory_bytes_limit_subscribe,
+ queue_length_in_memory_bytes_limit,
+ queue_length_in_memory_purge,
+ in_memory
].
memory_tests() ->
@@ -1826,6 +1835,321 @@ queue_length_limit_drop_head(Config) ->
amqp_channel:call(Ch, #'basic.get'{queue = QQ,
no_ack = true})).
+queue_length_in_memory_limit_basic_get(Config) ->
+ [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>},
+ {<<"x-max-in-memory-length">>, long, 1}])),
+
+ RaName = ra_name(QQ),
+ Msg1 = <<"msg1">>,
+ ok = amqp_channel:cast(Ch,
+ #'basic.publish'{routing_key = QQ},
+ #amqp_msg{props = #'P_basic'{delivery_mode = 2},
+ payload = Msg1}),
+ ok = amqp_channel:cast(Ch,
+ #'basic.publish'{routing_key = QQ},
+ #amqp_msg{props = #'P_basic'{delivery_mode = 2},
+ payload = <<"msg2">>}),
+
+ wait_for_messages(Config, [[QQ, <<"2">>, <<"2">>, <<"0">>]]),
+
+ ?assertEqual([{1, byte_size(Msg1)}],
+ dirty_query([Server], RaName, fun rabbit_fifo:query_in_memory_usage/1)),
+
+ ?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = Msg1}},
+ amqp_channel:call(Ch, #'basic.get'{queue = QQ,
+ no_ack = true})),
+ ?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = <<"msg2">>}},
+ amqp_channel:call(Ch, #'basic.get'{queue = QQ,
+ no_ack = true})).
+
+queue_length_in_memory_limit_subscribe(Config) ->
+ [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>},
+ {<<"x-max-in-memory-length">>, long, 1}])),
+
+ RaName = ra_name(QQ),
+ Msg1 = <<"msg1">>,
+ Msg2 = <<"msg11">>,
+ publish(Ch, QQ, Msg1),
+ publish(Ch, QQ, Msg2),
+ wait_for_messages(Config, [[QQ, <<"2">>, <<"2">>, <<"0">>]]),
+
+ ?assertEqual([{1, byte_size(Msg1)}],
+ dirty_query([Server], RaName, fun rabbit_fifo:query_in_memory_usage/1)),
+
+ subscribe(Ch, QQ, false),
+ receive
+ {#'basic.deliver'{delivery_tag = DeliveryTag1,
+ redelivered = false},
+ #amqp_msg{payload = Msg1}} ->
+ amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag1,
+ multiple = false})
+ end,
+ ?assertEqual([{0, 0}],
+ dirty_query([Server], RaName, fun rabbit_fifo:query_in_memory_usage/1)),
+ receive
+ {#'basic.deliver'{delivery_tag = DeliveryTag2,
+ redelivered = false},
+ #amqp_msg{payload = Msg2}} ->
+ amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag2,
+ multiple = false})
+ end.
+
+queue_length_in_memory_limit(Config) ->
+ [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>},
+ {<<"x-max-in-memory-length">>, long, 2}])),
+
+ RaName = ra_name(QQ),
+ Msg1 = <<"msg1">>,
+ Msg2 = <<"msg11">>,
+ Msg3 = <<"msg111">>,
+ Msg4 = <<"msg1111">>,
+
+ publish(Ch, QQ, Msg1),
+ publish(Ch, QQ, Msg2),
+ publish(Ch, QQ, Msg3),
+ wait_for_messages(Config, [[QQ, <<"3">>, <<"3">>, <<"0">>]]),
+
+ ?assertEqual([{2, byte_size(Msg1) + byte_size(Msg2)}],
+ dirty_query([Server], RaName, fun rabbit_fifo:query_in_memory_usage/1)),
+
+ ?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = Msg1}},
+ amqp_channel:call(Ch, #'basic.get'{queue = QQ,
+ no_ack = true})),
+
+ wait_for_messages(Config, [[QQ, <<"2">>, <<"2">>, <<"0">>]]),
+ publish(Ch, QQ, Msg4),
+ wait_for_messages(Config, [[QQ, <<"3">>, <<"3">>, <<"0">>]]),
+
+ ?assertEqual([{2, byte_size(Msg2) + byte_size(Msg4)}],
+ dirty_query([Server], RaName, fun rabbit_fifo:query_in_memory_usage/1)).
+
+queue_length_in_memory_limit_returns(Config) ->
+ [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>},
+ {<<"x-max-in-memory-length">>, long, 2}])),
+
+ RaName = ra_name(QQ),
+ Msg1 = <<"msg1">>,
+ Msg2 = <<"msg11">>,
+ Msg3 = <<"msg111">>,
+ Msg4 = <<"msg111">>,
+ publish(Ch, QQ, Msg1),
+ publish(Ch, QQ, Msg2),
+ wait_for_messages(Config, [[QQ, <<"2">>, <<"2">>, <<"0">>]]),
+
+ ?assertEqual([{2, byte_size(Msg1) + byte_size(Msg2)}],
+ dirty_query([Server], RaName, fun rabbit_fifo:query_in_memory_usage/1)),
+
+ ?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = Msg1}},
+ amqp_channel:call(Ch, #'basic.get'{queue = QQ,
+ no_ack = false})),
+
+ {#'basic.get_ok'{delivery_tag = DTag2}, #amqp_msg{payload = Msg2}} =
+ amqp_channel:call(Ch, #'basic.get'{queue = QQ,
+ no_ack = false}),
+
+ publish(Ch, QQ, Msg3),
+ publish(Ch, QQ, Msg4),
+
+ %% Ensure that returns are subject to in memory limits too
+ wait_for_messages(Config, [[QQ, <<"4">>, <<"2">>, <<"2">>]]),
+ amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DTag2,
+ multiple = true,
+ requeue = true}),
+ wait_for_messages(Config, [[QQ, <<"4">>, <<"4">>, <<"0">>]]),
+
+ ?assertEqual([{2, byte_size(Msg3) + byte_size(Msg4)}],
+ dirty_query([Server], RaName, fun rabbit_fifo:query_in_memory_usage/1)).
+
+queue_length_in_memory_bytes_limit_basic_get(Config) ->
+ [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>},
+ {<<"x-max-in-memory-bytes">>, long, 6}])),
+
+ RaName = ra_name(QQ),
+ Msg1 = <<"msg1">>,
+ ok = amqp_channel:cast(Ch,
+ #'basic.publish'{routing_key = QQ},
+ #amqp_msg{props = #'P_basic'{delivery_mode = 2},
+ payload = Msg1}),
+ ok = amqp_channel:cast(Ch,
+ #'basic.publish'{routing_key = QQ},
+ #amqp_msg{props = #'P_basic'{delivery_mode = 2},
+ payload = <<"msg2">>}),
+
+ wait_for_messages(Config, [[QQ, <<"2">>, <<"2">>, <<"0">>]]),
+
+ ?assertEqual([{1, byte_size(Msg1)}],
+ dirty_query([Server], RaName, fun rabbit_fifo:query_in_memory_usage/1)),
+
+ ?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = Msg1}},
+ amqp_channel:call(Ch, #'basic.get'{queue = QQ,
+ no_ack = true})),
+ ?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = <<"msg2">>}},
+ amqp_channel:call(Ch, #'basic.get'{queue = QQ,
+ no_ack = true})).
+
+queue_length_in_memory_bytes_limit_subscribe(Config) ->
+ [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>},
+ {<<"x-max-in-memory-bytes">>, long, 6}])),
+
+ RaName = ra_name(QQ),
+ Msg1 = <<"msg1">>,
+ Msg2 = <<"msg11">>,
+ publish(Ch, QQ, Msg1),
+ publish(Ch, QQ, Msg2),
+ wait_for_messages(Config, [[QQ, <<"2">>, <<"2">>, <<"0">>]]),
+
+ ?assertEqual([{1, byte_size(Msg1)}],
+ dirty_query([Server], RaName, fun rabbit_fifo:query_in_memory_usage/1)),
+
+ subscribe(Ch, QQ, false),
+ receive
+ {#'basic.deliver'{delivery_tag = DeliveryTag1,
+ redelivered = false},
+ #amqp_msg{payload = Msg1}} ->
+ amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag1,
+ multiple = false})
+ end,
+ ?assertEqual([{0, 0}],
+ dirty_query([Server], RaName, fun rabbit_fifo:query_in_memory_usage/1)),
+ receive
+ {#'basic.deliver'{delivery_tag = DeliveryTag2,
+ redelivered = false},
+ #amqp_msg{payload = Msg2}} ->
+ amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag2,
+ multiple = false})
+ end.
+
+queue_length_in_memory_bytes_limit(Config) ->
+ [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>},
+ {<<"x-max-in-memory-bytes">>, long, 12}])),
+
+ RaName = ra_name(QQ),
+ Msg1 = <<"msg1">>,
+ Msg2 = <<"msg11">>,
+ Msg3 = <<"msg111">>,
+ Msg4 = <<"msg1111">>,
+
+ publish(Ch, QQ, Msg1),
+ publish(Ch, QQ, Msg2),
+ publish(Ch, QQ, Msg3),
+ wait_for_messages(Config, [[QQ, <<"3">>, <<"3">>, <<"0">>]]),
+
+ ?assertEqual([{2, byte_size(Msg1) + byte_size(Msg2)}],
+ dirty_query([Server], RaName, fun rabbit_fifo:query_in_memory_usage/1)),
+
+ ?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = Msg1}},
+ amqp_channel:call(Ch, #'basic.get'{queue = QQ,
+ no_ack = true})),
+
+ wait_for_messages(Config, [[QQ, <<"2">>, <<"2">>, <<"0">>]]),
+ publish(Ch, QQ, Msg4),
+ wait_for_messages(Config, [[QQ, <<"3">>, <<"3">>, <<"0">>]]),
+
+ ?assertEqual([{2, byte_size(Msg2) + byte_size(Msg4)}],
+ dirty_query([Server], RaName, fun rabbit_fifo:query_in_memory_usage/1)).
+
+queue_length_in_memory_purge(Config) ->
+ [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>},
+ {<<"x-max-in-memory-length">>, long, 2}])),
+
+ RaName = ra_name(QQ),
+ Msg1 = <<"msg1">>,
+ Msg2 = <<"msg11">>,
+ Msg3 = <<"msg111">>,
+
+ publish(Ch, QQ, Msg1),
+ publish(Ch, QQ, Msg2),
+ publish(Ch, QQ, Msg3),
+ wait_for_messages(Config, [[QQ, <<"3">>, <<"3">>, <<"0">>]]),
+
+ ?assertEqual([{2, byte_size(Msg1) + byte_size(Msg2)}],
+ dirty_query([Server], RaName, fun rabbit_fifo:query_in_memory_usage/1)),
+
+ {'queue.purge_ok', 3} = amqp_channel:call(Ch, #'queue.purge'{queue = QQ}),
+
+ ?assertEqual([{0, 0}],
+ dirty_query([Server], RaName, fun rabbit_fifo:query_in_memory_usage/1)).
+
+in_memory(Config) ->
+ [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+
+ RaName = ra_name(QQ),
+ Msg1 = <<"msg1">>,
+ Msg2 = <<"msg11">>,
+
+ publish(Ch, QQ, Msg1),
+
+ wait_for_messages(Config, [[QQ, <<"1">>, <<"1">>, <<"0">>]]),
+ ?assertEqual([{1, byte_size(Msg1)}],
+ dirty_query([Server], RaName, fun rabbit_fifo:query_in_memory_usage/1)),
+
+ subscribe(Ch, QQ, false),
+
+ wait_for_messages(Config, [[QQ, <<"1">>, <<"0">>, <<"1">>]]),
+ ?assertEqual([{0, 0}],
+ dirty_query([Server], RaName, fun rabbit_fifo:query_in_memory_usage/1)),
+
+ publish(Ch, QQ, Msg2),
+
+ wait_for_messages(Config, [[QQ, <<"2">>, <<"0">>, <<"2">>]]),
+ ?assertEqual([{0, 0}],
+ dirty_query([Server], RaName, fun rabbit_fifo:query_in_memory_usage/1)),
+
+ receive
+ {#'basic.deliver'{delivery_tag = DeliveryTag}, #amqp_msg{}} ->
+ amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag,
+ multiple = false})
+ end,
+
+ wait_for_messages(Config, [[QQ, <<"1">>, <<"0">>, <<"1">>]]),
+ ?assertEqual([{0, 0}],
+ dirty_query([Server], RaName, fun rabbit_fifo:query_in_memory_usage/1)).
+
%%----------------------------------------------------------------------------
declare(Ch, Q) ->
@@ -1850,10 +2174,13 @@ publish_many(Ch, Queue, Count) ->
[publish(Ch, Queue) || _ <- lists:seq(1, Count)].
publish(Ch, Queue) ->
+ publish(Ch, Queue, <<"msg">>).
+
+publish(Ch, Queue, Msg) ->
ok = amqp_channel:cast(Ch,
#'basic.publish'{routing_key = Queue},
#amqp_msg{props = #'P_basic'{delivery_mode = 2},
- payload = <<"msg">>}).
+ payload = Msg}).
consume(Ch, Queue, NoAck) ->
{GetOk, _} = Reply = amqp_channel:call(Ch, #'basic.get'{queue = Queue,
diff --git a/test/rabbit_fifo_prop_SUITE.erl b/test/rabbit_fifo_prop_SUITE.erl
index cd6598fe33..2472827ffd 100644
--- a/test/rabbit_fifo_prop_SUITE.erl
+++ b/test/rabbit_fifo_prop_SUITE.erl
@@ -42,12 +42,16 @@ all_tests() ->
scenario15,
scenario16,
scenario17,
+ scenario18,
+ scenario19,
+ scenario20,
single_active,
single_active_01,
single_active_02,
single_active_03,
single_active_ordering,
- single_active_ordering_01
+ single_active_ordering_01,
+ in_memory_limit
% single_active_ordering_02
].
@@ -305,6 +309,53 @@ scenario17(_Config) ->
}, Commands),
ok.
+scenario18(_Config) ->
+ E = c:pid(0,176,1),
+ Commands = [make_enqueue(E,1,<<"1">>),
+ make_enqueue(E,2,<<"2">>),
+ make_enqueue(E,3,<<"3">>),
+ make_enqueue(E,4,<<"4">>),
+ make_enqueue(E,5,<<"5">>)
+ ],
+ run_snapshot_test(#{name => ?FUNCTION_NAME,
+ %% max_length => 3,
+ max_in_memory_length => 1}, Commands),
+ ok.
+
+scenario19(_Config) ->
+ C1Pid = c:pid(0,883,1),
+ C1 = {<<>>, C1Pid},
+ E = c:pid(0,176,1),
+ Commands = [make_enqueue(E,1,<<"1">>),
+ make_enqueue(E,2,<<"2">>),
+ make_checkout(C1, {auto,2,simple_prefetch}),
+ make_enqueue(E,3,<<"3">>),
+ make_settle(C1, [0, 1])
+ ],
+ run_snapshot_test(#{name => ?FUNCTION_NAME,
+ max_in_memory_bytes => 370,
+ max_in_memory_length => 1}, Commands),
+ ok.
+
+scenario20(_Config) ->
+ C1Pid = c:pid(0,883,1),
+ C1 = {<<>>, C1Pid},
+ E = c:pid(0,176,1),
+ Commands = [make_enqueue(E,1,<<>>),
+ make_enqueue(E,2,<<>>),
+ make_checkout(C1, {auto,2,simple_prefetch}),
+ {down, C1Pid, noconnection},
+ make_enqueue(E,3,<<0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0>>),
+ make_enqueue(E,4,<<0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0>>),
+ make_enqueue(E,5,<<0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0>>),
+ make_enqueue(E,6,<<0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0>>),
+ make_enqueue(E,7,<<0,0,0,0,0,0,0,0,0,0,0,0,0,0>>)
+ ],
+ run_snapshot_test(#{name => ?FUNCTION_NAME,
+ max_bytes => 97,
+ max_in_memory_length => 1}, Commands),
+ ok.
+
single_active_01(_Config) ->
C1Pid = test_util:fake_pid(rabbit@fake_node1),
C1 = {<<0>>, C1Pid},
@@ -338,7 +389,7 @@ single_active_02(_Config) ->
make_checkout(C2, cancel),
{down,E,noconnection}
],
- Conf = config(?FUNCTION_NAME, undefined, undefined, true, 1),
+ Conf = config(?FUNCTION_NAME, undefined, undefined, true, 1, undefined, undefined),
?assert(single_active_prop(Conf, Commands, false)),
ok.
@@ -356,7 +407,7 @@ single_active_03(_Config) ->
{down, Pid, noconnection},
{nodeup, node()}
],
- Conf = config(?FUNCTION_NAME, 0, 0, true, 0),
+ Conf = config(?FUNCTION_NAME, 0, 0, true, 0, undefined, undefined),
?assert(single_active_prop(Conf, Commands, true)),
ok.
@@ -364,12 +415,15 @@ test_run_log(_Config) ->
Fun = {-1, fun ({Prev, _}) -> {Prev + 1, Prev + 1} end},
run_proper(
fun () ->
- ?FORALL({Length, Bytes, SingleActiveConsumer, DeliveryLimit},
- frequency([{10, {0, 0, false, 0}},
+ ?FORALL({Length, Bytes, SingleActiveConsumer, DeliveryLimit, InMemoryLength,
+ InMemoryBytes},
+ frequency([{10, {0, 0, false, 0, 0, 0}},
{5, {oneof([range(1, 10), undefined]),
oneof([range(1, 1000), undefined]),
boolean(),
- oneof([range(1, 3), undefined])
+ oneof([range(1, 3), undefined]),
+ oneof([range(1, 10), undefined]),
+ oneof([range(1, 1000), undefined])
}}]),
?FORALL(O, ?LET(Ops, log_gen(100), expand(Ops, Fun)),
collect({log_size, length(O)},
@@ -378,18 +432,23 @@ test_run_log(_Config) ->
Length,
Bytes,
SingleActiveConsumer,
- DeliveryLimit), O))))
+ DeliveryLimit,
+ InMemoryLength,
+ InMemoryBytes), O))))
end, [], 10).
snapshots(_Config) ->
run_proper(
fun () ->
- ?FORALL({Length, Bytes, SingleActiveConsumer, DeliveryLimit},
- frequency([{10, {0, 0, false, 0}},
+ ?FORALL({Length, Bytes, SingleActiveConsumer, DeliveryLimit, InMemoryLength,
+ InMemoryBytes},
+ frequency([{10, {0, 0, false, 0, 0, 0}},
{5, {oneof([range(1, 10), undefined]),
oneof([range(1, 1000), undefined]),
boolean(),
- oneof([range(1, 3), undefined])
+ oneof([range(1, 3), undefined]),
+ oneof([range(1, 10), undefined]),
+ oneof([range(1, 1000), undefined])
}}]),
?FORALL(O, ?LET(Ops, log_gen(250), expand(Ops)),
collect({log_size, length(O)},
@@ -398,18 +457,22 @@ snapshots(_Config) ->
Length,
Bytes,
SingleActiveConsumer,
- DeliveryLimit), O))))
+ DeliveryLimit,
+ InMemoryLength,
+ InMemoryBytes), O))))
end, [], 2500).
single_active(_Config) ->
Size = 2000,
run_proper(
fun () ->
- ?FORALL({Length, Bytes, DeliveryLimit},
- frequency([{10, {0, 0, 0}},
+ ?FORALL({Length, Bytes, DeliveryLimit, InMemoryLength, InMemoryBytes},
+ frequency([{10, {0, 0, 0, 0, 0}},
{5, {oneof([range(1, 10), undefined]),
oneof([range(1, 1000), undefined]),
- oneof([range(1, 3), undefined])
+ oneof([range(1, 3), undefined]),
+ oneof([range(1, 10), undefined]),
+ oneof([range(1, 1000), undefined])
}}]),
?FORALL(O, ?LET(Ops, log_gen(Size), expand(Ops)),
collect({log_size, length(O)},
@@ -418,7 +481,9 @@ single_active(_Config) ->
Length,
Bytes,
true,
- DeliveryLimit), O,
+ DeliveryLimit,
+ InMemoryLength,
+ InMemoryBytes), O,
false))))
end, [], Size).
@@ -433,6 +498,8 @@ single_active_ordering(_Config) ->
undefined,
undefined,
true,
+ undefined,
+ undefined,
undefined), O,
true)))
end, [], Size).
@@ -454,7 +521,7 @@ single_active_ordering_01(_Config) ->
make_enqueue(E2, 1, 2),
make_settle(C1, [0])
],
- Conf = config(?FUNCTION_NAME, 0, 0, true, 0),
+ Conf = config(?FUNCTION_NAME, 0, 0, true, 0, 0, 0),
?assert(single_active_prop(Conf, Commands, true)),
ok.
@@ -475,20 +542,81 @@ single_active_ordering_02(_Config) ->
{down,E,noproc},
make_settle(C1, [0])
],
- Conf = config(?FUNCTION_NAME, 0, 0, true, 0),
+ Conf = config(?FUNCTION_NAME, 0, 0, true, 0, 0, 0),
?assert(single_active_prop(Conf, Commands, true)),
ok.
-config(Name, Length, Bytes, SingleActive, DeliveryLimit) ->
+in_memory_limit(_Config) ->
+ Size = 2000,
+ run_proper(
+ fun () ->
+ ?FORALL({Length, Bytes, SingleActiveConsumer, DeliveryLimit, InMemoryLength, InMemoryBytes},
+ frequency([{10, {0, 0, false, 0, 0, 0}},
+ {5, {oneof([range(1, 10), undefined]),
+ oneof([range(1, 1000), undefined]),
+ boolean(),
+ oneof([range(1, 3), undefined]),
+ range(1, 10),
+ range(1, 1000)
+ }}]),
+ ?FORALL(O, ?LET(Ops, log_gen(Size), expand(Ops)),
+ collect({log_size, length(O)},
+ in_memory_limit_prop(
+ config(?FUNCTION_NAME,
+ Length,
+ Bytes,
+ SingleActiveConsumer,
+ DeliveryLimit,
+ InMemoryLength,
+ InMemoryBytes), O))))
+ end, [], Size).
+
+config(Name, Length, Bytes, SingleActive, DeliveryLimit, InMemoryLength, InMemoryBytes) ->
#{name => Name,
max_length => map_max(Length),
max_bytes => map_max(Bytes),
single_active_consumer_on => SingleActive,
- delivery_limit => map_max(DeliveryLimit)}.
+ delivery_limit => map_max(DeliveryLimit),
+ max_in_memory_length => map_max(InMemoryLength),
+ max_in_memory_bytes => map_max(InMemoryBytes)}.
map_max(0) -> undefined;
map_max(N) -> N.
+in_memory_limit_prop(Conf0, Commands) ->
+ Conf = Conf0#{release_cursor_interval => 100},
+ Indexes = lists:seq(1, length(Commands)),
+ Entries = lists:zip(Indexes, Commands),
+ try run_log(test_init(Conf), Entries) of
+ {_State, Effects} ->
+ %% validate message ordering
+ lists:foldl(fun ({log, Idxs, _}, ReleaseCursorIdx) ->
+ validate_idx_order(Idxs, ReleaseCursorIdx),
+ ReleaseCursorIdx;
+ ({release_cursor, Idx, _}, _) ->
+ Idx;
+ (_, Acc) ->
+ Acc
+ end, 0, Effects),
+ true;
+ _ ->
+ true
+ catch
+ Err ->
+ ct:pal("Commands: ~p~nConf~p~n", [Commands, Conf]),
+ ct:pal("Err: ~p~n", [Err]),
+ false
+ end.
+
+validate_idx_order(Idxs, ReleaseCursorIdx) ->
+ Min = lists:min(Idxs),
+ case Min < ReleaseCursorIdx of
+ true ->
+ throw({invalid_log_index, Min, ReleaseCursorIdx});
+ false ->
+ ok
+ end.
+
single_active_prop(Conf0, Commands, ValidateOrder) ->
Conf = Conf0#{release_cursor_interval => 100},
Indexes = lists:seq(1, length(Commands)),