diff options
| author | Diana Corbacho <diana@rabbitmq.com> | 2019-03-26 20:25:16 +0000 |
|---|---|---|
| committer | Diana Corbacho <diana@rabbitmq.com> | 2019-03-27 09:03:01 +0000 |
| commit | ce55899c883839230f6e3a6546bb4db9087905bb (patch) | |
| tree | 6e2013c790cfdc417e0cbc8f6b62210ce35902b1 /test | |
| parent | 1b87e590a232b878fa1869aff26eef675093a9bc (diff) | |
| download | rabbitmq-server-git-ce55899c883839230f6e3a6546bb4db9087905bb.tar.gz | |
Quorum queue in memory message limits
[#164735591]
Diffstat (limited to 'test')
| -rw-r--r-- | test/quorum_queue_SUITE.erl | 219 |
1 files changed, 217 insertions, 2 deletions
diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl index 02616677f3..531c5a5aed 100644 --- a/test/quorum_queue_SUITE.erl +++ b/test/quorum_queue_SUITE.erl @@ -114,7 +114,13 @@ all_tests() -> queue_length_limit_drop_head, subscribe_redelivery_limit, subscribe_redelivery_policy, - subscribe_redelivery_limit_with_dead_letter + subscribe_redelivery_limit_with_dead_letter, + queue_length_in_memory_limit_basic_get, + queue_length_in_memory_limit_subscribe, + queue_length_in_memory_limit, + queue_length_in_memory_bytes_limit_basic_get, + queue_length_in_memory_bytes_limit_subscribe, + queue_length_in_memory_bytes_limit ]. memory_tests() -> @@ -1812,6 +1818,212 @@ queue_length_limit_drop_head(Config) -> amqp_channel:call(Ch, #'basic.get'{queue = QQ, no_ack = true})). +queue_length_in_memory_limit_basic_get(Config) -> + [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + QQ = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}, + {<<"x-max-in-memory-length">>, long, 1}])), + + RaName = ra_name(QQ), + Msg1 = <<"msg1">>, + ok = amqp_channel:cast(Ch, + #'basic.publish'{routing_key = QQ}, + #amqp_msg{props = #'P_basic'{delivery_mode = 2}, + payload = Msg1}), + ok = amqp_channel:cast(Ch, + #'basic.publish'{routing_key = QQ}, + #amqp_msg{props = #'P_basic'{delivery_mode = 2}, + payload = <<"msg2">>}), + + wait_for_messages(Config, [[QQ, <<"2">>, <<"2">>, <<"0">>]]), + + ?assertEqual([{1, byte_size(Msg1)}], + dirty_query(Servers, RaName, fun rabbit_fifo:query_in_memory_usage/1)), + + ?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = Msg1}}, + amqp_channel:call(Ch, #'basic.get'{queue = QQ, + no_ack = true})), + ?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = <<"msg2">>}}, + amqp_channel:call(Ch, #'basic.get'{queue = QQ, + no_ack = true})). + +queue_length_in_memory_limit_subscribe(Config) -> + [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + QQ = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}, + {<<"x-max-in-memory-length">>, long, 1}])), + + RaName = ra_name(QQ), + Msg1 = <<"msg1">>, + Msg2 = <<"msg11">>, + publish(Ch, QQ, Msg1), + publish(Ch, QQ, Msg2), + wait_for_messages(Config, [[QQ, <<"2">>, <<"2">>, <<"0">>]]), + + ?assertEqual([{1, byte_size(Msg1)}], + dirty_query(Servers, RaName, fun rabbit_fifo:query_in_memory_usage/1)), + + subscribe(Ch, QQ, false), + receive + {#'basic.deliver'{delivery_tag = DeliveryTag1, + redelivered = false}, + #amqp_msg{payload = Msg1}} -> + amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag1, + multiple = false}) + end, + ?assertEqual([{0, 0}], + dirty_query(Servers, RaName, fun rabbit_fifo:query_in_memory_usage/1)), + receive + {#'basic.deliver'{delivery_tag = DeliveryTag2, + redelivered = false}, + #amqp_msg{payload = Msg2}} -> + amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag2, + multiple = false}) + end. + +queue_length_in_memory_limit(Config) -> + [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + QQ = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}, + {<<"x-max-in-memory-length">>, long, 2}])), + + RaName = ra_name(QQ), + Msg1 = <<"msg1">>, + Msg2 = <<"msg11">>, + Msg3 = <<"msg111">>, + Msg4 = <<"msg1111">>, + + publish(Ch, QQ, Msg1), + publish(Ch, QQ, Msg2), + publish(Ch, QQ, Msg3), + wait_for_messages(Config, [[QQ, <<"3">>, <<"3">>, <<"0">>]]), + + ?assertEqual([{2, byte_size(Msg1) + byte_size(Msg2)}], + dirty_query(Servers, RaName, fun rabbit_fifo:query_in_memory_usage/1)), + + ?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = Msg1}}, + amqp_channel:call(Ch, #'basic.get'{queue = QQ, + no_ack = true})), + + wait_for_messages(Config, [[QQ, <<"2">>, <<"2">>, <<"0">>]]), + publish(Ch, QQ, Msg4), + wait_for_messages(Config, [[QQ, <<"3">>, <<"3">>, <<"0">>]]), + + ?assertEqual([{2, byte_size(Msg2) + byte_size(Msg4)}], + dirty_query(Servers, RaName, fun rabbit_fifo:query_in_memory_usage/1)). + +queue_length_in_memory_bytes_limit_basic_get(Config) -> + [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + QQ = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}, + {<<"x-max-in-memory-bytes">>, long, 6}])), + + RaName = ra_name(QQ), + Msg1 = <<"msg1">>, + ok = amqp_channel:cast(Ch, + #'basic.publish'{routing_key = QQ}, + #amqp_msg{props = #'P_basic'{delivery_mode = 2}, + payload = Msg1}), + ok = amqp_channel:cast(Ch, + #'basic.publish'{routing_key = QQ}, + #amqp_msg{props = #'P_basic'{delivery_mode = 2}, + payload = <<"msg2">>}), + + wait_for_messages(Config, [[QQ, <<"2">>, <<"2">>, <<"0">>]]), + + ?assertEqual([{1, byte_size(Msg1)}], + dirty_query(Servers, RaName, fun rabbit_fifo:query_in_memory_usage/1)), + + ?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = Msg1}}, + amqp_channel:call(Ch, #'basic.get'{queue = QQ, + no_ack = true})), + ?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = <<"msg2">>}}, + amqp_channel:call(Ch, #'basic.get'{queue = QQ, + no_ack = true})). + +queue_length_in_memory_bytes_limit_subscribe(Config) -> + [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + QQ = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}, + {<<"x-max-in-memory-bytes">>, long, 6}])), + + RaName = ra_name(QQ), + Msg1 = <<"msg1">>, + Msg2 = <<"msg11">>, + publish(Ch, QQ, Msg1), + publish(Ch, QQ, Msg2), + wait_for_messages(Config, [[QQ, <<"2">>, <<"2">>, <<"0">>]]), + + ?assertEqual([{1, byte_size(Msg1)}], + dirty_query(Servers, RaName, fun rabbit_fifo:query_in_memory_usage/1)), + + subscribe(Ch, QQ, false), + receive + {#'basic.deliver'{delivery_tag = DeliveryTag1, + redelivered = false}, + #amqp_msg{payload = Msg1}} -> + amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag1, + multiple = false}) + end, + ?assertEqual([{0, 0}], + dirty_query(Servers, RaName, fun rabbit_fifo:query_in_memory_usage/1)), + receive + {#'basic.deliver'{delivery_tag = DeliveryTag2, + redelivered = false}, + #amqp_msg{payload = Msg2}} -> + amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag2, + multiple = false}) + end. + +queue_length_in_memory_bytes_limit(Config) -> + [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + QQ = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}, + {<<"x-max-in-memory-bytes">>, long, 12}])), + + RaName = ra_name(QQ), + Msg1 = <<"msg1">>, + Msg2 = <<"msg11">>, + Msg3 = <<"msg111">>, + Msg4 = <<"msg1111">>, + + publish(Ch, QQ, Msg1), + publish(Ch, QQ, Msg2), + publish(Ch, QQ, Msg3), + wait_for_messages(Config, [[QQ, <<"3">>, <<"3">>, <<"0">>]]), + + ?assertEqual([{2, byte_size(Msg1) + byte_size(Msg2)}], + dirty_query(Servers, RaName, fun rabbit_fifo:query_in_memory_usage/1)), + + ?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = Msg1}}, + amqp_channel:call(Ch, #'basic.get'{queue = QQ, + no_ack = true})), + + wait_for_messages(Config, [[QQ, <<"2">>, <<"2">>, <<"0">>]]), + publish(Ch, QQ, Msg4), + wait_for_messages(Config, [[QQ, <<"3">>, <<"3">>, <<"0">>]]), + + ?assertEqual([{2, byte_size(Msg2) + byte_size(Msg4)}], + dirty_query(Servers, RaName, fun rabbit_fifo:query_in_memory_usage/1)). + %%---------------------------------------------------------------------------- declare(Ch, Q) -> @@ -1836,10 +2048,13 @@ publish_many(Ch, Queue, Count) -> [publish(Ch, Queue) || _ <- lists:seq(1, Count)]. publish(Ch, Queue) -> + publish(Ch, Queue, <<"msg">>). + +publish(Ch, Queue, Msg) -> ok = amqp_channel:cast(Ch, #'basic.publish'{routing_key = Queue}, #amqp_msg{props = #'P_basic'{delivery_mode = 2}, - payload = <<"msg">>}). + payload = Msg}). consume(Ch, Queue, NoAck) -> {GetOk, _} = Reply = amqp_channel:call(Ch, #'basic.get'{queue = Queue, |
