diff options
| author | Karl Nilsson <kjnilsson@gmail.com> | 2019-04-11 17:05:19 +0100 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2019-04-11 17:05:19 +0100 |
| commit | 96b9fb0f5cf6e563203e4b71af3a636cb3e95bfb (patch) | |
| tree | 31471f84fa456235647628b708e28c8edbdbdc7e /test | |
| parent | 46211bfa4de1aa8c532d16709823971127a9c552 (diff) | |
| parent | 2ad83292a53a7d86ad56c7adff245d28989e9b32 (diff) | |
| download | rabbitmq-server-git-96b9fb0f5cf6e563203e4b71af3a636cb3e95bfb.tar.gz | |
Merge pull request #1970 from rabbitmq/in-memory-limits
In memory limits for quorum queues
Diffstat (limited to 'test')
| -rw-r--r-- | test/queue_parallel_SUITE.erl | 26 | ||||
| -rw-r--r-- | test/quorum_queue_SUITE.erl | 331 | ||||
| -rw-r--r-- | test/rabbit_fifo_prop_SUITE.erl | 166 |
3 files changed, 501 insertions, 22 deletions
diff --git a/test/queue_parallel_SUITE.erl b/test/queue_parallel_SUITE.erl index d57b3c3021..eba0965608 100644 --- a/test/queue_parallel_SUITE.erl +++ b/test/queue_parallel_SUITE.erl @@ -62,7 +62,9 @@ groups() -> [ {classic_queue, [parallel], AllTests ++ [delete_immediately_by_pid_succeeds]}, {mirrored_queue, [parallel], AllTests ++ [delete_immediately_by_pid_succeeds]}, - {quorum_queue, [parallel], AllTests ++ [delete_immediately_by_pid_fails]} + {quorum_queue, [parallel], AllTests ++ [delete_immediately_by_pid_fails]}, + {quorum_queue_in_memory_limit, [parallel], AllTests ++ [delete_immediately_by_pid_fails]}, + {quorum_queue_in_memory_bytes, [parallel], AllTests ++ [delete_immediately_by_pid_fails]} ]} ]. @@ -97,6 +99,28 @@ init_per_group(quorum_queue, Config) -> Skip -> Skip end; +init_per_group(quorum_queue_in_memory_limit, Config) -> + case rabbit_ct_broker_helpers:enable_feature_flag(Config, quorum_queue) of + ok -> + rabbit_ct_helpers:set_config( + Config, + [{queue_args, [{<<"x-queue-type">>, longstr, <<"quorum">>}, + {<<"x-max-in-memory-length">>, long, 1}]}, + {queue_durable, true}]); + Skip -> + Skip + end; +init_per_group(quorum_queue_in_memory_bytes, Config) -> + case rabbit_ct_broker_helpers:enable_feature_flag(Config, quorum_queue) of + ok -> + rabbit_ct_helpers:set_config( + Config, + [{queue_args, [{<<"x-queue-type">>, longstr, <<"quorum">>}, + {<<"x-max-in-memory-bytes">>, long, 1}]}, + {queue_durable, true}]); + Skip -> + Skip + end; init_per_group(mirrored_queue, Config) -> rabbit_ct_broker_helpers:set_ha_policy(Config, 0, <<"^max_length.*queue">>, <<"all">>, [{<<"ha-sync-mode">>, <<"automatic">>}]), diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl index 1aa5247b57..61f9328855 100644 --- a/test/quorum_queue_SUITE.erl +++ b/test/quorum_queue_SUITE.erl @@ -115,7 +115,16 @@ all_tests() -> queue_length_limit_drop_head, subscribe_redelivery_limit, subscribe_redelivery_policy, - subscribe_redelivery_limit_with_dead_letter + subscribe_redelivery_limit_with_dead_letter, + queue_length_in_memory_limit_basic_get, + queue_length_in_memory_limit_subscribe, + queue_length_in_memory_limit, + queue_length_in_memory_limit_returns, + queue_length_in_memory_bytes_limit_basic_get, + queue_length_in_memory_bytes_limit_subscribe, + queue_length_in_memory_bytes_limit, + queue_length_in_memory_purge, + in_memory ]. memory_tests() -> @@ -1826,6 +1835,321 @@ queue_length_limit_drop_head(Config) -> amqp_channel:call(Ch, #'basic.get'{queue = QQ, no_ack = true})). +queue_length_in_memory_limit_basic_get(Config) -> + [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + QQ = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}, + {<<"x-max-in-memory-length">>, long, 1}])), + + RaName = ra_name(QQ), + Msg1 = <<"msg1">>, + ok = amqp_channel:cast(Ch, + #'basic.publish'{routing_key = QQ}, + #amqp_msg{props = #'P_basic'{delivery_mode = 2}, + payload = Msg1}), + ok = amqp_channel:cast(Ch, + #'basic.publish'{routing_key = QQ}, + #amqp_msg{props = #'P_basic'{delivery_mode = 2}, + payload = <<"msg2">>}), + + wait_for_messages(Config, [[QQ, <<"2">>, <<"2">>, <<"0">>]]), + + ?assertEqual([{1, byte_size(Msg1)}], + dirty_query([Server], RaName, fun rabbit_fifo:query_in_memory_usage/1)), + + ?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = Msg1}}, + amqp_channel:call(Ch, #'basic.get'{queue = QQ, + no_ack = true})), + ?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = <<"msg2">>}}, + amqp_channel:call(Ch, #'basic.get'{queue = QQ, + no_ack = true})). + +queue_length_in_memory_limit_subscribe(Config) -> + [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + QQ = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}, + {<<"x-max-in-memory-length">>, long, 1}])), + + RaName = ra_name(QQ), + Msg1 = <<"msg1">>, + Msg2 = <<"msg11">>, + publish(Ch, QQ, Msg1), + publish(Ch, QQ, Msg2), + wait_for_messages(Config, [[QQ, <<"2">>, <<"2">>, <<"0">>]]), + + ?assertEqual([{1, byte_size(Msg1)}], + dirty_query([Server], RaName, fun rabbit_fifo:query_in_memory_usage/1)), + + subscribe(Ch, QQ, false), + receive + {#'basic.deliver'{delivery_tag = DeliveryTag1, + redelivered = false}, + #amqp_msg{payload = Msg1}} -> + amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag1, + multiple = false}) + end, + ?assertEqual([{0, 0}], + dirty_query([Server], RaName, fun rabbit_fifo:query_in_memory_usage/1)), + receive + {#'basic.deliver'{delivery_tag = DeliveryTag2, + redelivered = false}, + #amqp_msg{payload = Msg2}} -> + amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag2, + multiple = false}) + end. + +queue_length_in_memory_limit(Config) -> + [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + QQ = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}, + {<<"x-max-in-memory-length">>, long, 2}])), + + RaName = ra_name(QQ), + Msg1 = <<"msg1">>, + Msg2 = <<"msg11">>, + Msg3 = <<"msg111">>, + Msg4 = <<"msg1111">>, + + publish(Ch, QQ, Msg1), + publish(Ch, QQ, Msg2), + publish(Ch, QQ, Msg3), + wait_for_messages(Config, [[QQ, <<"3">>, <<"3">>, <<"0">>]]), + + ?assertEqual([{2, byte_size(Msg1) + byte_size(Msg2)}], + dirty_query([Server], RaName, fun rabbit_fifo:query_in_memory_usage/1)), + + ?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = Msg1}}, + amqp_channel:call(Ch, #'basic.get'{queue = QQ, + no_ack = true})), + + wait_for_messages(Config, [[QQ, <<"2">>, <<"2">>, <<"0">>]]), + publish(Ch, QQ, Msg4), + wait_for_messages(Config, [[QQ, <<"3">>, <<"3">>, <<"0">>]]), + + ?assertEqual([{2, byte_size(Msg2) + byte_size(Msg4)}], + dirty_query([Server], RaName, fun rabbit_fifo:query_in_memory_usage/1)). + +queue_length_in_memory_limit_returns(Config) -> + [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + QQ = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}, + {<<"x-max-in-memory-length">>, long, 2}])), + + RaName = ra_name(QQ), + Msg1 = <<"msg1">>, + Msg2 = <<"msg11">>, + Msg3 = <<"msg111">>, + Msg4 = <<"msg111">>, + publish(Ch, QQ, Msg1), + publish(Ch, QQ, Msg2), + wait_for_messages(Config, [[QQ, <<"2">>, <<"2">>, <<"0">>]]), + + ?assertEqual([{2, byte_size(Msg1) + byte_size(Msg2)}], + dirty_query([Server], RaName, fun rabbit_fifo:query_in_memory_usage/1)), + + ?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = Msg1}}, + amqp_channel:call(Ch, #'basic.get'{queue = QQ, + no_ack = false})), + + {#'basic.get_ok'{delivery_tag = DTag2}, #amqp_msg{payload = Msg2}} = + amqp_channel:call(Ch, #'basic.get'{queue = QQ, + no_ack = false}), + + publish(Ch, QQ, Msg3), + publish(Ch, QQ, Msg4), + + %% Ensure that returns are subject to in memory limits too + wait_for_messages(Config, [[QQ, <<"4">>, <<"2">>, <<"2">>]]), + amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DTag2, + multiple = true, + requeue = true}), + wait_for_messages(Config, [[QQ, <<"4">>, <<"4">>, <<"0">>]]), + + ?assertEqual([{2, byte_size(Msg3) + byte_size(Msg4)}], + dirty_query([Server], RaName, fun rabbit_fifo:query_in_memory_usage/1)). + +queue_length_in_memory_bytes_limit_basic_get(Config) -> + [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + QQ = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}, + {<<"x-max-in-memory-bytes">>, long, 6}])), + + RaName = ra_name(QQ), + Msg1 = <<"msg1">>, + ok = amqp_channel:cast(Ch, + #'basic.publish'{routing_key = QQ}, + #amqp_msg{props = #'P_basic'{delivery_mode = 2}, + payload = Msg1}), + ok = amqp_channel:cast(Ch, + #'basic.publish'{routing_key = QQ}, + #amqp_msg{props = #'P_basic'{delivery_mode = 2}, + payload = <<"msg2">>}), + + wait_for_messages(Config, [[QQ, <<"2">>, <<"2">>, <<"0">>]]), + + ?assertEqual([{1, byte_size(Msg1)}], + dirty_query([Server], RaName, fun rabbit_fifo:query_in_memory_usage/1)), + + ?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = Msg1}}, + amqp_channel:call(Ch, #'basic.get'{queue = QQ, + no_ack = true})), + ?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = <<"msg2">>}}, + amqp_channel:call(Ch, #'basic.get'{queue = QQ, + no_ack = true})). + +queue_length_in_memory_bytes_limit_subscribe(Config) -> + [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + QQ = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}, + {<<"x-max-in-memory-bytes">>, long, 6}])), + + RaName = ra_name(QQ), + Msg1 = <<"msg1">>, + Msg2 = <<"msg11">>, + publish(Ch, QQ, Msg1), + publish(Ch, QQ, Msg2), + wait_for_messages(Config, [[QQ, <<"2">>, <<"2">>, <<"0">>]]), + + ?assertEqual([{1, byte_size(Msg1)}], + dirty_query([Server], RaName, fun rabbit_fifo:query_in_memory_usage/1)), + + subscribe(Ch, QQ, false), + receive + {#'basic.deliver'{delivery_tag = DeliveryTag1, + redelivered = false}, + #amqp_msg{payload = Msg1}} -> + amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag1, + multiple = false}) + end, + ?assertEqual([{0, 0}], + dirty_query([Server], RaName, fun rabbit_fifo:query_in_memory_usage/1)), + receive + {#'basic.deliver'{delivery_tag = DeliveryTag2, + redelivered = false}, + #amqp_msg{payload = Msg2}} -> + amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag2, + multiple = false}) + end. + +queue_length_in_memory_bytes_limit(Config) -> + [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + QQ = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}, + {<<"x-max-in-memory-bytes">>, long, 12}])), + + RaName = ra_name(QQ), + Msg1 = <<"msg1">>, + Msg2 = <<"msg11">>, + Msg3 = <<"msg111">>, + Msg4 = <<"msg1111">>, + + publish(Ch, QQ, Msg1), + publish(Ch, QQ, Msg2), + publish(Ch, QQ, Msg3), + wait_for_messages(Config, [[QQ, <<"3">>, <<"3">>, <<"0">>]]), + + ?assertEqual([{2, byte_size(Msg1) + byte_size(Msg2)}], + dirty_query([Server], RaName, fun rabbit_fifo:query_in_memory_usage/1)), + + ?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = Msg1}}, + amqp_channel:call(Ch, #'basic.get'{queue = QQ, + no_ack = true})), + + wait_for_messages(Config, [[QQ, <<"2">>, <<"2">>, <<"0">>]]), + publish(Ch, QQ, Msg4), + wait_for_messages(Config, [[QQ, <<"3">>, <<"3">>, <<"0">>]]), + + ?assertEqual([{2, byte_size(Msg2) + byte_size(Msg4)}], + dirty_query([Server], RaName, fun rabbit_fifo:query_in_memory_usage/1)). + +queue_length_in_memory_purge(Config) -> + [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + QQ = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}, + {<<"x-max-in-memory-length">>, long, 2}])), + + RaName = ra_name(QQ), + Msg1 = <<"msg1">>, + Msg2 = <<"msg11">>, + Msg3 = <<"msg111">>, + + publish(Ch, QQ, Msg1), + publish(Ch, QQ, Msg2), + publish(Ch, QQ, Msg3), + wait_for_messages(Config, [[QQ, <<"3">>, <<"3">>, <<"0">>]]), + + ?assertEqual([{2, byte_size(Msg1) + byte_size(Msg2)}], + dirty_query([Server], RaName, fun rabbit_fifo:query_in_memory_usage/1)), + + {'queue.purge_ok', 3} = amqp_channel:call(Ch, #'queue.purge'{queue = QQ}), + + ?assertEqual([{0, 0}], + dirty_query([Server], RaName, fun rabbit_fifo:query_in_memory_usage/1)). + +in_memory(Config) -> + [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + QQ = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + + RaName = ra_name(QQ), + Msg1 = <<"msg1">>, + Msg2 = <<"msg11">>, + + publish(Ch, QQ, Msg1), + + wait_for_messages(Config, [[QQ, <<"1">>, <<"1">>, <<"0">>]]), + ?assertEqual([{1, byte_size(Msg1)}], + dirty_query([Server], RaName, fun rabbit_fifo:query_in_memory_usage/1)), + + subscribe(Ch, QQ, false), + + wait_for_messages(Config, [[QQ, <<"1">>, <<"0">>, <<"1">>]]), + ?assertEqual([{0, 0}], + dirty_query([Server], RaName, fun rabbit_fifo:query_in_memory_usage/1)), + + publish(Ch, QQ, Msg2), + + wait_for_messages(Config, [[QQ, <<"2">>, <<"0">>, <<"2">>]]), + ?assertEqual([{0, 0}], + dirty_query([Server], RaName, fun rabbit_fifo:query_in_memory_usage/1)), + + receive + {#'basic.deliver'{delivery_tag = DeliveryTag}, #amqp_msg{}} -> + amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag, + multiple = false}) + end, + + wait_for_messages(Config, [[QQ, <<"1">>, <<"0">>, <<"1">>]]), + ?assertEqual([{0, 0}], + dirty_query([Server], RaName, fun rabbit_fifo:query_in_memory_usage/1)). + %%---------------------------------------------------------------------------- declare(Ch, Q) -> @@ -1850,10 +2174,13 @@ publish_many(Ch, Queue, Count) -> [publish(Ch, Queue) || _ <- lists:seq(1, Count)]. publish(Ch, Queue) -> + publish(Ch, Queue, <<"msg">>). + +publish(Ch, Queue, Msg) -> ok = amqp_channel:cast(Ch, #'basic.publish'{routing_key = Queue}, #amqp_msg{props = #'P_basic'{delivery_mode = 2}, - payload = <<"msg">>}). + payload = Msg}). consume(Ch, Queue, NoAck) -> {GetOk, _} = Reply = amqp_channel:call(Ch, #'basic.get'{queue = Queue, diff --git a/test/rabbit_fifo_prop_SUITE.erl b/test/rabbit_fifo_prop_SUITE.erl index cd6598fe33..2472827ffd 100644 --- a/test/rabbit_fifo_prop_SUITE.erl +++ b/test/rabbit_fifo_prop_SUITE.erl @@ -42,12 +42,16 @@ all_tests() -> scenario15, scenario16, scenario17, + scenario18, + scenario19, + scenario20, single_active, single_active_01, single_active_02, single_active_03, single_active_ordering, - single_active_ordering_01 + single_active_ordering_01, + in_memory_limit % single_active_ordering_02 ]. @@ -305,6 +309,53 @@ scenario17(_Config) -> }, Commands), ok. +scenario18(_Config) -> + E = c:pid(0,176,1), + Commands = [make_enqueue(E,1,<<"1">>), + make_enqueue(E,2,<<"2">>), + make_enqueue(E,3,<<"3">>), + make_enqueue(E,4,<<"4">>), + make_enqueue(E,5,<<"5">>) + ], + run_snapshot_test(#{name => ?FUNCTION_NAME, + %% max_length => 3, + max_in_memory_length => 1}, Commands), + ok. + +scenario19(_Config) -> + C1Pid = c:pid(0,883,1), + C1 = {<<>>, C1Pid}, + E = c:pid(0,176,1), + Commands = [make_enqueue(E,1,<<"1">>), + make_enqueue(E,2,<<"2">>), + make_checkout(C1, {auto,2,simple_prefetch}), + make_enqueue(E,3,<<"3">>), + make_settle(C1, [0, 1]) + ], + run_snapshot_test(#{name => ?FUNCTION_NAME, + max_in_memory_bytes => 370, + max_in_memory_length => 1}, Commands), + ok. + +scenario20(_Config) -> + C1Pid = c:pid(0,883,1), + C1 = {<<>>, C1Pid}, + E = c:pid(0,176,1), + Commands = [make_enqueue(E,1,<<>>), + make_enqueue(E,2,<<>>), + make_checkout(C1, {auto,2,simple_prefetch}), + {down, C1Pid, noconnection}, + make_enqueue(E,3,<<0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0>>), + make_enqueue(E,4,<<0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0>>), + make_enqueue(E,5,<<0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0>>), + make_enqueue(E,6,<<0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0>>), + make_enqueue(E,7,<<0,0,0,0,0,0,0,0,0,0,0,0,0,0>>) + ], + run_snapshot_test(#{name => ?FUNCTION_NAME, + max_bytes => 97, + max_in_memory_length => 1}, Commands), + ok. + single_active_01(_Config) -> C1Pid = test_util:fake_pid(rabbit@fake_node1), C1 = {<<0>>, C1Pid}, @@ -338,7 +389,7 @@ single_active_02(_Config) -> make_checkout(C2, cancel), {down,E,noconnection} ], - Conf = config(?FUNCTION_NAME, undefined, undefined, true, 1), + Conf = config(?FUNCTION_NAME, undefined, undefined, true, 1, undefined, undefined), ?assert(single_active_prop(Conf, Commands, false)), ok. @@ -356,7 +407,7 @@ single_active_03(_Config) -> {down, Pid, noconnection}, {nodeup, node()} ], - Conf = config(?FUNCTION_NAME, 0, 0, true, 0), + Conf = config(?FUNCTION_NAME, 0, 0, true, 0, undefined, undefined), ?assert(single_active_prop(Conf, Commands, true)), ok. @@ -364,12 +415,15 @@ test_run_log(_Config) -> Fun = {-1, fun ({Prev, _}) -> {Prev + 1, Prev + 1} end}, run_proper( fun () -> - ?FORALL({Length, Bytes, SingleActiveConsumer, DeliveryLimit}, - frequency([{10, {0, 0, false, 0}}, + ?FORALL({Length, Bytes, SingleActiveConsumer, DeliveryLimit, InMemoryLength, + InMemoryBytes}, + frequency([{10, {0, 0, false, 0, 0, 0}}, {5, {oneof([range(1, 10), undefined]), oneof([range(1, 1000), undefined]), boolean(), - oneof([range(1, 3), undefined]) + oneof([range(1, 3), undefined]), + oneof([range(1, 10), undefined]), + oneof([range(1, 1000), undefined]) }}]), ?FORALL(O, ?LET(Ops, log_gen(100), expand(Ops, Fun)), collect({log_size, length(O)}, @@ -378,18 +432,23 @@ test_run_log(_Config) -> Length, Bytes, SingleActiveConsumer, - DeliveryLimit), O)))) + DeliveryLimit, + InMemoryLength, + InMemoryBytes), O)))) end, [], 10). snapshots(_Config) -> run_proper( fun () -> - ?FORALL({Length, Bytes, SingleActiveConsumer, DeliveryLimit}, - frequency([{10, {0, 0, false, 0}}, + ?FORALL({Length, Bytes, SingleActiveConsumer, DeliveryLimit, InMemoryLength, + InMemoryBytes}, + frequency([{10, {0, 0, false, 0, 0, 0}}, {5, {oneof([range(1, 10), undefined]), oneof([range(1, 1000), undefined]), boolean(), - oneof([range(1, 3), undefined]) + oneof([range(1, 3), undefined]), + oneof([range(1, 10), undefined]), + oneof([range(1, 1000), undefined]) }}]), ?FORALL(O, ?LET(Ops, log_gen(250), expand(Ops)), collect({log_size, length(O)}, @@ -398,18 +457,22 @@ snapshots(_Config) -> Length, Bytes, SingleActiveConsumer, - DeliveryLimit), O)))) + DeliveryLimit, + InMemoryLength, + InMemoryBytes), O)))) end, [], 2500). single_active(_Config) -> Size = 2000, run_proper( fun () -> - ?FORALL({Length, Bytes, DeliveryLimit}, - frequency([{10, {0, 0, 0}}, + ?FORALL({Length, Bytes, DeliveryLimit, InMemoryLength, InMemoryBytes}, + frequency([{10, {0, 0, 0, 0, 0}}, {5, {oneof([range(1, 10), undefined]), oneof([range(1, 1000), undefined]), - oneof([range(1, 3), undefined]) + oneof([range(1, 3), undefined]), + oneof([range(1, 10), undefined]), + oneof([range(1, 1000), undefined]) }}]), ?FORALL(O, ?LET(Ops, log_gen(Size), expand(Ops)), collect({log_size, length(O)}, @@ -418,7 +481,9 @@ single_active(_Config) -> Length, Bytes, true, - DeliveryLimit), O, + DeliveryLimit, + InMemoryLength, + InMemoryBytes), O, false)))) end, [], Size). @@ -433,6 +498,8 @@ single_active_ordering(_Config) -> undefined, undefined, true, + undefined, + undefined, undefined), O, true))) end, [], Size). @@ -454,7 +521,7 @@ single_active_ordering_01(_Config) -> make_enqueue(E2, 1, 2), make_settle(C1, [0]) ], - Conf = config(?FUNCTION_NAME, 0, 0, true, 0), + Conf = config(?FUNCTION_NAME, 0, 0, true, 0, 0, 0), ?assert(single_active_prop(Conf, Commands, true)), ok. @@ -475,20 +542,81 @@ single_active_ordering_02(_Config) -> {down,E,noproc}, make_settle(C1, [0]) ], - Conf = config(?FUNCTION_NAME, 0, 0, true, 0), + Conf = config(?FUNCTION_NAME, 0, 0, true, 0, 0, 0), ?assert(single_active_prop(Conf, Commands, true)), ok. -config(Name, Length, Bytes, SingleActive, DeliveryLimit) -> +in_memory_limit(_Config) -> + Size = 2000, + run_proper( + fun () -> + ?FORALL({Length, Bytes, SingleActiveConsumer, DeliveryLimit, InMemoryLength, InMemoryBytes}, + frequency([{10, {0, 0, false, 0, 0, 0}}, + {5, {oneof([range(1, 10), undefined]), + oneof([range(1, 1000), undefined]), + boolean(), + oneof([range(1, 3), undefined]), + range(1, 10), + range(1, 1000) + }}]), + ?FORALL(O, ?LET(Ops, log_gen(Size), expand(Ops)), + collect({log_size, length(O)}, + in_memory_limit_prop( + config(?FUNCTION_NAME, + Length, + Bytes, + SingleActiveConsumer, + DeliveryLimit, + InMemoryLength, + InMemoryBytes), O)))) + end, [], Size). + +config(Name, Length, Bytes, SingleActive, DeliveryLimit, InMemoryLength, InMemoryBytes) -> #{name => Name, max_length => map_max(Length), max_bytes => map_max(Bytes), single_active_consumer_on => SingleActive, - delivery_limit => map_max(DeliveryLimit)}. + delivery_limit => map_max(DeliveryLimit), + max_in_memory_length => map_max(InMemoryLength), + max_in_memory_bytes => map_max(InMemoryBytes)}. map_max(0) -> undefined; map_max(N) -> N. +in_memory_limit_prop(Conf0, Commands) -> + Conf = Conf0#{release_cursor_interval => 100}, + Indexes = lists:seq(1, length(Commands)), + Entries = lists:zip(Indexes, Commands), + try run_log(test_init(Conf), Entries) of + {_State, Effects} -> + %% validate message ordering + lists:foldl(fun ({log, Idxs, _}, ReleaseCursorIdx) -> + validate_idx_order(Idxs, ReleaseCursorIdx), + ReleaseCursorIdx; + ({release_cursor, Idx, _}, _) -> + Idx; + (_, Acc) -> + Acc + end, 0, Effects), + true; + _ -> + true + catch + Err -> + ct:pal("Commands: ~p~nConf~p~n", [Commands, Conf]), + ct:pal("Err: ~p~n", [Err]), + false + end. + +validate_idx_order(Idxs, ReleaseCursorIdx) -> + Min = lists:min(Idxs), + case Min < ReleaseCursorIdx of + true -> + throw({invalid_log_index, Min, ReleaseCursorIdx}); + false -> + ok + end. + single_active_prop(Conf0, Commands, ValidateOrder) -> Conf = Conf0#{release_cursor_interval => 100}, Indexes = lists:seq(1, length(Commands)), |
