summaryrefslogtreecommitdiff
path: root/test
diff options
context:
space:
mode:
authorArnaud Cogoluègnes <acogoluegnes@gmail.com>2018-12-14 18:23:21 +0100
committerArnaud Cogoluègnes <acogoluegnes@gmail.com>2018-12-14 18:23:21 +0100
commit1f052db8900e388535249603c28e44ee3e40a607 (patch)
tree99ea50e7b4323dcae15b1302cbe7c58aa8b014bf /test
parentb4d354546d44799f9cdab979c02eecb4245e67ad (diff)
parentbc662bc0a807f5b0ad0681fb2bcf49534f93729c (diff)
downloadrabbitmq-server-git-1f052db8900e388535249603c28e44ee3e40a607.tar.gz
Merge branch 'master' into rabbitmq-server-1799-single-active-consumer-in-qq
Conflicts: src/rabbit_fifo.erl src/rabbit_quorum_queue.erl
Diffstat (limited to 'test')
-rw-r--r--test/quorum_queue_SUITE.erl82
1 files changed, 78 insertions, 4 deletions
diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl
index c94fb9ddab..d698618494 100644
--- a/test/quorum_queue_SUITE.erl
+++ b/test/quorum_queue_SUITE.erl
@@ -120,6 +120,7 @@ all_tests() ->
delete_immediately_by_resource,
consume_redelivery_count,
subscribe_redelivery_count,
+ message_bytes_metrics,
memory_alarm_rolls_wal
].
@@ -2039,7 +2040,7 @@ consume_redelivery_count(Config) ->
requeue = true}),
%% wait for requeueing
timer:sleep(500),
-
+
{#'basic.get_ok'{delivery_tag = DeliveryTag1,
redelivered = true},
#amqp_msg{props = #'P_basic'{headers = H1}}} =
@@ -2058,14 +2059,71 @@ consume_redelivery_count(Config) ->
?assertMatch({DTag, _, 2}, rabbit_basic:header(DTag, H2)),
amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag2,
multiple = false,
- requeue = true}).
+ requeue = true}),
+ ok.
-memory_alarm_rolls_wal(Config) ->
+message_bytes_metrics(Config) ->
[Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
QQ = ?config(queue_name, Config),
?assertEqual({'queue.declare_ok', QQ, 0, 0},
declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+
+ RaName = ra_name(QQ),
+ {ok, _, {_, Leader}} = ra:members({RaName, Server}),
+ QRes = rabbit_misc:r(<<"/">>, queue, QQ),
+
+ publish(Ch, QQ),
+
+ wait_for_messages_ready(Servers, RaName, 1),
+ wait_for_messages_pending_ack(Servers, RaName, 0),
+ wait_until(fun() ->
+ {3, 3, 0} == get_message_bytes(Leader, QRes)
+ end),
+
+ subscribe(Ch, QQ, false),
+
+ wait_for_messages_ready(Servers, RaName, 0),
+ wait_for_messages_pending_ack(Servers, RaName, 1),
+ wait_until(fun() ->
+ {3, 0, 3} == get_message_bytes(Leader, QRes)
+ end),
+
+ receive
+ {#'basic.deliver'{delivery_tag = DeliveryTag,
+ redelivered = false}, _} ->
+ amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag,
+ multiple = false,
+ requeue = false}),
+ wait_for_messages_ready(Servers, RaName, 0),
+ wait_for_messages_pending_ack(Servers, RaName, 0),
+ wait_until(fun() ->
+ {0, 0, 0} == get_message_bytes(Leader, QRes)
+ end)
+ end,
+
+ %% Let's publish and then close the consumer channel. Messages must be
+ %% returned to the queue
+ publish(Ch, QQ),
+
+ wait_for_messages_ready(Servers, RaName, 0),
+ wait_for_messages_pending_ack(Servers, RaName, 1),
+ wait_until(fun() ->
+ {3, 0, 3} == get_message_bytes(Leader, QRes)
+ end),
+
+ rabbit_ct_client_helpers:close_channel(Ch),
+
+ wait_for_messages_ready(Servers, RaName, 1),
+ wait_for_messages_pending_ack(Servers, RaName, 0),
+ wait_until(fun() ->
+ {3, 3, 0} == get_message_bytes(Leader, QRes)
+ end),
+ ok.
+
+memory_alarm_rolls_wal(Config) ->
+ [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
WalDataDir = rpc:call(Server, ra_env, wal_data_dir, []),
[Wal0] = filelib:wildcard(WalDataDir ++ "/*.wal"),
ok = rpc:call(Server, rabbit_alarm, set_alarm,
@@ -2079,7 +2137,11 @@ memory_alarm_rolls_wal(Config) ->
[{{resource_limit, memory, Server}, []}]),
timer:sleep(1000),
[Wal2] = filelib:wildcard(WalDataDir ++ "/*.wal"),
- ?assert(Wal1 == Wal2).
+ ?assert(Wal1 == Wal2),
+ ok = rpc:call(Server, rabbit_alarm, clear_alarm,
+ [{{resource_limit, memory, Server}, []}]),
+ timer:sleep(1000),
+ ok.
%%----------------------------------------------------------------------------
@@ -2219,6 +2281,7 @@ dirty_query(Servers, QName, Fun) ->
fun(N) ->
case rpc:call(N, ra, local_query, [{QName, N}, Fun]) of
{ok, {_, Msgs}, _} ->
+ ct:pal("Msgs ~w", [Msgs]),
Msgs;
_ ->
undefined
@@ -2258,3 +2321,14 @@ delete_queues() ->
stop_node(Config, Server) ->
rabbit_ct_broker_helpers:rabbitmqctl(Config, Server, ["stop"]).
+
+get_message_bytes(Leader, QRes) ->
+ case rpc:call(Leader, ets, lookup, [queue_metrics, QRes]) of
+ [{QRes, Props, _}] ->
+ {proplists:get_value(message_bytes, Props),
+ proplists:get_value(message_bytes_ready, Props),
+ proplists:get_value(message_bytes_unacknowledged, Props)};
+ _ ->
+ []
+ end.
+