diff options
| author | Diana Corbacho <diana@rabbitmq.com> | 2019-04-02 12:26:28 +0100 |
|---|---|---|
| committer | Diana Corbacho <diana@rabbitmq.com> | 2019-04-03 16:32:32 +0100 |
| commit | 19f8dec83bfa899cf7d43e532ca8a52c73213a97 (patch) | |
| tree | 0876af93cb80b793c44c57110a33e48e55e5b68a /test | |
| parent | 74aac975655b4d419ac53c911768048234edbb9a (diff) | |
| download | rabbitmq-server-git-19f8dec83bfa899cf7d43e532ca8a52c73213a97.tar.gz | |
Fix in-memory counters
[#164735591]
Diffstat (limited to 'test')
| -rw-r--r-- | test/quorum_queue_SUITE.erl | 77 | ||||
| -rw-r--r-- | test/rabbit_fifo_prop_SUITE.erl | 71 |
2 files changed, 123 insertions, 25 deletions
diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl index 531c5a5aed..31d6a4c21b 100644 --- a/test/quorum_queue_SUITE.erl +++ b/test/quorum_queue_SUITE.erl @@ -120,7 +120,8 @@ all_tests() -> queue_length_in_memory_limit, queue_length_in_memory_bytes_limit_basic_get, queue_length_in_memory_bytes_limit_subscribe, - queue_length_in_memory_bytes_limit + queue_length_in_memory_bytes_limit, + in_memory ]. memory_tests() -> @@ -1819,7 +1820,7 @@ queue_length_limit_drop_head(Config) -> no_ack = true})). queue_length_in_memory_limit_basic_get(Config) -> - [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), Ch = rabbit_ct_client_helpers:open_channel(Config, Server), QQ = ?config(queue_name, Config), @@ -1841,7 +1842,7 @@ queue_length_in_memory_limit_basic_get(Config) -> wait_for_messages(Config, [[QQ, <<"2">>, <<"2">>, <<"0">>]]), ?assertEqual([{1, byte_size(Msg1)}], - dirty_query(Servers, RaName, fun rabbit_fifo:query_in_memory_usage/1)), + dirty_query([Server], RaName, fun rabbit_fifo:query_in_memory_usage/1)), ?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = Msg1}}, amqp_channel:call(Ch, #'basic.get'{queue = QQ, @@ -1851,7 +1852,7 @@ queue_length_in_memory_limit_basic_get(Config) -> no_ack = true})). queue_length_in_memory_limit_subscribe(Config) -> - [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), Ch = rabbit_ct_client_helpers:open_channel(Config, Server), QQ = ?config(queue_name, Config), @@ -1867,7 +1868,7 @@ queue_length_in_memory_limit_subscribe(Config) -> wait_for_messages(Config, [[QQ, <<"2">>, <<"2">>, <<"0">>]]), ?assertEqual([{1, byte_size(Msg1)}], - dirty_query(Servers, RaName, fun rabbit_fifo:query_in_memory_usage/1)), + dirty_query([Server], RaName, fun rabbit_fifo:query_in_memory_usage/1)), subscribe(Ch, QQ, false), receive @@ -1878,7 +1879,7 @@ queue_length_in_memory_limit_subscribe(Config) -> multiple = false}) end, ?assertEqual([{0, 0}], - dirty_query(Servers, RaName, fun rabbit_fifo:query_in_memory_usage/1)), + dirty_query([Server], RaName, fun rabbit_fifo:query_in_memory_usage/1)), receive {#'basic.deliver'{delivery_tag = DeliveryTag2, redelivered = false}, @@ -1888,7 +1889,7 @@ queue_length_in_memory_limit_subscribe(Config) -> end. queue_length_in_memory_limit(Config) -> - [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), Ch = rabbit_ct_client_helpers:open_channel(Config, Server), QQ = ?config(queue_name, Config), @@ -1908,8 +1909,8 @@ queue_length_in_memory_limit(Config) -> wait_for_messages(Config, [[QQ, <<"3">>, <<"3">>, <<"0">>]]), ?assertEqual([{2, byte_size(Msg1) + byte_size(Msg2)}], - dirty_query(Servers, RaName, fun rabbit_fifo:query_in_memory_usage/1)), - + dirty_query([Server], RaName, fun rabbit_fifo:query_in_memory_usage/1)), + ?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = Msg1}}, amqp_channel:call(Ch, #'basic.get'{queue = QQ, no_ack = true})), @@ -1919,10 +1920,10 @@ queue_length_in_memory_limit(Config) -> wait_for_messages(Config, [[QQ, <<"3">>, <<"3">>, <<"0">>]]), ?assertEqual([{2, byte_size(Msg2) + byte_size(Msg4)}], - dirty_query(Servers, RaName, fun rabbit_fifo:query_in_memory_usage/1)). + dirty_query([Server], RaName, fun rabbit_fifo:query_in_memory_usage/1)). queue_length_in_memory_bytes_limit_basic_get(Config) -> - [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), Ch = rabbit_ct_client_helpers:open_channel(Config, Server), QQ = ?config(queue_name, Config), @@ -1944,7 +1945,7 @@ queue_length_in_memory_bytes_limit_basic_get(Config) -> wait_for_messages(Config, [[QQ, <<"2">>, <<"2">>, <<"0">>]]), ?assertEqual([{1, byte_size(Msg1)}], - dirty_query(Servers, RaName, fun rabbit_fifo:query_in_memory_usage/1)), + dirty_query([Server], RaName, fun rabbit_fifo:query_in_memory_usage/1)), ?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = Msg1}}, amqp_channel:call(Ch, #'basic.get'{queue = QQ, @@ -1954,7 +1955,7 @@ queue_length_in_memory_bytes_limit_basic_get(Config) -> no_ack = true})). queue_length_in_memory_bytes_limit_subscribe(Config) -> - [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), Ch = rabbit_ct_client_helpers:open_channel(Config, Server), QQ = ?config(queue_name, Config), @@ -1970,7 +1971,7 @@ queue_length_in_memory_bytes_limit_subscribe(Config) -> wait_for_messages(Config, [[QQ, <<"2">>, <<"2">>, <<"0">>]]), ?assertEqual([{1, byte_size(Msg1)}], - dirty_query(Servers, RaName, fun rabbit_fifo:query_in_memory_usage/1)), + dirty_query([Server], RaName, fun rabbit_fifo:query_in_memory_usage/1)), subscribe(Ch, QQ, false), receive @@ -1981,7 +1982,7 @@ queue_length_in_memory_bytes_limit_subscribe(Config) -> multiple = false}) end, ?assertEqual([{0, 0}], - dirty_query(Servers, RaName, fun rabbit_fifo:query_in_memory_usage/1)), + dirty_query([Server], RaName, fun rabbit_fifo:query_in_memory_usage/1)), receive {#'basic.deliver'{delivery_tag = DeliveryTag2, redelivered = false}, @@ -1991,7 +1992,7 @@ queue_length_in_memory_bytes_limit_subscribe(Config) -> end. queue_length_in_memory_bytes_limit(Config) -> - [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), Ch = rabbit_ct_client_helpers:open_channel(Config, Server), QQ = ?config(queue_name, Config), @@ -2011,7 +2012,7 @@ queue_length_in_memory_bytes_limit(Config) -> wait_for_messages(Config, [[QQ, <<"3">>, <<"3">>, <<"0">>]]), ?assertEqual([{2, byte_size(Msg1) + byte_size(Msg2)}], - dirty_query(Servers, RaName, fun rabbit_fifo:query_in_memory_usage/1)), + dirty_query([Server], RaName, fun rabbit_fifo:query_in_memory_usage/1)), ?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = Msg1}}, amqp_channel:call(Ch, #'basic.get'{queue = QQ, @@ -2022,7 +2023,47 @@ queue_length_in_memory_bytes_limit(Config) -> wait_for_messages(Config, [[QQ, <<"3">>, <<"3">>, <<"0">>]]), ?assertEqual([{2, byte_size(Msg2) + byte_size(Msg4)}], - dirty_query(Servers, RaName, fun rabbit_fifo:query_in_memory_usage/1)). + dirty_query([Server], RaName, fun rabbit_fifo:query_in_memory_usage/1)). + +in_memory(Config) -> + [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + QQ = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + + RaName = ra_name(QQ), + Msg1 = <<"msg1">>, + Msg2 = <<"msg11">>, + + publish(Ch, QQ, Msg1), + + wait_for_messages(Config, [[QQ, <<"1">>, <<"1">>, <<"0">>]]), + ?assertEqual([{1, byte_size(Msg1)}], + dirty_query([Server], RaName, fun rabbit_fifo:query_in_memory_usage/1)), + + subscribe(Ch, QQ, false), + + wait_for_messages(Config, [[QQ, <<"1">>, <<"0">>, <<"1">>]]), + ?assertEqual([{0, 0}], + dirty_query([Server], RaName, fun rabbit_fifo:query_in_memory_usage/1)), + + publish(Ch, QQ, Msg2), + + wait_for_messages(Config, [[QQ, <<"2">>, <<"0">>, <<"2">>]]), + ?assertEqual([{0, 0}], + dirty_query([Server], RaName, fun rabbit_fifo:query_in_memory_usage/1)), + + receive + {#'basic.deliver'{delivery_tag = DeliveryTag}, #amqp_msg{}} -> + amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag, + multiple = false}) + end, + + wait_for_messages(Config, [[QQ, <<"1">>, <<"0">>, <<"1">>]]), + ?assertEqual([{0, 0}], + dirty_query([Server], RaName, fun rabbit_fifo:query_in_memory_usage/1)). %%---------------------------------------------------------------------------- diff --git a/test/rabbit_fifo_prop_SUITE.erl b/test/rabbit_fifo_prop_SUITE.erl index 76b8d7e715..58e6c2452c 100644 --- a/test/rabbit_fifo_prop_SUITE.erl +++ b/test/rabbit_fifo_prop_SUITE.erl @@ -38,7 +38,10 @@ all_tests() -> scenario14, scenario15, scenario16, - scenario17 + scenario17, + scenario18, + scenario19, + scenario20 ]. groups() -> @@ -293,15 +296,65 @@ scenario17(_Config) -> }, Commands), ok. +scenario18(_Config) -> + E = c:pid(0,176,1), + Commands = [make_enqueue(E,1,<<"1">>), + make_enqueue(E,2,<<"2">>), + make_enqueue(E,3,<<"3">>), + make_enqueue(E,4,<<"4">>), + make_enqueue(E,5,<<"5">>) + ], + run_snapshot_test(#{name => ?FUNCTION_NAME, + %% max_length => 3, + max_in_memory_length => 1}, Commands), + ok. + +scenario19(_Config) -> + C1Pid = c:pid(0,883,1), + C1 = {<<>>, C1Pid}, + E = c:pid(0,176,1), + Commands = [make_enqueue(E,1,<<"1">>), + make_enqueue(E,2,<<"2">>), + make_checkout(C1, {auto,2,simple_prefetch}), + make_enqueue(E,3,<<"3">>), + make_settle(C1, [0, 1]) + ], + run_snapshot_test(#{name => ?FUNCTION_NAME, + max_in_memory_bytes => 370, + max_in_memory_length => 1}, Commands), + ok. + +scenario20(_Config) -> + C1Pid = c:pid(0,883,1), + C1 = {<<>>, C1Pid}, + E = c:pid(0,176,1), + Commands = [make_enqueue(E,1,<<>>), + make_enqueue(E,2,<<>>), + make_checkout(C1, {auto,2,simple_prefetch}), + {down, C1Pid, noconnection}, + make_enqueue(E,3,<<0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0>>), + make_enqueue(E,4,<<0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0>>), + make_enqueue(E,5,<<0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0>>), + make_enqueue(E,6,<<0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0>>), + make_enqueue(E,7,<<0,0,0,0,0,0,0,0,0,0,0,0,0,0>>) + ], + run_snapshot_test(#{name => ?FUNCTION_NAME, + max_bytes => 97, + max_in_memory_length => 1}, Commands), + ok. + snapshots(_Config) -> run_proper( fun () -> - ?FORALL({Length, Bytes, SingleActiveConsumer, DeliveryLimit}, - frequency([{10, {0, 0, false, 0}}, + ?FORALL({Length, Bytes, SingleActiveConsumer, DeliveryLimit, InMemoryLength, + InMemoryBytes}, + frequency([{10, {0, 0, false, 0, 0, 0}}, {5, {oneof([range(1, 10), undefined]), oneof([range(1, 1000), undefined]), boolean(), - oneof([range(1, 3), undefined]) + oneof([range(1, 3), undefined]), + oneof([range(1, 10), undefined]), + oneof([range(1, 1000), undefined]) }}]), ?FORALL(O, ?LET(Ops, log_gen(250), expand(Ops)), collect({log_size, length(O)}, @@ -310,15 +363,19 @@ snapshots(_Config) -> Length, Bytes, SingleActiveConsumer, - DeliveryLimit), O)))) + DeliveryLimit, + InMemoryLength, + InMemoryBytes), O)))) end, [], 2500). -config(Name, Length, Bytes, SingleActive, DeliveryLimit) -> +config(Name, Length, Bytes, SingleActive, DeliveryLimit, InMemoryLength, InMemoryBytes) -> #{name => Name, max_length => map_max(Length), max_bytes => map_max(Bytes), single_active_consumer_on => SingleActive, - delivery_limit => map_max(DeliveryLimit)}. + delivery_limit => map_max(DeliveryLimit), + max_in_memory_length => map_max(InMemoryLength), + max_in_memory_bytes => map_max(InMemoryBytes)}. map_max(0) -> undefined; map_max(N) -> N. |
