summaryrefslogtreecommitdiff
path: root/test
diff options
context:
space:
mode:
authorDiana Corbacho <diana@rabbitmq.com>2019-03-26 20:25:16 +0000
committerDiana Corbacho <diana@rabbitmq.com>2019-03-27 09:03:01 +0000
commitce55899c883839230f6e3a6546bb4db9087905bb (patch)
tree6e2013c790cfdc417e0cbc8f6b62210ce35902b1 /test
parent1b87e590a232b878fa1869aff26eef675093a9bc (diff)
downloadrabbitmq-server-git-ce55899c883839230f6e3a6546bb4db9087905bb.tar.gz
Quorum queue in memory message limits
[#164735591]
Diffstat (limited to 'test')
-rw-r--r--test/quorum_queue_SUITE.erl219
1 files changed, 217 insertions, 2 deletions
diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl
index 02616677f3..531c5a5aed 100644
--- a/test/quorum_queue_SUITE.erl
+++ b/test/quorum_queue_SUITE.erl
@@ -114,7 +114,13 @@ all_tests() ->
queue_length_limit_drop_head,
subscribe_redelivery_limit,
subscribe_redelivery_policy,
- subscribe_redelivery_limit_with_dead_letter
+ subscribe_redelivery_limit_with_dead_letter,
+ queue_length_in_memory_limit_basic_get,
+ queue_length_in_memory_limit_subscribe,
+ queue_length_in_memory_limit,
+ queue_length_in_memory_bytes_limit_basic_get,
+ queue_length_in_memory_bytes_limit_subscribe,
+ queue_length_in_memory_bytes_limit
].
memory_tests() ->
@@ -1812,6 +1818,212 @@ queue_length_limit_drop_head(Config) ->
amqp_channel:call(Ch, #'basic.get'{queue = QQ,
no_ack = true})).
+queue_length_in_memory_limit_basic_get(Config) ->
+ [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>},
+ {<<"x-max-in-memory-length">>, long, 1}])),
+
+ RaName = ra_name(QQ),
+ Msg1 = <<"msg1">>,
+ ok = amqp_channel:cast(Ch,
+ #'basic.publish'{routing_key = QQ},
+ #amqp_msg{props = #'P_basic'{delivery_mode = 2},
+ payload = Msg1}),
+ ok = amqp_channel:cast(Ch,
+ #'basic.publish'{routing_key = QQ},
+ #amqp_msg{props = #'P_basic'{delivery_mode = 2},
+ payload = <<"msg2">>}),
+
+ wait_for_messages(Config, [[QQ, <<"2">>, <<"2">>, <<"0">>]]),
+
+ ?assertEqual([{1, byte_size(Msg1)}],
+ dirty_query(Servers, RaName, fun rabbit_fifo:query_in_memory_usage/1)),
+
+ ?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = Msg1}},
+ amqp_channel:call(Ch, #'basic.get'{queue = QQ,
+ no_ack = true})),
+ ?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = <<"msg2">>}},
+ amqp_channel:call(Ch, #'basic.get'{queue = QQ,
+ no_ack = true})).
+
+queue_length_in_memory_limit_subscribe(Config) ->
+ [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>},
+ {<<"x-max-in-memory-length">>, long, 1}])),
+
+ RaName = ra_name(QQ),
+ Msg1 = <<"msg1">>,
+ Msg2 = <<"msg11">>,
+ publish(Ch, QQ, Msg1),
+ publish(Ch, QQ, Msg2),
+ wait_for_messages(Config, [[QQ, <<"2">>, <<"2">>, <<"0">>]]),
+
+ ?assertEqual([{1, byte_size(Msg1)}],
+ dirty_query(Servers, RaName, fun rabbit_fifo:query_in_memory_usage/1)),
+
+ subscribe(Ch, QQ, false),
+ receive
+ {#'basic.deliver'{delivery_tag = DeliveryTag1,
+ redelivered = false},
+ #amqp_msg{payload = Msg1}} ->
+ amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag1,
+ multiple = false})
+ end,
+ ?assertEqual([{0, 0}],
+ dirty_query(Servers, RaName, fun rabbit_fifo:query_in_memory_usage/1)),
+ receive
+ {#'basic.deliver'{delivery_tag = DeliveryTag2,
+ redelivered = false},
+ #amqp_msg{payload = Msg2}} ->
+ amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag2,
+ multiple = false})
+ end.
+
+queue_length_in_memory_limit(Config) ->
+ [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>},
+ {<<"x-max-in-memory-length">>, long, 2}])),
+
+ RaName = ra_name(QQ),
+ Msg1 = <<"msg1">>,
+ Msg2 = <<"msg11">>,
+ Msg3 = <<"msg111">>,
+ Msg4 = <<"msg1111">>,
+
+ publish(Ch, QQ, Msg1),
+ publish(Ch, QQ, Msg2),
+ publish(Ch, QQ, Msg3),
+ wait_for_messages(Config, [[QQ, <<"3">>, <<"3">>, <<"0">>]]),
+
+ ?assertEqual([{2, byte_size(Msg1) + byte_size(Msg2)}],
+ dirty_query(Servers, RaName, fun rabbit_fifo:query_in_memory_usage/1)),
+
+ ?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = Msg1}},
+ amqp_channel:call(Ch, #'basic.get'{queue = QQ,
+ no_ack = true})),
+
+ wait_for_messages(Config, [[QQ, <<"2">>, <<"2">>, <<"0">>]]),
+ publish(Ch, QQ, Msg4),
+ wait_for_messages(Config, [[QQ, <<"3">>, <<"3">>, <<"0">>]]),
+
+ ?assertEqual([{2, byte_size(Msg2) + byte_size(Msg4)}],
+ dirty_query(Servers, RaName, fun rabbit_fifo:query_in_memory_usage/1)).
+
+queue_length_in_memory_bytes_limit_basic_get(Config) ->
+ [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>},
+ {<<"x-max-in-memory-bytes">>, long, 6}])),
+
+ RaName = ra_name(QQ),
+ Msg1 = <<"msg1">>,
+ ok = amqp_channel:cast(Ch,
+ #'basic.publish'{routing_key = QQ},
+ #amqp_msg{props = #'P_basic'{delivery_mode = 2},
+ payload = Msg1}),
+ ok = amqp_channel:cast(Ch,
+ #'basic.publish'{routing_key = QQ},
+ #amqp_msg{props = #'P_basic'{delivery_mode = 2},
+ payload = <<"msg2">>}),
+
+ wait_for_messages(Config, [[QQ, <<"2">>, <<"2">>, <<"0">>]]),
+
+ ?assertEqual([{1, byte_size(Msg1)}],
+ dirty_query(Servers, RaName, fun rabbit_fifo:query_in_memory_usage/1)),
+
+ ?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = Msg1}},
+ amqp_channel:call(Ch, #'basic.get'{queue = QQ,
+ no_ack = true})),
+ ?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = <<"msg2">>}},
+ amqp_channel:call(Ch, #'basic.get'{queue = QQ,
+ no_ack = true})).
+
+queue_length_in_memory_bytes_limit_subscribe(Config) ->
+ [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>},
+ {<<"x-max-in-memory-bytes">>, long, 6}])),
+
+ RaName = ra_name(QQ),
+ Msg1 = <<"msg1">>,
+ Msg2 = <<"msg11">>,
+ publish(Ch, QQ, Msg1),
+ publish(Ch, QQ, Msg2),
+ wait_for_messages(Config, [[QQ, <<"2">>, <<"2">>, <<"0">>]]),
+
+ ?assertEqual([{1, byte_size(Msg1)}],
+ dirty_query(Servers, RaName, fun rabbit_fifo:query_in_memory_usage/1)),
+
+ subscribe(Ch, QQ, false),
+ receive
+ {#'basic.deliver'{delivery_tag = DeliveryTag1,
+ redelivered = false},
+ #amqp_msg{payload = Msg1}} ->
+ amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag1,
+ multiple = false})
+ end,
+ ?assertEqual([{0, 0}],
+ dirty_query(Servers, RaName, fun rabbit_fifo:query_in_memory_usage/1)),
+ receive
+ {#'basic.deliver'{delivery_tag = DeliveryTag2,
+ redelivered = false},
+ #amqp_msg{payload = Msg2}} ->
+ amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag2,
+ multiple = false})
+ end.
+
+queue_length_in_memory_bytes_limit(Config) ->
+ [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>},
+ {<<"x-max-in-memory-bytes">>, long, 12}])),
+
+ RaName = ra_name(QQ),
+ Msg1 = <<"msg1">>,
+ Msg2 = <<"msg11">>,
+ Msg3 = <<"msg111">>,
+ Msg4 = <<"msg1111">>,
+
+ publish(Ch, QQ, Msg1),
+ publish(Ch, QQ, Msg2),
+ publish(Ch, QQ, Msg3),
+ wait_for_messages(Config, [[QQ, <<"3">>, <<"3">>, <<"0">>]]),
+
+ ?assertEqual([{2, byte_size(Msg1) + byte_size(Msg2)}],
+ dirty_query(Servers, RaName, fun rabbit_fifo:query_in_memory_usage/1)),
+
+ ?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = Msg1}},
+ amqp_channel:call(Ch, #'basic.get'{queue = QQ,
+ no_ack = true})),
+
+ wait_for_messages(Config, [[QQ, <<"2">>, <<"2">>, <<"0">>]]),
+ publish(Ch, QQ, Msg4),
+ wait_for_messages(Config, [[QQ, <<"3">>, <<"3">>, <<"0">>]]),
+
+ ?assertEqual([{2, byte_size(Msg2) + byte_size(Msg4)}],
+ dirty_query(Servers, RaName, fun rabbit_fifo:query_in_memory_usage/1)).
+
%%----------------------------------------------------------------------------
declare(Ch, Q) ->
@@ -1836,10 +2048,13 @@ publish_many(Ch, Queue, Count) ->
[publish(Ch, Queue) || _ <- lists:seq(1, Count)].
publish(Ch, Queue) ->
+ publish(Ch, Queue, <<"msg">>).
+
+publish(Ch, Queue, Msg) ->
ok = amqp_channel:cast(Ch,
#'basic.publish'{routing_key = Queue},
#amqp_msg{props = #'P_basic'{delivery_mode = 2},
- payload = <<"msg">>}).
+ payload = Msg}).
consume(Ch, Queue, NoAck) ->
{GetOk, _} = Reply = amqp_channel:call(Ch, #'basic.get'{queue = Queue,