diff options
| author | Michael Klishin <michael@clojurewerkz.org> | 2019-02-12 23:28:53 +0300 |
|---|---|---|
| committer | Michael Klishin <michael@clojurewerkz.org> | 2019-02-12 23:28:53 +0300 |
| commit | 3efa9d81735ddbeb2e355699d01be02fd1ee7d06 (patch) | |
| tree | 7951c16b8495a1fa1a741e93e7858baa437b8dd2 /test | |
| parent | 19665bb3fd8e7c257703633f5550b35fb0775199 (diff) | |
| parent | cea04c52dcdcda16ecca5d471bdb7cc6ec16beb8 (diff) | |
| download | rabbitmq-server-git-3efa9d81735ddbeb2e355699d01be02fd1ee7d06.tar.gz | |
Merge branch 'master' into fix-more-dialyzer
Diffstat (limited to 'test')
| -rw-r--r-- | test/channel_operation_timeout_SUITE.erl | 2 | ||||
| -rw-r--r-- | test/clustering_management_SUITE.erl | 12 | ||||
| -rw-r--r-- | test/dead_lettering_SUITE.erl | 4 | ||||
| -rw-r--r-- | test/dynamic_ha_SUITE.erl | 2 | ||||
| -rw-r--r-- | test/metrics_SUITE.erl | 6 | ||||
| -rw-r--r-- | test/queue_master_location_SUITE.erl | 2 | ||||
| -rw-r--r-- | test/queue_parallel_SUITE.erl | 587 | ||||
| -rw-r--r-- | test/quorum_queue_SUITE.erl | 545 | ||||
| -rw-r--r-- | test/rabbit_fifo_SUITE.erl | 5 | ||||
| -rw-r--r-- | test/rabbit_fifo_prop_SUITE.erl | 4 | ||||
| -rw-r--r-- | test/rabbit_ha_test_consumer.erl | 2 | ||||
| -rw-r--r-- | test/rabbitmqctl_shutdown_SUITE.erl | 2 | ||||
| -rw-r--r-- | test/single_active_consumer_SUITE.erl | 75 | ||||
| -rw-r--r-- | test/unit_inbroker_non_parallel_SUITE.erl | 2 | ||||
| -rw-r--r-- | test/unit_inbroker_parallel_SUITE.erl | 2 | ||||
| -rw-r--r-- | test/unit_log_config_SUITE.erl | 2 | ||||
| -rw-r--r-- | test/worker_pool_SUITE.erl | 4 |
17 files changed, 681 insertions, 577 deletions
diff --git a/test/channel_operation_timeout_SUITE.erl b/test/channel_operation_timeout_SUITE.erl index 77da6133d8..47907d09f4 100644 --- a/test/channel_operation_timeout_SUITE.erl +++ b/test/channel_operation_timeout_SUITE.erl @@ -87,7 +87,7 @@ notify_down_all(Config) -> declare(QCfg0), %% Testing rabbit_amqqueue:notify_down_all via rabbit_channel. %% Consumer count = 0 after correct channel termination and - %% notification of queues via delagate:call/3 + %% notification of queues via delegate:call/3 true = (0 =/= length(get_consumers(Config, Rabbit, ?DEFAULT_VHOST))), rabbit_ct_client_helpers:close_channel(RabbitCh), 0 = length(get_consumers(Config, Rabbit, ?DEFAULT_VHOST)), diff --git a/test/clustering_management_SUITE.erl b/test/clustering_management_SUITE.erl index 5ae2fb687c..a4fd63ba04 100644 --- a/test/clustering_management_SUITE.erl +++ b/test/clustering_management_SUITE.erl @@ -153,9 +153,9 @@ join_and_part_cluster(Config) -> join_cluster_bad_operations(Config) -> [Rabbit, Hare, Bunny] = cluster_members(Config), - %% Non-existant node + %% Nonexistent node ok = stop_app(Rabbit), - assert_failure(fun () -> join_cluster(Rabbit, non@existant) end), + assert_failure(fun () -> join_cluster(Rabbit, non@existent) end), ok = start_app(Rabbit), assert_not_clustered(Rabbit), @@ -217,8 +217,8 @@ forget_cluster_node(Config) -> ok = stop_app(Rabbit), %% We're passing the --offline flag, but Hare is online assert_failure(fun () -> forget_cluster_node(Hare, Rabbit, true) end), - %% Removing some non-existant node will fail - assert_failure(fun () -> forget_cluster_node(Hare, non@existant) end), + %% Removing some nonexistent node will fail + assert_failure(fun () -> forget_cluster_node(Hare, non@existent) end), ok = forget_cluster_node(Hare, Rabbit), assert_not_clustered(Hare), assert_cluster_status({[Rabbit, Hare], [Rabbit, Hare], [Hare]}, @@ -504,8 +504,8 @@ update_cluster_nodes(Config) -> stop_reset_start(Hare), assert_failure(fun () -> start_app(Rabbit) end), %% Bogus node - assert_failure(fun () -> update_cluster_nodes(Rabbit, non@existant) end), - %% Inconsisent node + assert_failure(fun () -> update_cluster_nodes(Rabbit, non@existent) end), + %% Inconsistent node assert_failure(fun () -> update_cluster_nodes(Rabbit, Hare) end), ok = update_cluster_nodes(Rabbit, Bunny), ok = start_app(Rabbit), diff --git a/test/dead_lettering_SUITE.erl b/test/dead_lettering_SUITE.erl index 72c72b096c..6722958973 100644 --- a/test/dead_lettering_SUITE.erl +++ b/test/dead_lettering_SUITE.erl @@ -577,7 +577,7 @@ dead_letter_routing_key_header_BCC(Config) -> %% It is possible to form a cycle of message dead-lettering. For instance, %% this can happen when a queue dead-letters messages to the default exchange without -%% specifiying a dead-letter routing key (5). Messages in such cycles (i.e. messages that +%% specifying a dead-letter routing key (5). Messages in such cycles (i.e. messages that %% reach the same queue twice) will be dropped if there was no rejections in the entire cycle. %% i.e. x-message-ttl (7), x-max-length (6) %% @@ -741,7 +741,7 @@ dead_letter_override_policy(Config) -> [_] = consume(Ch, DLXQName, [P1]). %% 9) Policy is set after have declared a queue with dead letter arguments. Policy will be -%% overriden/ignored. +%% overridden/ignored. dead_letter_ignore_policy(Config) -> {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), QName = ?config(queue_name, Config), diff --git a/test/dynamic_ha_SUITE.erl b/test/dynamic_ha_SUITE.erl index 6ccf3a75c3..3bdf7bb009 100644 --- a/test/dynamic_ha_SUITE.erl +++ b/test/dynamic_ha_SUITE.erl @@ -484,7 +484,7 @@ failing_random_policies(Config) -> [A, B | _] = Nodes = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), %% Those set of policies were found as failing by PropEr in the - %% `random_policy` test above. We add them explicitely here to make + %% `random_policy` test above. We add them explicitly here to make %% sure they get tested. ?assertEqual(true, test_random_policy(Config, Nodes, [{nodes, [A, B]}, {nodes, [A]}])), diff --git a/test/metrics_SUITE.erl b/test/metrics_SUITE.erl index 44368b643d..d00c01408e 100644 --- a/test/metrics_SUITE.erl +++ b/test/metrics_SUITE.erl @@ -144,7 +144,7 @@ connection_metric_idemp(Config, {N, R}) -> || _ <- lists:seq(1, N)], Table = [ Pid || {Pid, _} <- read_table_rpc(Config, connection_metrics)], Table2 = [ Pid || {Pid, _} <- read_table_rpc(Config, connection_coarse_metrics)], - % referesh stats 'R' times + % refresh stats 'R' times [[Pid ! emit_stats || Pid <- Table] || _ <- lists:seq(1, R)], force_metric_gc(Config), TableAfter = [ Pid || {Pid, _} <- read_table_rpc(Config, connection_metrics)], @@ -158,7 +158,7 @@ channel_metric_idemp(Config, {N, R}) -> [amqp_connection:open_channel(Conn) || _ <- lists:seq(1, N)], Table = [ Pid || {Pid, _} <- read_table_rpc(Config, channel_metrics)], Table2 = [ Pid || {Pid, _} <- read_table_rpc(Config, channel_process_metrics)], - % referesh stats 'R' times + % refresh stats 'R' times [[Pid ! emit_stats || Pid <- Table] || _ <- lists:seq(1, R)], force_metric_gc(Config), TableAfter = [ Pid || {Pid, _} <- read_table_rpc(Config, channel_metrics)], @@ -181,7 +181,7 @@ queue_metric_idemp(Config, {N, R}) -> Table = [ Pid || {Pid, _, _} <- read_table_rpc(Config, queue_metrics)], Table2 = [ Pid || {Pid, _, _} <- read_table_rpc(Config, queue_coarse_metrics)], - % referesh stats 'R' times + % refresh stats 'R' times ChanTable = read_table_rpc(Config, channel_created), [[Pid ! emit_stats || {Pid, _, _} <- ChanTable ] || _ <- lists:seq(1, R)], force_metric_gc(Config), diff --git a/test/queue_master_location_SUITE.erl b/test/queue_master_location_SUITE.erl index d4c8da2bcb..677355f5e8 100644 --- a/test/queue_master_location_SUITE.erl +++ b/test/queue_master_location_SUITE.erl @@ -22,7 +22,7 @@ %% location strategies can be applied in the following ways; %% 1. As policy, %% 2. As config (in rabbitmq.config), -%% 3. or as part of the queue's declare arguements. +%% 3. or as part of the queue's declare arguments. %% %% Currently supported strategies are; %% min-masters : The queue master node is calculated as the one with the diff --git a/test/queue_parallel_SUITE.erl b/test/queue_parallel_SUITE.erl new file mode 100644 index 0000000000..1290c97b23 --- /dev/null +++ b/test/queue_parallel_SUITE.erl @@ -0,0 +1,587 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2011-2019 Pivotal Software, Inc. All rights reserved. +%% +%% +-module(queue_parallel_SUITE). + +-include_lib("common_test/include/ct.hrl"). +-include_lib("kernel/include/file.hrl"). +-include_lib("amqp_client/include/amqp_client.hrl"). +-include_lib("eunit/include/eunit.hrl"). + +-compile(export_all). + +-define(TIMEOUT, 30000). + +-import(quorum_queue_utils, [wait_for_messages/2]). + +all() -> + [ + {group, parallel_tests} + ]. + +groups() -> + AllTests = [publish, + consume, + consume_first_empty, + consume_from_empty_queue, + consume_and_autoack, + subscribe, + subscribe_with_autoack, + consume_and_ack, + consume_and_multiple_ack, + subscribe_and_ack, + subscribe_and_multiple_ack, + subscribe_and_requeue_multiple_nack, + subscribe_and_nack, + subscribe_and_requeue_nack, + subscribe_and_multiple_nack, + consume_and_requeue_nack, + consume_and_nack, + consume_and_requeue_multiple_nack, + consume_and_multiple_nack, + basic_cancel, + purge, + basic_recover, + delete_immediately_by_resource + ], + [ + {parallel_tests, [], + [ + {classic_queue, [parallel], AllTests ++ [delete_immediately_by_pid_succeeds]}, + {mirrored_queue, [parallel], AllTests ++ [delete_immediately_by_pid_succeeds]}, + {quorum_queue, [parallel], AllTests ++ [delete_immediately_by_pid_fails]} + ]} + ]. + +suite() -> + [ + {timetrap, {minutes, 3}} + ]. + +%% ------------------------------------------------------------------- +%% Testsuite setup/teardown. +%% ------------------------------------------------------------------- + +init_per_suite(Config) -> + rabbit_ct_helpers:log_environment(), + rabbit_ct_helpers:run_setup_steps(Config). + +end_per_suite(Config) -> + rabbit_ct_helpers:run_teardown_steps(Config). + +init_per_group(classic_queue, Config) -> + rabbit_ct_helpers:set_config( + Config, + [{queue_args, [{<<"x-queue-type">>, longstr, <<"classic">>}]}, + {queue_durable, true}]); +init_per_group(quorum_queue, Config) -> + rabbit_ct_helpers:set_config( + Config, + [{queue_args, [{<<"x-queue-type">>, longstr, <<"quorum">>}]}, + {queue_durable, true}]); +init_per_group(mirrored_queue, Config) -> + rabbit_ct_broker_helpers:set_ha_policy(Config, 0, <<"^max_length.*queue">>, + <<"all">>, [{<<"ha-sync-mode">>, <<"automatic">>}]), + Config1 = rabbit_ct_helpers:set_config( + Config, [{is_mirrored, true}, + {queue_args, [{<<"x-queue-type">>, longstr, <<"classic">>}]}, + {queue_durable, true}]), + rabbit_ct_helpers:run_steps(Config1, []); +init_per_group(Group, Config) -> + case lists:member({group, Group}, all()) of + true -> + ClusterSize = 2, + Config1 = rabbit_ct_helpers:set_config(Config, [ + {rmq_nodename_suffix, Group}, + {rmq_nodes_count, ClusterSize} + ]), + rabbit_ct_helpers:run_steps(Config1, + rabbit_ct_broker_helpers:setup_steps() ++ + rabbit_ct_client_helpers:setup_steps()); + false -> + rabbit_ct_helpers:run_steps(Config, []) + end. + +end_per_group(Group, Config) -> + case lists:member({group, Group}, all()) of + true -> + rabbit_ct_helpers:run_steps(Config, + rabbit_ct_client_helpers:teardown_steps() ++ + rabbit_ct_broker_helpers:teardown_steps()); + false -> + Config + end. + +init_per_testcase(Testcase, Config) -> + Group = proplists:get_value(name, ?config(tc_group_properties, Config)), + Q = rabbit_data_coercion:to_binary(io_lib:format("~p_~p", [Group, Testcase])), + Q2 = rabbit_data_coercion:to_binary(io_lib:format("~p_~p_2", [Group, Testcase])), + Config1 = rabbit_ct_helpers:set_config(Config, [{queue_name, Q}, + {queue_name_2, Q2}]), + rabbit_ct_helpers:testcase_started(Config1, Testcase). + +end_per_testcase(Testcase, Config) -> + {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + amqp_channel:call(Ch, #'queue.delete'{queue = ?config(queue_name, Config)}), + amqp_channel:call(Ch, #'queue.delete'{queue = ?config(queue_name_2, Config)}), + rabbit_ct_helpers:testcase_finished(Config, Testcase). + +publish(Config) -> + {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + QName = ?config(queue_name, Config), + declare_queue(Ch, Config, QName), + publish(Ch, QName, [<<"msg1">>]), + wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]). + +consume(Config) -> + {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + QName = ?config(queue_name, Config), + declare_queue(Ch, Config, QName), + publish(Ch, QName, [<<"msg1">>]), + wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), + consume(Ch, QName, [<<"msg1">>]), + wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]), + rabbit_ct_client_helpers:close_channel(Ch), + wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]). + +consume_first_empty(Config) -> + {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + QName = ?config(queue_name, Config), + declare_queue(Ch, Config, QName), + consume_empty(Ch, QName), + publish(Ch, QName, [<<"msg1">>]), + wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), + consume(Ch, QName, [<<"msg1">>]), + rabbit_ct_client_helpers:close_channel(Ch). + +consume_from_empty_queue(Config) -> + {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + QName = ?config(queue_name, Config), + declare_queue(Ch, Config, QName), + consume_empty(Ch, QName). + +consume_and_autoack(Config) -> + {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + QName = ?config(queue_name, Config), + declare_queue(Ch, Config, QName), + publish(Ch, QName, [<<"msg1">>]), + wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), + consume(Ch, QName, true, [<<"msg1">>]), + wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]), + rabbit_ct_client_helpers:close_channel(Ch), + wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]). + +subscribe(Config) -> + [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + QName = ?config(queue_name, Config), + declare_queue(Ch, Config, QName), + + ?assertMatch(#'basic.qos_ok'{}, + amqp_channel:call(Ch, #'basic.qos'{global = false, + prefetch_count = 10})), + publish(Ch, QName, [<<"msg1">>]), + wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), + + subscribe(Ch, QName, false), + receive_basic_deliver(false), + wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]), + + %% validate we can retrieve the consumers + Consumers = rpc:call(Server, rabbit_amqqueue, consumers_all, [<<"/">>]), + [Consumer] = lists:filter(fun(Props) -> + Resource = proplists:get_value(queue_name, Props), + QName == Resource#resource.name + end, Consumers), + ?assert(is_pid(proplists:get_value(channel_pid, Consumer))), + ?assert(is_binary(proplists:get_value(consumer_tag, Consumer))), + ?assertEqual(true, proplists:get_value(ack_required, Consumer)), + ?assertEqual(10, proplists:get_value(prefetch_count, Consumer)), + ?assertEqual([], proplists:get_value(arguments, Consumer)), + + rabbit_ct_client_helpers:close_channel(Ch), + wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]). + +subscribe_with_autoack(Config) -> + {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + QName = ?config(queue_name, Config), + declare_queue(Ch, Config, QName), + + publish(Ch, QName, [<<"msg1">>, <<"msg2">>]), + wait_for_messages(Config, [[QName, <<"2">>, <<"2">>, <<"0">>]]), + subscribe(Ch, QName, true), + receive_basic_deliver(false), + receive_basic_deliver(false), + wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]), + rabbit_ct_client_helpers:close_channel(Ch), + wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]). + +consume_and_ack(Config) -> + {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + QName = ?config(queue_name, Config), + declare_queue(Ch, Config, QName), + + publish(Ch, QName, [<<"msg1">>]), + wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), + [DeliveryTag] = consume(Ch, QName, [<<"msg1">>]), + wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]), + amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag}), + wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]). + +consume_and_multiple_ack(Config) -> + {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + QName = ?config(queue_name, Config), + declare_queue(Ch, Config, QName), + + publish(Ch, QName, [<<"msg1">>, <<"msg2">>, <<"msg3">>]), + wait_for_messages(Config, [[QName, <<"3">>, <<"3">>, <<"0">>]]), + [_, _, DeliveryTag] = consume(Ch, QName, [<<"msg1">>, <<"msg2">>, <<"msg3">>]), + wait_for_messages(Config, [[QName, <<"3">>, <<"0">>, <<"3">>]]), + amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag, + multiple = true}), + wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]). + +subscribe_and_ack(Config) -> + {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + QName = ?config(queue_name, Config), + declare_queue(Ch, Config, QName), + + publish(Ch, QName, [<<"msg1">>]), + wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), + subscribe(Ch, QName, false), + receive + {#'basic.deliver'{delivery_tag = DeliveryTag}, _} -> + wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]), + amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag}), + wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]) + end. + +subscribe_and_multiple_ack(Config) -> + {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + QName = ?config(queue_name, Config), + declare_queue(Ch, Config, QName), + + publish(Ch, QName, [<<"msg1">>, <<"msg2">>, <<"msg3">>]), + wait_for_messages(Config, [[QName, <<"3">>, <<"3">>, <<"0">>]]), + subscribe(Ch, QName, false), + receive_basic_deliver(false), + receive_basic_deliver(false), + receive + {#'basic.deliver'{delivery_tag = DeliveryTag}, _} -> + wait_for_messages(Config, [[QName, <<"3">>, <<"0">>, <<"3">>]]), + amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag, + multiple = true}), + wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]) + end. + +subscribe_and_requeue_multiple_nack(Config) -> + {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + QName = ?config(queue_name, Config), + declare_queue(Ch, Config, QName), + + publish(Ch, QName, [<<"msg1">>, <<"msg2">>, <<"msg3">>]), + wait_for_messages(Config, [[QName, <<"3">>, <<"3">>, <<"0">>]]), + subscribe(Ch, QName, false), + receive_basic_deliver(false), + receive_basic_deliver(false), + receive + {#'basic.deliver'{delivery_tag = DeliveryTag, + redelivered = false}, _} -> + wait_for_messages(Config, [[QName, <<"3">>, <<"0">>, <<"3">>]]), + amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag, + multiple = true, + requeue = true}), + receive_basic_deliver(true), + receive_basic_deliver(true), + receive + {#'basic.deliver'{delivery_tag = DeliveryTag1, + redelivered = true}, _} -> + wait_for_messages(Config, [[QName, <<"3">>, <<"0">>, <<"3">>]]), + amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag1, + multiple = true}), + wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]) + end + end. + +consume_and_requeue_nack(Config) -> + {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + QName = ?config(queue_name, Config), + declare_queue(Ch, Config, QName), + + publish(Ch, QName, [<<"msg1">>, <<"msg2">>]), + wait_for_messages(Config, [[QName, <<"2">>, <<"2">>, <<"0">>]]), + [DeliveryTag] = consume(Ch, QName, [<<"msg1">>]), + wait_for_messages(Config, [[QName, <<"2">>, <<"1">>, <<"1">>]]), + amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag, + multiple = false, + requeue = true}), + wait_for_messages(Config, [[QName, <<"2">>, <<"2">>, <<"0">>]]). + +consume_and_nack(Config) -> + {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + QName = ?config(queue_name, Config), + declare_queue(Ch, Config, QName), + + publish(Ch, QName, [<<"msg1">>]), + wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), + [DeliveryTag] = consume(Ch, QName, [<<"msg1">>]), + wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]), + amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag, + multiple = false, + requeue = false}), + wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]). + +consume_and_requeue_multiple_nack(Config) -> + {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + QName = ?config(queue_name, Config), + declare_queue(Ch, Config, QName), + + publish(Ch, QName, [<<"msg1">>, <<"msg2">>, <<"msg3">>]), + wait_for_messages(Config, [[QName, <<"3">>, <<"3">>, <<"0">>]]), + [_, _, DeliveryTag] = consume(Ch, QName, [<<"msg1">>, <<"msg2">>, <<"msg3">>]), + wait_for_messages(Config, [[QName, <<"3">>, <<"0">>, <<"3">>]]), + amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag, + multiple = true, + requeue = true}), + wait_for_messages(Config, [[QName, <<"3">>, <<"3">>, <<"0">>]]). + +consume_and_multiple_nack(Config) -> + {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + QName = ?config(queue_name, Config), + declare_queue(Ch, Config, QName), + + publish(Ch, QName, [<<"msg1">>, <<"msg2">>, <<"msg3">>]), + wait_for_messages(Config, [[QName, <<"3">>, <<"3">>, <<"0">>]]), + [_, _, DeliveryTag] = consume(Ch, QName, [<<"msg1">>, <<"msg2">>, <<"msg3">>]), + wait_for_messages(Config, [[QName, <<"3">>, <<"0">>, <<"3">>]]), + amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag, + multiple = true, + requeue = false}), + wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]). + +subscribe_and_requeue_nack(Config) -> + {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + QName = ?config(queue_name, Config), + declare_queue(Ch, Config, QName), + + publish(Ch, QName, [<<"msg1">>]), + wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), + subscribe(Ch, QName, false), + receive + {#'basic.deliver'{delivery_tag = DeliveryTag, + redelivered = false}, _} -> + wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]), + amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag, + multiple = false, + requeue = true}), + receive + {#'basic.deliver'{delivery_tag = DeliveryTag1, + redelivered = true}, _} -> + wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]), + amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag1}), + wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]) + end + end. + +subscribe_and_nack(Config) -> + {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + QName = ?config(queue_name, Config), + declare_queue(Ch, Config, QName), + + publish(Ch, QName, [<<"msg1">>]), + wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), + subscribe(Ch, QName, false), + receive + {#'basic.deliver'{delivery_tag = DeliveryTag, + redelivered = false}, _} -> + wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]), + amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag, + multiple = false, + requeue = false}), + wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]) + end. + +subscribe_and_multiple_nack(Config) -> + {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + QName = ?config(queue_name, Config), + declare_queue(Ch, Config, QName), + + publish(Ch, QName, [<<"msg1">>, <<"msg2">>, <<"msg3">>]), + wait_for_messages(Config, [[QName, <<"3">>, <<"3">>, <<"0">>]]), + subscribe(Ch, QName, false), + receive_basic_deliver(false), + receive_basic_deliver(false), + receive + {#'basic.deliver'{delivery_tag = DeliveryTag, + redelivered = false}, _} -> + wait_for_messages(Config, [[QName, <<"3">>, <<"0">>, <<"3">>]]), + amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag, + multiple = true, + requeue = false}), + wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]) + end. + +%% TODO test with single active +basic_cancel(Config) -> + [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + QName = ?config(queue_name, Config), + declare_queue(Ch, Config, QName), + + publish(Ch, QName, [<<"msg1">>]), + wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), + subscribe(Ch, QName, false), + receive + {#'basic.deliver'{delivery_tag = DeliveryTag}, _} -> + wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]), + amqp_channel:call(Ch, #'basic.cancel'{consumer_tag = <<"ctag">>}), + Consumers = rpc:call(Server, rabbit_amqqueue, consumers_all, [<<"/">>]), + wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]), + ?assertEqual([], lists:filter(fun(Props) -> + Resource = proplists:get_value(queue_name, Props), + QName == Resource#resource.name + end, Consumers)), + publish(Ch, QName, [<<"msg2">>, <<"msg3">>]), + wait_for_messages(Config, [[QName, <<"3">>, <<"2">>, <<"1">>]]), + amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag}), + wait_for_messages(Config, [[QName, <<"2">>, <<"2">>, <<"0">>]]) + after 5000 -> + exit(basic_deliver_timeout) + end. + +purge(Config) -> + {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + QName = ?config(queue_name, Config), + declare_queue(Ch, Config, QName), + + publish(Ch, QName, [<<"msg1">>, <<"msg2">>]), + wait_for_messages(Config, [[QName, <<"2">>, <<"2">>, <<"0">>]]), + [_] = consume(Ch, QName, [<<"msg1">>]), + wait_for_messages(Config, [[QName, <<"2">>, <<"1">>, <<"1">>]]), + {'queue.purge_ok', 1} = amqp_channel:call(Ch, #'queue.purge'{queue = QName}), + wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]). + +basic_recover(Config) -> + {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + QName = ?config(queue_name, Config), + declare_queue(Ch, Config, QName), + + publish(Ch, QName, [<<"msg1">>]), + wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), + [_] = consume(Ch, QName, [<<"msg1">>]), + wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]), + amqp_channel:cast(Ch, #'basic.recover'{requeue = true}), + wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]). + +delete_immediately_by_pid_fails(Config) -> + {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + Args = ?config(queue_args, Config), + Durable = ?config(queue_durable, Config), + QName = ?config(queue_name, Config), + declare_queue(Ch, Config, QName), + + Cmd = ["eval", "{ok, Q} = rabbit_amqqueue:lookup(rabbit_misc:r(<<\"/\">>, queue, <<\"" ++ binary_to_list(QName) ++ "\">>)), Pid = rabbit_amqqueue:pid_of(Q), rabbit_amqqueue:delete_immediately([Pid])."], + {ok, Msg} = rabbit_ct_broker_helpers:rabbitmqctl(Config, 0, Cmd), + ?assertEqual(match, re:run(Msg, ".*error.*", [{capture, none}])), + + ?assertEqual({'queue.declare_ok', QName, 0, 0}, + amqp_channel:call(Ch, #'queue.declare'{queue = QName, + durable = Durable, + passive = true, + auto_delete = false, + arguments = Args})). + +delete_immediately_by_pid_succeeds(Config) -> + {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + Args = ?config(queue_args, Config), + Durable = ?config(queue_durable, Config), + QName = ?config(queue_name, Config), + declare_queue(Ch, Config, QName), + + Cmd = ["eval", "{ok, Q} = rabbit_amqqueue:lookup(rabbit_misc:r(<<\"/\">>, queue, <<\"" ++ binary_to_list(QName) ++ "\">>)), Pid = rabbit_amqqueue:pid_of(Q), rabbit_amqqueue:delete_immediately([Pid])."], + {ok, Msg} = rabbit_ct_broker_helpers:rabbitmqctl(Config, 0, Cmd), + ?assertEqual(match, re:run(Msg, ".*ok.*", [{capture, none}])), + + ?assertExit( + {{shutdown, {server_initiated_close, 404, _}}, _}, + amqp_channel:call(Ch, #'queue.declare'{queue = QName, + durable = Durable, + passive = true, + auto_delete = false, + arguments = Args})). + +delete_immediately_by_resource(Config) -> + {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + Args = ?config(queue_args, Config), + Durable = ?config(queue_durable, Config), + QName = ?config(queue_name, Config), + declare_queue(Ch, Config, QName), + + Cmd = ["eval", "rabbit_amqqueue:delete_immediately_by_resource([rabbit_misc:r(<<\"/\">>, queue, <<\"" ++ binary_to_list(QName) ++ "\">>)])."], + ?assertEqual({ok, "ok\n"}, rabbit_ct_broker_helpers:rabbitmqctl(Config, 0, Cmd)), + + ?assertExit( + {{shutdown, {server_initiated_close, 404, _}}, _}, + amqp_channel:call(Ch, #'queue.declare'{queue = QName, + durable = Durable, + passive = true, + auto_delete = false, + arguments = Args})). + +%%%%%%%%%%%%%%%%%%%%%%%% +%% Test helpers +%%%%%%%%%%%%%%%%%%%%%%%% +declare_queue(Ch, Config, QName) -> + Args = ?config(queue_args, Config), + Durable = ?config(queue_durable, Config), + #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, + arguments = Args, + durable = Durable}). + +publish(Ch, QName, Payloads) -> + [amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload}) + || Payload <- Payloads]. + +consume(Ch, QName, Payloads) -> + consume(Ch, QName, false, Payloads). + +consume(Ch, QName, NoAck, Payloads) -> + [begin + {#'basic.get_ok'{delivery_tag = DTag}, #amqp_msg{payload = Payload}} = + amqp_channel:call(Ch, #'basic.get'{queue = QName, + no_ack = NoAck}), + DTag + end || Payload <- Payloads]. + +consume_empty(Ch, QName) -> + ?assertMatch(#'basic.get_empty'{}, + amqp_channel:call(Ch, #'basic.get'{queue = QName})). + +subscribe(Ch, Queue, NoAck) -> + amqp_channel:subscribe(Ch, #'basic.consume'{queue = Queue, + no_ack = NoAck, + consumer_tag = <<"ctag">>}, + self()), + receive + #'basic.consume_ok'{consumer_tag = <<"ctag">>} -> + ok + end. + +receive_basic_deliver(Redelivered) -> + receive + {#'basic.deliver'{redelivered = R}, _} when R == Redelivered -> + ok + end. diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl index 02afba97c5..e9f49039ed 100644 --- a/test/quorum_queue_SUITE.erl +++ b/test/quorum_queue_SUITE.erl @@ -93,26 +93,7 @@ all_tests() -> restart_queue, restart_all_types, stop_start_rabbit_app, - publish, publish_and_restart, - consume, - consume_first_empty, - consume_from_empty_queue, - consume_and_autoack, - subscribe, - subscribe_with_autoack, - consume_and_ack, - consume_and_multiple_ack, - subscribe_and_ack, - subscribe_and_multiple_ack, - consume_and_requeue_nack, - consume_and_requeue_multiple_nack, - subscribe_and_requeue_nack, - subscribe_and_requeue_multiple_nack, - consume_and_nack, - consume_and_multiple_nack, - subscribe_and_nack, - subscribe_and_multiple_nack, subscribe_should_fail_when_global_qos_true, dead_letter_to_classic_queue, dead_letter_to_quorum_queue, @@ -120,14 +101,10 @@ all_tests() -> dead_letter_policy, cleanup_queue_state_on_channel_after_publish, cleanup_queue_state_on_channel_after_subscribe, - basic_cancel, - purge, sync_queue, cancel_sync_queue, - basic_recover, idempotent_recover, vhost_with_quorum_queue_is_deleted, - delete_immediately, delete_immediately_by_resource, consume_redelivery_count, subscribe_redelivery_count, @@ -612,18 +589,6 @@ stop_start_rabbit_app(Config) -> amqp_channel:call(Ch2, #'basic.get'{queue = CQ2, no_ack = false}), delete_queues(Ch2, [QQ1, QQ2, CQ1, CQ2]). -publish(Config) -> - [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), - - Ch = rabbit_ct_client_helpers:open_channel(Config, Server), - QQ = ?config(queue_name, Config), - ?assertEqual({'queue.declare_ok', QQ, 0, 0}, - declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), - publish(Ch, QQ), - Name = ra_name(QQ), - wait_for_messages_ready(Servers, Name, 1), - wait_for_messages_pending_ack(Servers, Name, 0). - publish_confirm(Ch, QName) -> publish(Ch, QName), amqp_channel:register_confirm_handler(Ch, self()), @@ -660,41 +625,6 @@ publish_and_restart(Config) -> wait_for_messages_ready(Servers, RaName, 2), wait_for_messages_pending_ack(Servers, RaName, 0). -consume(Config) -> - [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), - - Ch = rabbit_ct_client_helpers:open_channel(Config, Server), - QQ = ?config(queue_name, Config), - ?assertEqual({'queue.declare_ok', QQ, 0, 0}, - declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), - - RaName = ra_name(QQ), - publish(Ch, QQ), - wait_for_messages_ready(Servers, RaName, 1), - wait_for_messages_pending_ack(Servers, RaName, 0), - consume(Ch, QQ, false), - wait_for_messages_ready(Servers, RaName, 0), - wait_for_messages_pending_ack(Servers, RaName, 1), - rabbit_ct_client_helpers:close_channel(Ch), - wait_for_messages_ready(Servers, RaName, 1), - wait_for_messages_pending_ack(Servers, RaName, 0). - -consume_first_empty(Config) -> - [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), - - Ch = rabbit_ct_client_helpers:open_channel(Config, Server), - QQ = ?config(queue_name, Config), - ?assertEqual({'queue.declare_ok', QQ, 0, 0}, - declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), - - RaName = ra_name(QQ), - consume_empty(Ch, QQ, false), - publish(Ch, QQ), - wait_for_messages_ready(Servers, RaName, 1), - wait_for_messages_pending_ack(Servers, RaName, 0), - consume(Ch, QQ, false), - rabbit_ct_client_helpers:close_channel(Ch). - consume_in_minority(Config) -> [Server0, Server1, Server2] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), @@ -711,63 +641,6 @@ consume_in_minority(Config) -> amqp_channel:call(Ch, #'basic.get'{queue = QQ, no_ack = false})). -consume_and_autoack(Config) -> - [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), - - Ch = rabbit_ct_client_helpers:open_channel(Config, Server), - QQ = ?config(queue_name, Config), - ?assertEqual({'queue.declare_ok', QQ, 0, 0}, - declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), - RaName = ra_name(QQ), - publish(Ch, QQ), - wait_for_messages_ready(Servers, RaName, 1), - wait_for_messages_pending_ack(Servers, RaName, 0), - consume(Ch, QQ, true), - wait_for_messages_ready(Servers, RaName, 0), - wait_for_messages_pending_ack(Servers, RaName, 0), - rabbit_ct_client_helpers:close_channel(Ch), - wait_for_messages_ready(Servers, RaName, 0), - wait_for_messages_pending_ack(Servers, RaName, 0). - -consume_from_empty_queue(Config) -> - Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), - - Ch = rabbit_ct_client_helpers:open_channel(Config, Server), - QQ = ?config(queue_name, Config), - ?assertEqual({'queue.declare_ok', QQ, 0, 0}, - declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), - - consume_empty(Ch, QQ, false). - -subscribe(Config) -> - [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), - - Ch = rabbit_ct_client_helpers:open_channel(Config, Server), - QQ = ?config(queue_name, Config), - ?assertEqual({'queue.declare_ok', QQ, 0, 0}, - declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), - - qos(Ch, 10, false), - RaName = ra_name(QQ), - publish(Ch, QQ), - wait_for_messages_ready(Servers, RaName, 1), - wait_for_messages_pending_ack(Servers, RaName, 0), - subscribe(Ch, QQ, false), - receive_basic_deliver(false), - wait_for_messages_ready(Servers, RaName, 0), - wait_for_messages_pending_ack(Servers, RaName, 1), - %% validate we can retrieve the consumers - [Consumer] = rpc:call(Server, rabbit_amqqueue, consumers_all, [<<"/">>]), - ct:pal("Consumer ~p", [Consumer]), - ?assert(is_pid(proplists:get_value(channel_pid, Consumer))), - ?assert(is_binary(proplists:get_value(consumer_tag, Consumer))), - ?assertEqual(true, proplists:get_value(ack_required, Consumer)), - ?assertEqual(10, proplists:get_value(prefetch_count, Consumer)), - ?assertEqual([], proplists:get_value(arguments, Consumer)), - rabbit_ct_client_helpers:close_channel(Ch), - wait_for_messages_ready(Servers, RaName, 1), - wait_for_messages_pending_ack(Servers, RaName, 0). - subscribe_should_fail_when_global_qos_true(Config) -> [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), @@ -789,338 +662,6 @@ subscribe_should_fail_when_global_qos_true(Config) -> end, ok. -subscribe_with_autoack(Config) -> - [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), - - Ch = rabbit_ct_client_helpers:open_channel(Config, Server), - QQ = ?config(queue_name, Config), - ?assertEqual({'queue.declare_ok', QQ, 0, 0}, - declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), - - RaName = ra_name(QQ), - publish(Ch, QQ), - publish(Ch, QQ), - wait_for_messages_ready(Servers, RaName, 2), - wait_for_messages_pending_ack(Servers, RaName, 0), - subscribe(Ch, QQ, true), - receive_basic_deliver(false), - receive_basic_deliver(false), - wait_for_messages_ready(Servers, RaName, 0), - wait_for_messages_pending_ack(Servers, RaName, 0), - rabbit_ct_client_helpers:close_channel(Ch), - wait_for_messages_ready(Servers, RaName, 0), - wait_for_messages_pending_ack(Servers, RaName, 0). - -consume_and_ack(Config) -> - [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), - - Ch = rabbit_ct_client_helpers:open_channel(Config, Server), - QQ = ?config(queue_name, Config), - ?assertEqual({'queue.declare_ok', QQ, 0, 0}, - declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), - - RaName = ra_name(QQ), - publish(Ch, QQ), - wait_for_messages_ready(Servers, RaName, 1), - wait_for_messages_pending_ack(Servers, RaName, 0), - DeliveryTag = consume(Ch, QQ, false), - wait_for_messages_ready(Servers, RaName, 0), - wait_for_messages_pending_ack(Servers, RaName, 1), - amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag}), - wait_for_messages_ready(Servers, RaName, 0), - wait_for_messages_pending_ack(Servers, RaName, 0). - -consume_and_multiple_ack(Config) -> - [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), - - Ch = rabbit_ct_client_helpers:open_channel(Config, Server), - QQ = ?config(queue_name, Config), - ?assertEqual({'queue.declare_ok', QQ, 0, 0}, - declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), - - RaName = ra_name(QQ), - publish(Ch, QQ), - publish(Ch, QQ), - publish(Ch, QQ), - wait_for_messages_ready(Servers, RaName, 3), - wait_for_messages_pending_ack(Servers, RaName, 0), - _ = consume(Ch, QQ, false), - _ = consume(Ch, QQ, false), - DeliveryTag = consume(Ch, QQ, false), - wait_for_messages_ready(Servers, RaName, 0), - wait_for_messages_pending_ack(Servers, RaName, 3), - amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag, - multiple = true}), - wait_for_messages_ready(Servers, RaName, 0), - wait_for_messages_pending_ack(Servers, RaName, 0). - -subscribe_and_ack(Config) -> - [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), - - Ch = rabbit_ct_client_helpers:open_channel(Config, Server), - QQ = ?config(queue_name, Config), - ?assertEqual({'queue.declare_ok', QQ, 0, 0}, - declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), - - RaName = ra_name(QQ), - publish(Ch, QQ), - wait_for_messages_ready(Servers, RaName, 1), - wait_for_messages_pending_ack(Servers, RaName, 0), - subscribe(Ch, QQ, false), - receive - {#'basic.deliver'{delivery_tag = DeliveryTag}, _} -> - wait_for_messages_ready(Servers, RaName, 0), - wait_for_messages_pending_ack(Servers, RaName, 1), - amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag}), - wait_for_messages_ready(Servers, RaName, 0), - wait_for_messages_pending_ack(Servers, RaName, 0) - end. - -subscribe_and_multiple_ack(Config) -> - [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), - - Ch = rabbit_ct_client_helpers:open_channel(Config, Server), - QQ = ?config(queue_name, Config), - ?assertEqual({'queue.declare_ok', QQ, 0, 0}, - declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), - - RaName = ra_name(QQ), - publish(Ch, QQ), - publish(Ch, QQ), - publish(Ch, QQ), - wait_for_messages_ready(Servers, RaName, 3), - wait_for_messages_pending_ack(Servers, RaName, 0), - subscribe(Ch, QQ, false), - receive_basic_deliver(false), - receive_basic_deliver(false), - receive - {#'basic.deliver'{delivery_tag = DeliveryTag}, _} -> - wait_for_messages_ready(Servers, RaName, 0), - wait_for_messages_pending_ack(Servers, RaName, 3), - amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag, - multiple = true}), - wait_for_messages_ready(Servers, RaName, 0), - wait_for_messages_pending_ack(Servers, RaName, 0) - end. - -consume_and_requeue_nack(Config) -> - [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), - - Ch = rabbit_ct_client_helpers:open_channel(Config, Server), - QQ = ?config(queue_name, Config), - ?assertEqual({'queue.declare_ok', QQ, 0, 0}, - declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), - - RaName = ra_name(QQ), - publish(Ch, QQ), - publish(Ch, QQ), - wait_for_messages_ready(Servers, RaName, 2), - wait_for_messages_pending_ack(Servers, RaName, 0), - DeliveryTag = consume(Ch, QQ, false), - wait_for_messages_ready(Servers, RaName, 1), - wait_for_messages_pending_ack(Servers, RaName, 1), - amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag, - multiple = false, - requeue = true}), - wait_for_messages_ready(Servers, RaName, 2), - wait_for_messages_pending_ack(Servers, RaName, 0). - -consume_and_requeue_multiple_nack(Config) -> - [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), - - Ch = rabbit_ct_client_helpers:open_channel(Config, Server), - QQ = ?config(queue_name, Config), - ?assertEqual({'queue.declare_ok', QQ, 0, 0}, - declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), - - RaName = ra_name(QQ), - publish(Ch, QQ), - publish(Ch, QQ), - publish(Ch, QQ), - wait_for_messages_ready(Servers, RaName, 3), - wait_for_messages_pending_ack(Servers, RaName, 0), - _ = consume(Ch, QQ, false), - _ = consume(Ch, QQ, false), - DeliveryTag = consume(Ch, QQ, false), - wait_for_messages_ready(Servers, RaName, 0), - wait_for_messages_pending_ack(Servers, RaName, 3), - amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag, - multiple = true, - requeue = true}), - wait_for_messages_ready(Servers, RaName, 3), - wait_for_messages_pending_ack(Servers, RaName, 0). - -consume_and_nack(Config) -> - [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), - - Ch = rabbit_ct_client_helpers:open_channel(Config, Server), - QQ = ?config(queue_name, Config), - ?assertEqual({'queue.declare_ok', QQ, 0, 0}, - declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), - - RaName = ra_name(QQ), - publish(Ch, QQ), - wait_for_messages_ready(Servers, RaName, 1), - wait_for_messages_pending_ack(Servers, RaName, 0), - DeliveryTag = consume(Ch, QQ, false), - wait_for_messages_ready(Servers, RaName, 0), - wait_for_messages_pending_ack(Servers, RaName, 1), - amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag, - multiple = false, - requeue = false}), - wait_for_messages_ready(Servers, RaName, 0), - wait_for_messages_pending_ack(Servers, RaName, 0). - -consume_and_multiple_nack(Config) -> - [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), - - Ch = rabbit_ct_client_helpers:open_channel(Config, Server), - QQ = ?config(queue_name, Config), - ?assertEqual({'queue.declare_ok', QQ, 0, 0}, - declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), - - RaName = ra_name(QQ), - publish(Ch, QQ), - publish(Ch, QQ), - publish(Ch, QQ), - wait_for_messages_ready(Servers, RaName, 3), - wait_for_messages_pending_ack(Servers, RaName, 0), - _ = consume(Ch, QQ, false), - _ = consume(Ch, QQ, false), - DeliveryTag = consume(Ch, QQ, false), - wait_for_messages_ready(Servers, RaName, 0), - wait_for_messages_pending_ack(Servers, RaName, 3), - amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag, - multiple = true, - requeue = false}), - wait_for_messages_ready(Servers, RaName, 0), - wait_for_messages_pending_ack(Servers, RaName, 0). - -subscribe_and_requeue_multiple_nack(Config) -> - [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), - - Ch = rabbit_ct_client_helpers:open_channel(Config, Server), - QQ = ?config(queue_name, Config), - ?assertEqual({'queue.declare_ok', QQ, 0, 0}, - declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), - - RaName = ra_name(QQ), - publish(Ch, QQ), - publish(Ch, QQ), - publish(Ch, QQ), - wait_for_messages_ready(Servers, RaName, 3), - wait_for_messages_pending_ack(Servers, RaName, 0), - subscribe(Ch, QQ, false), - receive_basic_deliver(false), - receive_basic_deliver(false), - receive - {#'basic.deliver'{delivery_tag = DeliveryTag, - redelivered = false}, _} -> - wait_for_messages_ready(Servers, RaName, 0), - wait_for_messages_pending_ack(Servers, RaName, 3), - amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag, - multiple = true, - requeue = true}), - receive_basic_deliver(true), - receive_basic_deliver(true), - receive - {#'basic.deliver'{delivery_tag = DeliveryTag1, - redelivered = true}, _} -> - wait_for_messages_ready(Servers, RaName, 0), - wait_for_messages_pending_ack(Servers, RaName, 3), - amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag1, - multiple = true}), - wait_for_messages_ready(Servers, RaName, 0), - wait_for_messages_pending_ack(Servers, RaName, 0) - end - end. - -subscribe_and_requeue_nack(Config) -> - [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), - - Ch = rabbit_ct_client_helpers:open_channel(Config, Server), - QQ = ?config(queue_name, Config), - ?assertEqual({'queue.declare_ok', QQ, 0, 0}, - declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), - - RaName = ra_name(QQ), - publish(Ch, QQ), - wait_for_messages_ready(Servers, RaName, 1), - wait_for_messages_pending_ack(Servers, RaName, 0), - subscribe(Ch, QQ, false), - receive - {#'basic.deliver'{delivery_tag = DeliveryTag, - redelivered = false}, _} -> - wait_for_messages_ready(Servers, RaName, 0), - wait_for_messages_pending_ack(Servers, RaName, 1), - amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag, - multiple = false, - requeue = true}), - receive - {#'basic.deliver'{delivery_tag = DeliveryTag1, - redelivered = true}, _} -> - wait_for_messages_ready(Servers, RaName, 0), - wait_for_messages_pending_ack(Servers, RaName, 1), - amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag1}), - wait_for_messages_ready(Servers, RaName, 0), - wait_for_messages_pending_ack(Servers, RaName, 0) - end - end. - -subscribe_and_nack(Config) -> - [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), - - Ch = rabbit_ct_client_helpers:open_channel(Config, Server), - QQ = ?config(queue_name, Config), - ?assertEqual({'queue.declare_ok', QQ, 0, 0}, - declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), - - RaName = ra_name(QQ), - publish(Ch, QQ), - wait_for_messages_ready(Servers, RaName, 1), - wait_for_messages_pending_ack(Servers, RaName, 0), - subscribe(Ch, QQ, false), - receive - {#'basic.deliver'{delivery_tag = DeliveryTag, - redelivered = false}, _} -> - wait_for_messages_ready(Servers, RaName, 0), - wait_for_messages_pending_ack(Servers, RaName, 1), - amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag, - multiple = false, - requeue = false}), - wait_for_messages_ready(Servers, RaName, 0), - wait_for_messages_pending_ack(Servers, RaName, 0) - end. - -subscribe_and_multiple_nack(Config) -> - [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), - - Ch = rabbit_ct_client_helpers:open_channel(Config, Server), - QQ = ?config(queue_name, Config), - ?assertEqual({'queue.declare_ok', QQ, 0, 0}, - declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), - - RaName = ra_name(QQ), - publish(Ch, QQ), - publish(Ch, QQ), - publish(Ch, QQ), - wait_for_messages_ready(Servers, RaName, 3), - wait_for_messages_pending_ack(Servers, RaName, 0), - subscribe(Ch, QQ, false), - receive_basic_deliver(false), - receive_basic_deliver(false), - receive - {#'basic.deliver'{delivery_tag = DeliveryTag, - redelivered = false}, _} -> - wait_for_messages_ready(Servers, RaName, 0), - wait_for_messages_pending_ack(Servers, RaName, 3), - amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag, - multiple = true, - requeue = false}), - wait_for_messages_ready(Servers, RaName, 0), - wait_for_messages_pending_ack(Servers, RaName, 0) - end. - dead_letter_to_classic_queue(Config) -> [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), @@ -1500,51 +1041,6 @@ delete_declare(Config) -> wait_for_messages_ready(Servers, RaName, 0), wait_for_messages_pending_ack(Servers, RaName, 0). -basic_cancel(Config) -> - [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), - - Ch = rabbit_ct_client_helpers:open_channel(Config, Server), - QQ = ?config(queue_name, Config), - ?assertEqual({'queue.declare_ok', QQ, 0, 0}, - declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), - - RaName = ra_name(QQ), - publish(Ch, QQ), - wait_for_messages_ready(Servers, RaName, 1), - wait_for_messages_pending_ack(Servers, RaName, 0), - subscribe(Ch, QQ, false), - receive - {#'basic.deliver'{}, _} -> - wait_for_messages_ready(Servers, RaName, 0), - wait_for_messages_pending_ack(Servers, RaName, 1), - amqp_channel:call(Ch, #'basic.cancel'{consumer_tag = <<"ctag">>}), - wait_for_messages_ready(Servers, RaName, 1), - wait_for_messages_pending_ack(Servers, RaName, 0), - [] = rpc:call(Server, ets, tab2list, [consumer_created]) - after 5000 -> - exit(basic_deliver_timeout) - end. - -purge(Config) -> - [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), - - Ch = rabbit_ct_client_helpers:open_channel(Config, Server), - QQ = ?config(queue_name, Config), - ?assertEqual({'queue.declare_ok', QQ, 0, 0}, - declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), - - RaName = ra_name(QQ), - publish(Ch, QQ), - publish(Ch, QQ), - wait_for_messages_ready(Servers, RaName, 2), - wait_for_messages_pending_ack(Servers, RaName, 0), - _DeliveryTag = consume(Ch, QQ, false), - wait_for_messages_ready(Servers, RaName, 1), - wait_for_messages_pending_ack(Servers, RaName, 1), - {'queue.purge_ok', 1} = amqp_channel:call(Ch, #'queue.purge'{queue = QQ}), - wait_for_messages_pending_ack(Servers, RaName, 1), - wait_for_messages_ready(Servers, RaName, 0). - sync_queue(Config) -> [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), @@ -1922,45 +1418,6 @@ reconnect_consumer_and_wait_channel_down(Config) -> wait_for_messages_ready(Servers, RaName, 1), wait_for_messages_pending_ack(Servers, RaName, 0). -basic_recover(Config) -> - [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), - - Ch = rabbit_ct_client_helpers:open_channel(Config, Server), - QQ = ?config(queue_name, Config), - ?assertEqual({'queue.declare_ok', QQ, 0, 0}, - declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), - - RaName = ra_name(QQ), - publish(Ch, QQ), - wait_for_messages_ready(Servers, RaName, 1), - wait_for_messages_pending_ack(Servers, RaName, 0), - _ = consume(Ch, QQ, false), - wait_for_messages_ready(Servers, RaName, 0), - wait_for_messages_pending_ack(Servers, RaName, 1), - amqp_channel:cast(Ch, #'basic.recover'{requeue = true}), - wait_for_messages_ready(Servers, RaName, 1), - wait_for_messages_pending_ack(Servers, RaName, 0). - -delete_immediately(Config) -> - Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), - - Ch = rabbit_ct_client_helpers:open_channel(Config, Server), - QQ = ?config(queue_name, Config), - Args = [{<<"x-queue-type">>, longstr, <<"quorum">>}], - ?assertEqual({'queue.declare_ok', QQ, 0, 0}, - declare(Ch, QQ, Args)), - - Cmd = ["eval", "{ok, Q} = rabbit_amqqueue:lookup(rabbit_misc:r(<<\"/\">>, queue, <<\"" ++ binary_to_list(QQ) ++ "\">>)), Pid = rabbit_amqqueue:pid_of(Q), rabbit_amqqueue:delete_immediately([Pid])."], - {ok, Msg} = rabbit_ct_broker_helpers:rabbitmqctl(Config, 0, Cmd), - ?assertEqual(match, re:run(Msg, ".*error.*", [{capture, none}])), - - ?assertEqual({'queue.declare_ok', QQ, 0, 0}, - amqp_channel:call(Ch, #'queue.declare'{queue = QQ, - durable = true, - passive = true, - auto_delete = false, - arguments = Args})). - delete_immediately_by_resource(Config) -> Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), Ch = rabbit_ct_client_helpers:open_channel(Config, Server), @@ -2046,7 +1503,7 @@ consume_redelivery_count(Config) -> amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag, multiple = false, requeue = true}), - %% wait for requeueing + %% wait for requeuing timer:sleep(500), {#'basic.get_ok'{delivery_tag = DeliveryTag1, diff --git a/test/rabbit_fifo_SUITE.erl b/test/rabbit_fifo_SUITE.erl index 293a597d14..60402b3a7b 100644 --- a/test/rabbit_fifo_SUITE.erl +++ b/test/rabbit_fifo_SUITE.erl @@ -104,7 +104,7 @@ basics(Config) -> exit(await_msg_timeout) end, - % process settle applied notificaiton + % process settle applied notification FState5b = process_ra_event(FState5, 250), _ = ra:stop_server(ServerId), _ = ra:restart_server(ServerId), @@ -395,7 +395,8 @@ cancel_checkout(Config) -> {ok, F2} = rabbit_fifo_client:checkout(<<"tag">>, 10, undefined, F1), {_, _, F3} = process_ra_events0(F2, [], [], 250, fun (_, S) -> S end), {ok, F4} = rabbit_fifo_client:cancel_checkout(<<"tag">>, F3), - {ok, {{_, {_, m1}}, _}, _} = rabbit_fifo_client:dequeue(<<"d1">>, settled, F4), + {ok, F5} = rabbit_fifo_client:return(<<"tag">>, [0], F4), + {ok, {{_, {_, m1}}, _}, _} = rabbit_fifo_client:dequeue(<<"d1">>, settled, F5), ok. credit(Config) -> diff --git a/test/rabbit_fifo_prop_SUITE.erl b/test/rabbit_fifo_prop_SUITE.erl index c4f5690b72..437cd02e25 100644 --- a/test/rabbit_fifo_prop_SUITE.erl +++ b/test/rabbit_fifo_prop_SUITE.erl @@ -321,7 +321,7 @@ checkout_gen(Pid) -> }). expand(Ops) -> - %% execute each command against a rabbit_fifo state and capture all releavant + %% execute each command against a rabbit_fifo state and capture all relevant %% effects T = #t{}, #t{effects = Effs} = T1 = lists:foldl(fun handle_op/2, T, Ops), @@ -463,7 +463,7 @@ run_proper(Fun, Args, NumTests) -> end}])). run_snapshot_test(Conf, Commands) -> - %% create every incremental permuation of the commands lists + %% create every incremental permutation of the commands lists %% and run the snapshot tests against that [begin % ?debugFmt("~w running command to ~w~n", [?FUNCTION_NAME, lists:last(C)]), diff --git a/test/rabbit_ha_test_consumer.erl b/test/rabbit_ha_test_consumer.erl index 8d61903308..85f0a17b2a 100644 --- a/test/rabbit_ha_test_consumer.erl +++ b/test/rabbit_ha_test_consumer.erl @@ -64,7 +64,7 @@ run(TestPid, Channel, Queue, CancelOnFailover, LowestSeen, MsgsToConsume) -> CancelOnFailover, MsgNum, MsgsToConsume - 1); MsgNum >= LowestSeen -> error_logger:info_msg( - "consumer ~p on ~p ignoring redeliverd msg ~p~n", + "consumer ~p on ~p ignoring redelivered msg ~p~n", [self(), Channel, MsgNum]), true = Redelivered, %% ASSERTION run(TestPid, Channel, Queue, diff --git a/test/rabbitmqctl_shutdown_SUITE.erl b/test/rabbitmqctl_shutdown_SUITE.erl index 0debfde2b6..b4279c6032 100644 --- a/test/rabbitmqctl_shutdown_SUITE.erl +++ b/test/rabbitmqctl_shutdown_SUITE.erl @@ -111,7 +111,7 @@ node_is_running(Node) -> shutdown_ok(Node) -> %% Start a command {stream, Stream} = rabbit_ct_broker_helpers:control_action(shutdown, Node, []), - %% Execute command steps. Each step will ouput a binary string + %% Execute command steps. Each step will output a binary string Lines = 'Elixir.Enum':to_list(Stream), ct:pal("Command output ~p ~n", [Lines]), [true = is_binary(Line) || Line <- Lines], diff --git a/test/single_active_consumer_SUITE.erl b/test/single_active_consumer_SUITE.erl index 6071aeb5a5..0b12f54c0b 100644 --- a/test/single_active_consumer_SUITE.erl +++ b/test/single_active_consumer_SUITE.erl @@ -33,12 +33,14 @@ groups() -> all_messages_go_to_one_consumer, fallback_to_another_consumer_when_first_one_is_cancelled, fallback_to_another_consumer_when_exclusive_consumer_channel_is_cancelled, + fallback_to_another_consumer_when_first_one_is_cancelled_manual_acks, amqp_exclusive_consume_fails_on_exclusive_consumer_queue ]}, {quorum_queue, [], [ all_messages_go_to_one_consumer, fallback_to_another_consumer_when_first_one_is_cancelled, - fallback_to_another_consumer_when_exclusive_consumer_channel_is_cancelled + fallback_to_another_consumer_when_exclusive_consumer_channel_is_cancelled, + fallback_to_another_consumer_when_first_one_is_cancelled_manual_acks %% amqp_exclusive_consume_fails_on_exclusive_consumer_queue % Exclusive consume not implemented in QQ ]} ]. @@ -131,7 +133,7 @@ fallback_to_another_consumer_when_first_one_is_cancelled(Config) -> Publish = #'basic.publish'{exchange = <<>>, routing_key = Q}, [amqp_channel:cast(Ch, Publish, #amqp_msg{payload = <<"foobar">>}) || _X <- lists:seq(1, MessageCount div 2)], - {MessagesPerConsumer1, _} = wait_for_messages(MessageCount div 2), + {ok, {MessagesPerConsumer1, _}} = wait_for_messages(MessageCount div 2), FirstActiveConsumerInList = maps:keys(maps:filter(fun(_CTag, Count) -> Count > 0 end, MessagesPerConsumer1)), ?assertEqual(1, length(FirstActiveConsumerInList)), @@ -141,8 +143,8 @@ fallback_to_another_consumer_when_first_one_is_cancelled(Config) -> {cancel_ok, FirstActiveConsumer} = wait_for_cancel_ok(), [amqp_channel:cast(Ch, Publish, #amqp_msg{payload = <<"foobar">>}) || _X <- lists:seq(MessageCount div 2 + 1, MessageCount - 1)], - - {MessagesPerConsumer2, _} = wait_for_messages(MessageCount div 2 - 1), + + {ok, {MessagesPerConsumer2, _}} = wait_for_messages(MessageCount div 2 - 1), SecondActiveConsumerInList = maps:keys(maps:filter( fun(CTag, Count) -> Count > 0 andalso CTag /= FirstActiveConsumer end, MessagesPerConsumer2) @@ -153,7 +155,7 @@ fallback_to_another_consumer_when_first_one_is_cancelled(Config) -> #'basic.cancel_ok'{} = amqp_channel:call(Ch, #'basic.cancel'{consumer_tag = SecondActiveConsumer}), amqp_channel:cast(Ch, Publish, #amqp_msg{payload = <<"foobar">>}), - wait_for_messages(1), + ?assertMatch({ok, _}, wait_for_messages(1)), LastActiveConsumer = lists:nth(1, lists:delete(FirstActiveConsumer, lists:delete(SecondActiveConsumer, [CTag1, CTag2, CTag3]))), @@ -171,6 +173,54 @@ fallback_to_another_consumer_when_first_one_is_cancelled(Config) -> amqp_connection:close(C), ok. +fallback_to_another_consumer_when_first_one_is_cancelled_manual_acks(Config) -> + %% Let's ensure that although the consumer is cancelled we still keep the unacked + %% messages and accept acknowledgments on them. + [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + {C, Ch} = connection_and_channel(Config), + Q = queue_declare(Ch, Config), + #'basic.consume_ok'{} = + amqp_channel:subscribe(Ch, #'basic.consume'{queue = Q, no_ack = false}, self()), + #'basic.consume_ok'{} = + amqp_channel:subscribe(Ch, #'basic.consume'{queue = Q, no_ack = false}, self()), + #'basic.consume_ok'{} = + amqp_channel:subscribe(Ch, #'basic.consume'{queue = Q, no_ack = false}, self()), + Consumers0 = rpc:call(Server, rabbit_amqqueue, consumers_all, [<<"/">>]), + ?assertMatch([_, _, _], lists:filter(fun(Props) -> + Resource = proplists:get_value(queue_name, Props), + Q == Resource#resource.name + end, Consumers0)), + + Publish = #'basic.publish'{exchange = <<>>, routing_key = Q}, + [amqp_channel:cast(Ch, Publish, #amqp_msg{payload = P}) || P <- [<<"msg1">>, <<"msg2">>]], + + {CTag, DTag1} = receive_deliver(), + {_CTag, DTag2} = receive_deliver(), + + quorum_queue_utils:wait_for_messages(Config, [[Q, <<"2">>, <<"0">>, <<"2">>]]), + #'basic.cancel_ok'{} = amqp_channel:call(Ch, #'basic.cancel'{consumer_tag = CTag}), + + receive + #'basic.cancel_ok'{consumer_tag = CTag} -> + ok + end, + Consumers1 = rpc:call(Server, rabbit_amqqueue, consumers_all, [<<"/">>]), + ?assertMatch([_, _], lists:filter(fun(Props) -> + Resource = proplists:get_value(queue_name, Props), + Q == Resource#resource.name + end, Consumers1)), + quorum_queue_utils:wait_for_messages(Config, [[Q, <<"2">>, <<"0">>, <<"2">>]]), + + [amqp_channel:cast(Ch, Publish, #amqp_msg{payload = P}) || P <- [<<"msg3">>, <<"msg4">>]], + + quorum_queue_utils:wait_for_messages(Config, [[Q, <<"4">>, <<"0">>, <<"4">>]]), + amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DTag1}), + amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DTag2}), + quorum_queue_utils:wait_for_messages(Config, [[Q, <<"2">>, <<"0">>, <<"2">>]]), + + amqp_connection:close(C), + ok. + fallback_to_another_consumer_when_exclusive_consumer_channel_is_cancelled(Config) -> {C, Ch} = connection_and_channel(Config), {C1, Ch1} = connection_and_channel(Config), @@ -276,13 +326,13 @@ wait_for_messages(ExpectedCount) -> wait_for_messages(ExpectedCount, {}). wait_for_messages(0, State) -> - State; -wait_for_messages(ExpectedCount, _) -> + {ok, State}; +wait_for_messages(ExpectedCount, State) -> receive {message, {MessagesPerConsumer, MessageCount}} -> wait_for_messages(ExpectedCount - 1, {MessagesPerConsumer, MessageCount}) after 5000 -> - throw(message_waiting_timeout) + {missing, ExpectedCount, State} end. wait_for_cancel_ok() -> @@ -292,3 +342,12 @@ wait_for_cancel_ok() -> after 5000 -> throw(consumer_cancel_ok_timeout) end. + +receive_deliver() -> + receive + {#'basic.deliver'{consumer_tag = CTag, + delivery_tag = DTag}, _} -> + {CTag, DTag} + after 5000 -> + exit(deliver_timeout) + end. diff --git a/test/unit_inbroker_non_parallel_SUITE.erl b/test/unit_inbroker_non_parallel_SUITE.erl index d2db382e30..aaded5fa99 100644 --- a/test/unit_inbroker_non_parallel_SUITE.erl +++ b/test/unit_inbroker_non_parallel_SUITE.erl @@ -338,7 +338,7 @@ log_management_during_startup1(_Config) -> application:unset_env(lager, extra_sinks), ok = try rabbit:start() of ok -> exit({got_success_but_expected_failure, - log_rotatation_parent_dirs_test}) + log_rotation_parent_dirs_test}) catch _:{error, {cannot_log_to_file, _, Reason2}} when Reason2 =:= enoent orelse Reason2 =:= eacces -> ok; diff --git a/test/unit_inbroker_parallel_SUITE.erl b/test/unit_inbroker_parallel_SUITE.erl index 364bd5aabb..9518e06196 100644 --- a/test/unit_inbroker_parallel_SUITE.erl +++ b/test/unit_inbroker_parallel_SUITE.erl @@ -1384,7 +1384,7 @@ max_message_size(Config) -> {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), - %% Binary is whithin the max size limit + %% Binary is within the max size limit amqp_channel:call(Ch, #'basic.publish'{routing_key = <<"none">>}, #amqp_msg{payload = Binary2M}), %% The channel process is alive assert_channel_alive(Ch), diff --git a/test/unit_log_config_SUITE.erl b/test/unit_log_config_SUITE.erl index 584aa76760..07134f309a 100644 --- a/test/unit_log_config_SUITE.erl +++ b/test/unit_log_config_SUITE.erl @@ -660,7 +660,7 @@ env_var_tty(_) -> application:set_env(rabbit, lager_log_root, "/tmp/log_base"), application:set_env(rabbit, lager_default_file, tty), application:set_env(rabbit, lager_upgrade_file, tty), - %% tty can only be set explicitely + %% tty can only be set explicitly os:putenv("RABBITMQ_LOGS_source", "environment"), rabbit_lager:configure_lager(), diff --git a/test/worker_pool_SUITE.erl b/test/worker_pool_SUITE.erl index 8c9c5fa366..e53abdfa05 100644 --- a/test/worker_pool_SUITE.erl +++ b/test/worker_pool_SUITE.erl @@ -142,7 +142,7 @@ cancel_timeout(_) -> reuse), timer:sleep(1000), - receive {hello, Worker, Test} -> exit(timeout_is_not_canceleld) + receive {hello, Worker, Test} -> exit(timeout_is_not_cancelled) after 0 -> ok end. @@ -179,7 +179,7 @@ cancel_timeout_by_setting(_) -> reuse), timer:sleep(1000), - receive {hello, Worker, Test} -> exit(timeout_is_not_canceleld) + receive {hello, Worker, Test} -> exit(timeout_is_not_cancelled) after 0 -> ok end, |
