summaryrefslogtreecommitdiff
path: root/test
diff options
context:
space:
mode:
authorDiana Corbacho <diana@rabbitmq.com>2019-04-02 12:26:28 +0100
committerDiana Corbacho <diana@rabbitmq.com>2019-04-03 16:32:32 +0100
commit19f8dec83bfa899cf7d43e532ca8a52c73213a97 (patch)
tree0876af93cb80b793c44c57110a33e48e55e5b68a /test
parent74aac975655b4d419ac53c911768048234edbb9a (diff)
downloadrabbitmq-server-git-19f8dec83bfa899cf7d43e532ca8a52c73213a97.tar.gz
Fix in-memory counters
[#164735591]
Diffstat (limited to 'test')
-rw-r--r--test/quorum_queue_SUITE.erl77
-rw-r--r--test/rabbit_fifo_prop_SUITE.erl71
2 files changed, 123 insertions, 25 deletions
diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl
index 531c5a5aed..31d6a4c21b 100644
--- a/test/quorum_queue_SUITE.erl
+++ b/test/quorum_queue_SUITE.erl
@@ -120,7 +120,8 @@ all_tests() ->
queue_length_in_memory_limit,
queue_length_in_memory_bytes_limit_basic_get,
queue_length_in_memory_bytes_limit_subscribe,
- queue_length_in_memory_bytes_limit
+ queue_length_in_memory_bytes_limit,
+ in_memory
].
memory_tests() ->
@@ -1819,7 +1820,7 @@ queue_length_limit_drop_head(Config) ->
no_ack = true})).
queue_length_in_memory_limit_basic_get(Config) ->
- [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
QQ = ?config(queue_name, Config),
@@ -1841,7 +1842,7 @@ queue_length_in_memory_limit_basic_get(Config) ->
wait_for_messages(Config, [[QQ, <<"2">>, <<"2">>, <<"0">>]]),
?assertEqual([{1, byte_size(Msg1)}],
- dirty_query(Servers, RaName, fun rabbit_fifo:query_in_memory_usage/1)),
+ dirty_query([Server], RaName, fun rabbit_fifo:query_in_memory_usage/1)),
?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = Msg1}},
amqp_channel:call(Ch, #'basic.get'{queue = QQ,
@@ -1851,7 +1852,7 @@ queue_length_in_memory_limit_basic_get(Config) ->
no_ack = true})).
queue_length_in_memory_limit_subscribe(Config) ->
- [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
QQ = ?config(queue_name, Config),
@@ -1867,7 +1868,7 @@ queue_length_in_memory_limit_subscribe(Config) ->
wait_for_messages(Config, [[QQ, <<"2">>, <<"2">>, <<"0">>]]),
?assertEqual([{1, byte_size(Msg1)}],
- dirty_query(Servers, RaName, fun rabbit_fifo:query_in_memory_usage/1)),
+ dirty_query([Server], RaName, fun rabbit_fifo:query_in_memory_usage/1)),
subscribe(Ch, QQ, false),
receive
@@ -1878,7 +1879,7 @@ queue_length_in_memory_limit_subscribe(Config) ->
multiple = false})
end,
?assertEqual([{0, 0}],
- dirty_query(Servers, RaName, fun rabbit_fifo:query_in_memory_usage/1)),
+ dirty_query([Server], RaName, fun rabbit_fifo:query_in_memory_usage/1)),
receive
{#'basic.deliver'{delivery_tag = DeliveryTag2,
redelivered = false},
@@ -1888,7 +1889,7 @@ queue_length_in_memory_limit_subscribe(Config) ->
end.
queue_length_in_memory_limit(Config) ->
- [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
QQ = ?config(queue_name, Config),
@@ -1908,8 +1909,8 @@ queue_length_in_memory_limit(Config) ->
wait_for_messages(Config, [[QQ, <<"3">>, <<"3">>, <<"0">>]]),
?assertEqual([{2, byte_size(Msg1) + byte_size(Msg2)}],
- dirty_query(Servers, RaName, fun rabbit_fifo:query_in_memory_usage/1)),
-
+ dirty_query([Server], RaName, fun rabbit_fifo:query_in_memory_usage/1)),
+
?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = Msg1}},
amqp_channel:call(Ch, #'basic.get'{queue = QQ,
no_ack = true})),
@@ -1919,10 +1920,10 @@ queue_length_in_memory_limit(Config) ->
wait_for_messages(Config, [[QQ, <<"3">>, <<"3">>, <<"0">>]]),
?assertEqual([{2, byte_size(Msg2) + byte_size(Msg4)}],
- dirty_query(Servers, RaName, fun rabbit_fifo:query_in_memory_usage/1)).
+ dirty_query([Server], RaName, fun rabbit_fifo:query_in_memory_usage/1)).
queue_length_in_memory_bytes_limit_basic_get(Config) ->
- [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
QQ = ?config(queue_name, Config),
@@ -1944,7 +1945,7 @@ queue_length_in_memory_bytes_limit_basic_get(Config) ->
wait_for_messages(Config, [[QQ, <<"2">>, <<"2">>, <<"0">>]]),
?assertEqual([{1, byte_size(Msg1)}],
- dirty_query(Servers, RaName, fun rabbit_fifo:query_in_memory_usage/1)),
+ dirty_query([Server], RaName, fun rabbit_fifo:query_in_memory_usage/1)),
?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = Msg1}},
amqp_channel:call(Ch, #'basic.get'{queue = QQ,
@@ -1954,7 +1955,7 @@ queue_length_in_memory_bytes_limit_basic_get(Config) ->
no_ack = true})).
queue_length_in_memory_bytes_limit_subscribe(Config) ->
- [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
QQ = ?config(queue_name, Config),
@@ -1970,7 +1971,7 @@ queue_length_in_memory_bytes_limit_subscribe(Config) ->
wait_for_messages(Config, [[QQ, <<"2">>, <<"2">>, <<"0">>]]),
?assertEqual([{1, byte_size(Msg1)}],
- dirty_query(Servers, RaName, fun rabbit_fifo:query_in_memory_usage/1)),
+ dirty_query([Server], RaName, fun rabbit_fifo:query_in_memory_usage/1)),
subscribe(Ch, QQ, false),
receive
@@ -1981,7 +1982,7 @@ queue_length_in_memory_bytes_limit_subscribe(Config) ->
multiple = false})
end,
?assertEqual([{0, 0}],
- dirty_query(Servers, RaName, fun rabbit_fifo:query_in_memory_usage/1)),
+ dirty_query([Server], RaName, fun rabbit_fifo:query_in_memory_usage/1)),
receive
{#'basic.deliver'{delivery_tag = DeliveryTag2,
redelivered = false},
@@ -1991,7 +1992,7 @@ queue_length_in_memory_bytes_limit_subscribe(Config) ->
end.
queue_length_in_memory_bytes_limit(Config) ->
- [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
QQ = ?config(queue_name, Config),
@@ -2011,7 +2012,7 @@ queue_length_in_memory_bytes_limit(Config) ->
wait_for_messages(Config, [[QQ, <<"3">>, <<"3">>, <<"0">>]]),
?assertEqual([{2, byte_size(Msg1) + byte_size(Msg2)}],
- dirty_query(Servers, RaName, fun rabbit_fifo:query_in_memory_usage/1)),
+ dirty_query([Server], RaName, fun rabbit_fifo:query_in_memory_usage/1)),
?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = Msg1}},
amqp_channel:call(Ch, #'basic.get'{queue = QQ,
@@ -2022,7 +2023,47 @@ queue_length_in_memory_bytes_limit(Config) ->
wait_for_messages(Config, [[QQ, <<"3">>, <<"3">>, <<"0">>]]),
?assertEqual([{2, byte_size(Msg2) + byte_size(Msg4)}],
- dirty_query(Servers, RaName, fun rabbit_fifo:query_in_memory_usage/1)).
+ dirty_query([Server], RaName, fun rabbit_fifo:query_in_memory_usage/1)).
+
+in_memory(Config) ->
+ [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+
+ RaName = ra_name(QQ),
+ Msg1 = <<"msg1">>,
+ Msg2 = <<"msg11">>,
+
+ publish(Ch, QQ, Msg1),
+
+ wait_for_messages(Config, [[QQ, <<"1">>, <<"1">>, <<"0">>]]),
+ ?assertEqual([{1, byte_size(Msg1)}],
+ dirty_query([Server], RaName, fun rabbit_fifo:query_in_memory_usage/1)),
+
+ subscribe(Ch, QQ, false),
+
+ wait_for_messages(Config, [[QQ, <<"1">>, <<"0">>, <<"1">>]]),
+ ?assertEqual([{0, 0}],
+ dirty_query([Server], RaName, fun rabbit_fifo:query_in_memory_usage/1)),
+
+ publish(Ch, QQ, Msg2),
+
+ wait_for_messages(Config, [[QQ, <<"2">>, <<"0">>, <<"2">>]]),
+ ?assertEqual([{0, 0}],
+ dirty_query([Server], RaName, fun rabbit_fifo:query_in_memory_usage/1)),
+
+ receive
+ {#'basic.deliver'{delivery_tag = DeliveryTag}, #amqp_msg{}} ->
+ amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag,
+ multiple = false})
+ end,
+
+ wait_for_messages(Config, [[QQ, <<"1">>, <<"0">>, <<"1">>]]),
+ ?assertEqual([{0, 0}],
+ dirty_query([Server], RaName, fun rabbit_fifo:query_in_memory_usage/1)).
%%----------------------------------------------------------------------------
diff --git a/test/rabbit_fifo_prop_SUITE.erl b/test/rabbit_fifo_prop_SUITE.erl
index 76b8d7e715..58e6c2452c 100644
--- a/test/rabbit_fifo_prop_SUITE.erl
+++ b/test/rabbit_fifo_prop_SUITE.erl
@@ -38,7 +38,10 @@ all_tests() ->
scenario14,
scenario15,
scenario16,
- scenario17
+ scenario17,
+ scenario18,
+ scenario19,
+ scenario20
].
groups() ->
@@ -293,15 +296,65 @@ scenario17(_Config) ->
}, Commands),
ok.
+scenario18(_Config) ->
+ E = c:pid(0,176,1),
+ Commands = [make_enqueue(E,1,<<"1">>),
+ make_enqueue(E,2,<<"2">>),
+ make_enqueue(E,3,<<"3">>),
+ make_enqueue(E,4,<<"4">>),
+ make_enqueue(E,5,<<"5">>)
+ ],
+ run_snapshot_test(#{name => ?FUNCTION_NAME,
+ %% max_length => 3,
+ max_in_memory_length => 1}, Commands),
+ ok.
+
+scenario19(_Config) ->
+ C1Pid = c:pid(0,883,1),
+ C1 = {<<>>, C1Pid},
+ E = c:pid(0,176,1),
+ Commands = [make_enqueue(E,1,<<"1">>),
+ make_enqueue(E,2,<<"2">>),
+ make_checkout(C1, {auto,2,simple_prefetch}),
+ make_enqueue(E,3,<<"3">>),
+ make_settle(C1, [0, 1])
+ ],
+ run_snapshot_test(#{name => ?FUNCTION_NAME,
+ max_in_memory_bytes => 370,
+ max_in_memory_length => 1}, Commands),
+ ok.
+
+scenario20(_Config) ->
+ C1Pid = c:pid(0,883,1),
+ C1 = {<<>>, C1Pid},
+ E = c:pid(0,176,1),
+ Commands = [make_enqueue(E,1,<<>>),
+ make_enqueue(E,2,<<>>),
+ make_checkout(C1, {auto,2,simple_prefetch}),
+ {down, C1Pid, noconnection},
+ make_enqueue(E,3,<<0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0>>),
+ make_enqueue(E,4,<<0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0>>),
+ make_enqueue(E,5,<<0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0>>),
+ make_enqueue(E,6,<<0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0,0>>),
+ make_enqueue(E,7,<<0,0,0,0,0,0,0,0,0,0,0,0,0,0>>)
+ ],
+ run_snapshot_test(#{name => ?FUNCTION_NAME,
+ max_bytes => 97,
+ max_in_memory_length => 1}, Commands),
+ ok.
+
snapshots(_Config) ->
run_proper(
fun () ->
- ?FORALL({Length, Bytes, SingleActiveConsumer, DeliveryLimit},
- frequency([{10, {0, 0, false, 0}},
+ ?FORALL({Length, Bytes, SingleActiveConsumer, DeliveryLimit, InMemoryLength,
+ InMemoryBytes},
+ frequency([{10, {0, 0, false, 0, 0, 0}},
{5, {oneof([range(1, 10), undefined]),
oneof([range(1, 1000), undefined]),
boolean(),
- oneof([range(1, 3), undefined])
+ oneof([range(1, 3), undefined]),
+ oneof([range(1, 10), undefined]),
+ oneof([range(1, 1000), undefined])
}}]),
?FORALL(O, ?LET(Ops, log_gen(250), expand(Ops)),
collect({log_size, length(O)},
@@ -310,15 +363,19 @@ snapshots(_Config) ->
Length,
Bytes,
SingleActiveConsumer,
- DeliveryLimit), O))))
+ DeliveryLimit,
+ InMemoryLength,
+ InMemoryBytes), O))))
end, [], 2500).
-config(Name, Length, Bytes, SingleActive, DeliveryLimit) ->
+config(Name, Length, Bytes, SingleActive, DeliveryLimit, InMemoryLength, InMemoryBytes) ->
#{name => Name,
max_length => map_max(Length),
max_bytes => map_max(Bytes),
single_active_consumer_on => SingleActive,
- delivery_limit => map_max(DeliveryLimit)}.
+ delivery_limit => map_max(DeliveryLimit),
+ max_in_memory_length => map_max(InMemoryLength),
+ max_in_memory_bytes => map_max(InMemoryBytes)}.
map_max(0) -> undefined;
map_max(N) -> N.