summaryrefslogtreecommitdiff
path: root/test
diff options
context:
space:
mode:
authorMichael Klishin <michael@clojurewerkz.org>2019-02-12 23:28:53 +0300
committerMichael Klishin <michael@clojurewerkz.org>2019-02-12 23:28:53 +0300
commit3efa9d81735ddbeb2e355699d01be02fd1ee7d06 (patch)
tree7951c16b8495a1fa1a741e93e7858baa437b8dd2 /test
parent19665bb3fd8e7c257703633f5550b35fb0775199 (diff)
parentcea04c52dcdcda16ecca5d471bdb7cc6ec16beb8 (diff)
downloadrabbitmq-server-git-3efa9d81735ddbeb2e355699d01be02fd1ee7d06.tar.gz
Merge branch 'master' into fix-more-dialyzer
Diffstat (limited to 'test')
-rw-r--r--test/channel_operation_timeout_SUITE.erl2
-rw-r--r--test/clustering_management_SUITE.erl12
-rw-r--r--test/dead_lettering_SUITE.erl4
-rw-r--r--test/dynamic_ha_SUITE.erl2
-rw-r--r--test/metrics_SUITE.erl6
-rw-r--r--test/queue_master_location_SUITE.erl2
-rw-r--r--test/queue_parallel_SUITE.erl587
-rw-r--r--test/quorum_queue_SUITE.erl545
-rw-r--r--test/rabbit_fifo_SUITE.erl5
-rw-r--r--test/rabbit_fifo_prop_SUITE.erl4
-rw-r--r--test/rabbit_ha_test_consumer.erl2
-rw-r--r--test/rabbitmqctl_shutdown_SUITE.erl2
-rw-r--r--test/single_active_consumer_SUITE.erl75
-rw-r--r--test/unit_inbroker_non_parallel_SUITE.erl2
-rw-r--r--test/unit_inbroker_parallel_SUITE.erl2
-rw-r--r--test/unit_log_config_SUITE.erl2
-rw-r--r--test/worker_pool_SUITE.erl4
17 files changed, 681 insertions, 577 deletions
diff --git a/test/channel_operation_timeout_SUITE.erl b/test/channel_operation_timeout_SUITE.erl
index 77da6133d8..47907d09f4 100644
--- a/test/channel_operation_timeout_SUITE.erl
+++ b/test/channel_operation_timeout_SUITE.erl
@@ -87,7 +87,7 @@ notify_down_all(Config) ->
declare(QCfg0),
%% Testing rabbit_amqqueue:notify_down_all via rabbit_channel.
%% Consumer count = 0 after correct channel termination and
- %% notification of queues via delagate:call/3
+ %% notification of queues via delegate:call/3
true = (0 =/= length(get_consumers(Config, Rabbit, ?DEFAULT_VHOST))),
rabbit_ct_client_helpers:close_channel(RabbitCh),
0 = length(get_consumers(Config, Rabbit, ?DEFAULT_VHOST)),
diff --git a/test/clustering_management_SUITE.erl b/test/clustering_management_SUITE.erl
index 5ae2fb687c..a4fd63ba04 100644
--- a/test/clustering_management_SUITE.erl
+++ b/test/clustering_management_SUITE.erl
@@ -153,9 +153,9 @@ join_and_part_cluster(Config) ->
join_cluster_bad_operations(Config) ->
[Rabbit, Hare, Bunny] = cluster_members(Config),
- %% Non-existant node
+ %% Nonexistent node
ok = stop_app(Rabbit),
- assert_failure(fun () -> join_cluster(Rabbit, non@existant) end),
+ assert_failure(fun () -> join_cluster(Rabbit, non@existent) end),
ok = start_app(Rabbit),
assert_not_clustered(Rabbit),
@@ -217,8 +217,8 @@ forget_cluster_node(Config) ->
ok = stop_app(Rabbit),
%% We're passing the --offline flag, but Hare is online
assert_failure(fun () -> forget_cluster_node(Hare, Rabbit, true) end),
- %% Removing some non-existant node will fail
- assert_failure(fun () -> forget_cluster_node(Hare, non@existant) end),
+ %% Removing some nonexistent node will fail
+ assert_failure(fun () -> forget_cluster_node(Hare, non@existent) end),
ok = forget_cluster_node(Hare, Rabbit),
assert_not_clustered(Hare),
assert_cluster_status({[Rabbit, Hare], [Rabbit, Hare], [Hare]},
@@ -504,8 +504,8 @@ update_cluster_nodes(Config) ->
stop_reset_start(Hare),
assert_failure(fun () -> start_app(Rabbit) end),
%% Bogus node
- assert_failure(fun () -> update_cluster_nodes(Rabbit, non@existant) end),
- %% Inconsisent node
+ assert_failure(fun () -> update_cluster_nodes(Rabbit, non@existent) end),
+ %% Inconsistent node
assert_failure(fun () -> update_cluster_nodes(Rabbit, Hare) end),
ok = update_cluster_nodes(Rabbit, Bunny),
ok = start_app(Rabbit),
diff --git a/test/dead_lettering_SUITE.erl b/test/dead_lettering_SUITE.erl
index 72c72b096c..6722958973 100644
--- a/test/dead_lettering_SUITE.erl
+++ b/test/dead_lettering_SUITE.erl
@@ -577,7 +577,7 @@ dead_letter_routing_key_header_BCC(Config) ->
%% It is possible to form a cycle of message dead-lettering. For instance,
%% this can happen when a queue dead-letters messages to the default exchange without
-%% specifiying a dead-letter routing key (5). Messages in such cycles (i.e. messages that
+%% specifying a dead-letter routing key (5). Messages in such cycles (i.e. messages that
%% reach the same queue twice) will be dropped if there was no rejections in the entire cycle.
%% i.e. x-message-ttl (7), x-max-length (6)
%%
@@ -741,7 +741,7 @@ dead_letter_override_policy(Config) ->
[_] = consume(Ch, DLXQName, [P1]).
%% 9) Policy is set after have declared a queue with dead letter arguments. Policy will be
-%% overriden/ignored.
+%% overridden/ignored.
dead_letter_ignore_policy(Config) ->
{_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
QName = ?config(queue_name, Config),
diff --git a/test/dynamic_ha_SUITE.erl b/test/dynamic_ha_SUITE.erl
index 6ccf3a75c3..3bdf7bb009 100644
--- a/test/dynamic_ha_SUITE.erl
+++ b/test/dynamic_ha_SUITE.erl
@@ -484,7 +484,7 @@ failing_random_policies(Config) ->
[A, B | _] = Nodes = rabbit_ct_broker_helpers:get_node_configs(Config,
nodename),
%% Those set of policies were found as failing by PropEr in the
- %% `random_policy` test above. We add them explicitely here to make
+ %% `random_policy` test above. We add them explicitly here to make
%% sure they get tested.
?assertEqual(true, test_random_policy(Config, Nodes,
[{nodes, [A, B]}, {nodes, [A]}])),
diff --git a/test/metrics_SUITE.erl b/test/metrics_SUITE.erl
index 44368b643d..d00c01408e 100644
--- a/test/metrics_SUITE.erl
+++ b/test/metrics_SUITE.erl
@@ -144,7 +144,7 @@ connection_metric_idemp(Config, {N, R}) ->
|| _ <- lists:seq(1, N)],
Table = [ Pid || {Pid, _} <- read_table_rpc(Config, connection_metrics)],
Table2 = [ Pid || {Pid, _} <- read_table_rpc(Config, connection_coarse_metrics)],
- % referesh stats 'R' times
+ % refresh stats 'R' times
[[Pid ! emit_stats || Pid <- Table] || _ <- lists:seq(1, R)],
force_metric_gc(Config),
TableAfter = [ Pid || {Pid, _} <- read_table_rpc(Config, connection_metrics)],
@@ -158,7 +158,7 @@ channel_metric_idemp(Config, {N, R}) ->
[amqp_connection:open_channel(Conn) || _ <- lists:seq(1, N)],
Table = [ Pid || {Pid, _} <- read_table_rpc(Config, channel_metrics)],
Table2 = [ Pid || {Pid, _} <- read_table_rpc(Config, channel_process_metrics)],
- % referesh stats 'R' times
+ % refresh stats 'R' times
[[Pid ! emit_stats || Pid <- Table] || _ <- lists:seq(1, R)],
force_metric_gc(Config),
TableAfter = [ Pid || {Pid, _} <- read_table_rpc(Config, channel_metrics)],
@@ -181,7 +181,7 @@ queue_metric_idemp(Config, {N, R}) ->
Table = [ Pid || {Pid, _, _} <- read_table_rpc(Config, queue_metrics)],
Table2 = [ Pid || {Pid, _, _} <- read_table_rpc(Config, queue_coarse_metrics)],
- % referesh stats 'R' times
+ % refresh stats 'R' times
ChanTable = read_table_rpc(Config, channel_created),
[[Pid ! emit_stats || {Pid, _, _} <- ChanTable ] || _ <- lists:seq(1, R)],
force_metric_gc(Config),
diff --git a/test/queue_master_location_SUITE.erl b/test/queue_master_location_SUITE.erl
index d4c8da2bcb..677355f5e8 100644
--- a/test/queue_master_location_SUITE.erl
+++ b/test/queue_master_location_SUITE.erl
@@ -22,7 +22,7 @@
%% location strategies can be applied in the following ways;
%% 1. As policy,
%% 2. As config (in rabbitmq.config),
-%% 3. or as part of the queue's declare arguements.
+%% 3. or as part of the queue's declare arguments.
%%
%% Currently supported strategies are;
%% min-masters : The queue master node is calculated as the one with the
diff --git a/test/queue_parallel_SUITE.erl b/test/queue_parallel_SUITE.erl
new file mode 100644
index 0000000000..1290c97b23
--- /dev/null
+++ b/test/queue_parallel_SUITE.erl
@@ -0,0 +1,587 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2011-2019 Pivotal Software, Inc. All rights reserved.
+%%
+%%
+-module(queue_parallel_SUITE).
+
+-include_lib("common_test/include/ct.hrl").
+-include_lib("kernel/include/file.hrl").
+-include_lib("amqp_client/include/amqp_client.hrl").
+-include_lib("eunit/include/eunit.hrl").
+
+-compile(export_all).
+
+-define(TIMEOUT, 30000).
+
+-import(quorum_queue_utils, [wait_for_messages/2]).
+
+all() ->
+ [
+ {group, parallel_tests}
+ ].
+
+groups() ->
+ AllTests = [publish,
+ consume,
+ consume_first_empty,
+ consume_from_empty_queue,
+ consume_and_autoack,
+ subscribe,
+ subscribe_with_autoack,
+ consume_and_ack,
+ consume_and_multiple_ack,
+ subscribe_and_ack,
+ subscribe_and_multiple_ack,
+ subscribe_and_requeue_multiple_nack,
+ subscribe_and_nack,
+ subscribe_and_requeue_nack,
+ subscribe_and_multiple_nack,
+ consume_and_requeue_nack,
+ consume_and_nack,
+ consume_and_requeue_multiple_nack,
+ consume_and_multiple_nack,
+ basic_cancel,
+ purge,
+ basic_recover,
+ delete_immediately_by_resource
+ ],
+ [
+ {parallel_tests, [],
+ [
+ {classic_queue, [parallel], AllTests ++ [delete_immediately_by_pid_succeeds]},
+ {mirrored_queue, [parallel], AllTests ++ [delete_immediately_by_pid_succeeds]},
+ {quorum_queue, [parallel], AllTests ++ [delete_immediately_by_pid_fails]}
+ ]}
+ ].
+
+suite() ->
+ [
+ {timetrap, {minutes, 3}}
+ ].
+
+%% -------------------------------------------------------------------
+%% Testsuite setup/teardown.
+%% -------------------------------------------------------------------
+
+init_per_suite(Config) ->
+ rabbit_ct_helpers:log_environment(),
+ rabbit_ct_helpers:run_setup_steps(Config).
+
+end_per_suite(Config) ->
+ rabbit_ct_helpers:run_teardown_steps(Config).
+
+init_per_group(classic_queue, Config) ->
+ rabbit_ct_helpers:set_config(
+ Config,
+ [{queue_args, [{<<"x-queue-type">>, longstr, <<"classic">>}]},
+ {queue_durable, true}]);
+init_per_group(quorum_queue, Config) ->
+ rabbit_ct_helpers:set_config(
+ Config,
+ [{queue_args, [{<<"x-queue-type">>, longstr, <<"quorum">>}]},
+ {queue_durable, true}]);
+init_per_group(mirrored_queue, Config) ->
+ rabbit_ct_broker_helpers:set_ha_policy(Config, 0, <<"^max_length.*queue">>,
+ <<"all">>, [{<<"ha-sync-mode">>, <<"automatic">>}]),
+ Config1 = rabbit_ct_helpers:set_config(
+ Config, [{is_mirrored, true},
+ {queue_args, [{<<"x-queue-type">>, longstr, <<"classic">>}]},
+ {queue_durable, true}]),
+ rabbit_ct_helpers:run_steps(Config1, []);
+init_per_group(Group, Config) ->
+ case lists:member({group, Group}, all()) of
+ true ->
+ ClusterSize = 2,
+ Config1 = rabbit_ct_helpers:set_config(Config, [
+ {rmq_nodename_suffix, Group},
+ {rmq_nodes_count, ClusterSize}
+ ]),
+ rabbit_ct_helpers:run_steps(Config1,
+ rabbit_ct_broker_helpers:setup_steps() ++
+ rabbit_ct_client_helpers:setup_steps());
+ false ->
+ rabbit_ct_helpers:run_steps(Config, [])
+ end.
+
+end_per_group(Group, Config) ->
+ case lists:member({group, Group}, all()) of
+ true ->
+ rabbit_ct_helpers:run_steps(Config,
+ rabbit_ct_client_helpers:teardown_steps() ++
+ rabbit_ct_broker_helpers:teardown_steps());
+ false ->
+ Config
+ end.
+
+init_per_testcase(Testcase, Config) ->
+ Group = proplists:get_value(name, ?config(tc_group_properties, Config)),
+ Q = rabbit_data_coercion:to_binary(io_lib:format("~p_~p", [Group, Testcase])),
+ Q2 = rabbit_data_coercion:to_binary(io_lib:format("~p_~p_2", [Group, Testcase])),
+ Config1 = rabbit_ct_helpers:set_config(Config, [{queue_name, Q},
+ {queue_name_2, Q2}]),
+ rabbit_ct_helpers:testcase_started(Config1, Testcase).
+
+end_per_testcase(Testcase, Config) ->
+ {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ amqp_channel:call(Ch, #'queue.delete'{queue = ?config(queue_name, Config)}),
+ amqp_channel:call(Ch, #'queue.delete'{queue = ?config(queue_name_2, Config)}),
+ rabbit_ct_helpers:testcase_finished(Config, Testcase).
+
+publish(Config) ->
+ {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ QName = ?config(queue_name, Config),
+ declare_queue(Ch, Config, QName),
+ publish(Ch, QName, [<<"msg1">>]),
+ wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]).
+
+consume(Config) ->
+ {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ QName = ?config(queue_name, Config),
+ declare_queue(Ch, Config, QName),
+ publish(Ch, QName, [<<"msg1">>]),
+ wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]),
+ consume(Ch, QName, [<<"msg1">>]),
+ wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]),
+ rabbit_ct_client_helpers:close_channel(Ch),
+ wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]).
+
+consume_first_empty(Config) ->
+ {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ QName = ?config(queue_name, Config),
+ declare_queue(Ch, Config, QName),
+ consume_empty(Ch, QName),
+ publish(Ch, QName, [<<"msg1">>]),
+ wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]),
+ consume(Ch, QName, [<<"msg1">>]),
+ rabbit_ct_client_helpers:close_channel(Ch).
+
+consume_from_empty_queue(Config) ->
+ {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ QName = ?config(queue_name, Config),
+ declare_queue(Ch, Config, QName),
+ consume_empty(Ch, QName).
+
+consume_and_autoack(Config) ->
+ {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ QName = ?config(queue_name, Config),
+ declare_queue(Ch, Config, QName),
+ publish(Ch, QName, [<<"msg1">>]),
+ wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]),
+ consume(Ch, QName, true, [<<"msg1">>]),
+ wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]),
+ rabbit_ct_client_helpers:close_channel(Ch),
+ wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]).
+
+subscribe(Config) ->
+ [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ QName = ?config(queue_name, Config),
+ declare_queue(Ch, Config, QName),
+
+ ?assertMatch(#'basic.qos_ok'{},
+ amqp_channel:call(Ch, #'basic.qos'{global = false,
+ prefetch_count = 10})),
+ publish(Ch, QName, [<<"msg1">>]),
+ wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]),
+
+ subscribe(Ch, QName, false),
+ receive_basic_deliver(false),
+ wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]),
+
+ %% validate we can retrieve the consumers
+ Consumers = rpc:call(Server, rabbit_amqqueue, consumers_all, [<<"/">>]),
+ [Consumer] = lists:filter(fun(Props) ->
+ Resource = proplists:get_value(queue_name, Props),
+ QName == Resource#resource.name
+ end, Consumers),
+ ?assert(is_pid(proplists:get_value(channel_pid, Consumer))),
+ ?assert(is_binary(proplists:get_value(consumer_tag, Consumer))),
+ ?assertEqual(true, proplists:get_value(ack_required, Consumer)),
+ ?assertEqual(10, proplists:get_value(prefetch_count, Consumer)),
+ ?assertEqual([], proplists:get_value(arguments, Consumer)),
+
+ rabbit_ct_client_helpers:close_channel(Ch),
+ wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]).
+
+subscribe_with_autoack(Config) ->
+ {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ QName = ?config(queue_name, Config),
+ declare_queue(Ch, Config, QName),
+
+ publish(Ch, QName, [<<"msg1">>, <<"msg2">>]),
+ wait_for_messages(Config, [[QName, <<"2">>, <<"2">>, <<"0">>]]),
+ subscribe(Ch, QName, true),
+ receive_basic_deliver(false),
+ receive_basic_deliver(false),
+ wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]),
+ rabbit_ct_client_helpers:close_channel(Ch),
+ wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]).
+
+consume_and_ack(Config) ->
+ {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ QName = ?config(queue_name, Config),
+ declare_queue(Ch, Config, QName),
+
+ publish(Ch, QName, [<<"msg1">>]),
+ wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]),
+ [DeliveryTag] = consume(Ch, QName, [<<"msg1">>]),
+ wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]),
+ amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag}),
+ wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]).
+
+consume_and_multiple_ack(Config) ->
+ {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ QName = ?config(queue_name, Config),
+ declare_queue(Ch, Config, QName),
+
+ publish(Ch, QName, [<<"msg1">>, <<"msg2">>, <<"msg3">>]),
+ wait_for_messages(Config, [[QName, <<"3">>, <<"3">>, <<"0">>]]),
+ [_, _, DeliveryTag] = consume(Ch, QName, [<<"msg1">>, <<"msg2">>, <<"msg3">>]),
+ wait_for_messages(Config, [[QName, <<"3">>, <<"0">>, <<"3">>]]),
+ amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag,
+ multiple = true}),
+ wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]).
+
+subscribe_and_ack(Config) ->
+ {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ QName = ?config(queue_name, Config),
+ declare_queue(Ch, Config, QName),
+
+ publish(Ch, QName, [<<"msg1">>]),
+ wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]),
+ subscribe(Ch, QName, false),
+ receive
+ {#'basic.deliver'{delivery_tag = DeliveryTag}, _} ->
+ wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]),
+ amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag}),
+ wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]])
+ end.
+
+subscribe_and_multiple_ack(Config) ->
+ {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ QName = ?config(queue_name, Config),
+ declare_queue(Ch, Config, QName),
+
+ publish(Ch, QName, [<<"msg1">>, <<"msg2">>, <<"msg3">>]),
+ wait_for_messages(Config, [[QName, <<"3">>, <<"3">>, <<"0">>]]),
+ subscribe(Ch, QName, false),
+ receive_basic_deliver(false),
+ receive_basic_deliver(false),
+ receive
+ {#'basic.deliver'{delivery_tag = DeliveryTag}, _} ->
+ wait_for_messages(Config, [[QName, <<"3">>, <<"0">>, <<"3">>]]),
+ amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag,
+ multiple = true}),
+ wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]])
+ end.
+
+subscribe_and_requeue_multiple_nack(Config) ->
+ {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ QName = ?config(queue_name, Config),
+ declare_queue(Ch, Config, QName),
+
+ publish(Ch, QName, [<<"msg1">>, <<"msg2">>, <<"msg3">>]),
+ wait_for_messages(Config, [[QName, <<"3">>, <<"3">>, <<"0">>]]),
+ subscribe(Ch, QName, false),
+ receive_basic_deliver(false),
+ receive_basic_deliver(false),
+ receive
+ {#'basic.deliver'{delivery_tag = DeliveryTag,
+ redelivered = false}, _} ->
+ wait_for_messages(Config, [[QName, <<"3">>, <<"0">>, <<"3">>]]),
+ amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag,
+ multiple = true,
+ requeue = true}),
+ receive_basic_deliver(true),
+ receive_basic_deliver(true),
+ receive
+ {#'basic.deliver'{delivery_tag = DeliveryTag1,
+ redelivered = true}, _} ->
+ wait_for_messages(Config, [[QName, <<"3">>, <<"0">>, <<"3">>]]),
+ amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag1,
+ multiple = true}),
+ wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]])
+ end
+ end.
+
+consume_and_requeue_nack(Config) ->
+ {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ QName = ?config(queue_name, Config),
+ declare_queue(Ch, Config, QName),
+
+ publish(Ch, QName, [<<"msg1">>, <<"msg2">>]),
+ wait_for_messages(Config, [[QName, <<"2">>, <<"2">>, <<"0">>]]),
+ [DeliveryTag] = consume(Ch, QName, [<<"msg1">>]),
+ wait_for_messages(Config, [[QName, <<"2">>, <<"1">>, <<"1">>]]),
+ amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag,
+ multiple = false,
+ requeue = true}),
+ wait_for_messages(Config, [[QName, <<"2">>, <<"2">>, <<"0">>]]).
+
+consume_and_nack(Config) ->
+ {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ QName = ?config(queue_name, Config),
+ declare_queue(Ch, Config, QName),
+
+ publish(Ch, QName, [<<"msg1">>]),
+ wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]),
+ [DeliveryTag] = consume(Ch, QName, [<<"msg1">>]),
+ wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]),
+ amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag,
+ multiple = false,
+ requeue = false}),
+ wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]).
+
+consume_and_requeue_multiple_nack(Config) ->
+ {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ QName = ?config(queue_name, Config),
+ declare_queue(Ch, Config, QName),
+
+ publish(Ch, QName, [<<"msg1">>, <<"msg2">>, <<"msg3">>]),
+ wait_for_messages(Config, [[QName, <<"3">>, <<"3">>, <<"0">>]]),
+ [_, _, DeliveryTag] = consume(Ch, QName, [<<"msg1">>, <<"msg2">>, <<"msg3">>]),
+ wait_for_messages(Config, [[QName, <<"3">>, <<"0">>, <<"3">>]]),
+ amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag,
+ multiple = true,
+ requeue = true}),
+ wait_for_messages(Config, [[QName, <<"3">>, <<"3">>, <<"0">>]]).
+
+consume_and_multiple_nack(Config) ->
+ {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ QName = ?config(queue_name, Config),
+ declare_queue(Ch, Config, QName),
+
+ publish(Ch, QName, [<<"msg1">>, <<"msg2">>, <<"msg3">>]),
+ wait_for_messages(Config, [[QName, <<"3">>, <<"3">>, <<"0">>]]),
+ [_, _, DeliveryTag] = consume(Ch, QName, [<<"msg1">>, <<"msg2">>, <<"msg3">>]),
+ wait_for_messages(Config, [[QName, <<"3">>, <<"0">>, <<"3">>]]),
+ amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag,
+ multiple = true,
+ requeue = false}),
+ wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]).
+
+subscribe_and_requeue_nack(Config) ->
+ {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ QName = ?config(queue_name, Config),
+ declare_queue(Ch, Config, QName),
+
+ publish(Ch, QName, [<<"msg1">>]),
+ wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]),
+ subscribe(Ch, QName, false),
+ receive
+ {#'basic.deliver'{delivery_tag = DeliveryTag,
+ redelivered = false}, _} ->
+ wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]),
+ amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag,
+ multiple = false,
+ requeue = true}),
+ receive
+ {#'basic.deliver'{delivery_tag = DeliveryTag1,
+ redelivered = true}, _} ->
+ wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]),
+ amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag1}),
+ wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]])
+ end
+ end.
+
+subscribe_and_nack(Config) ->
+ {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ QName = ?config(queue_name, Config),
+ declare_queue(Ch, Config, QName),
+
+ publish(Ch, QName, [<<"msg1">>]),
+ wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]),
+ subscribe(Ch, QName, false),
+ receive
+ {#'basic.deliver'{delivery_tag = DeliveryTag,
+ redelivered = false}, _} ->
+ wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]),
+ amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag,
+ multiple = false,
+ requeue = false}),
+ wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]])
+ end.
+
+subscribe_and_multiple_nack(Config) ->
+ {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ QName = ?config(queue_name, Config),
+ declare_queue(Ch, Config, QName),
+
+ publish(Ch, QName, [<<"msg1">>, <<"msg2">>, <<"msg3">>]),
+ wait_for_messages(Config, [[QName, <<"3">>, <<"3">>, <<"0">>]]),
+ subscribe(Ch, QName, false),
+ receive_basic_deliver(false),
+ receive_basic_deliver(false),
+ receive
+ {#'basic.deliver'{delivery_tag = DeliveryTag,
+ redelivered = false}, _} ->
+ wait_for_messages(Config, [[QName, <<"3">>, <<"0">>, <<"3">>]]),
+ amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag,
+ multiple = true,
+ requeue = false}),
+ wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]])
+ end.
+
+%% TODO test with single active
+basic_cancel(Config) ->
+ [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ QName = ?config(queue_name, Config),
+ declare_queue(Ch, Config, QName),
+
+ publish(Ch, QName, [<<"msg1">>]),
+ wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]),
+ subscribe(Ch, QName, false),
+ receive
+ {#'basic.deliver'{delivery_tag = DeliveryTag}, _} ->
+ wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]),
+ amqp_channel:call(Ch, #'basic.cancel'{consumer_tag = <<"ctag">>}),
+ Consumers = rpc:call(Server, rabbit_amqqueue, consumers_all, [<<"/">>]),
+ wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]),
+ ?assertEqual([], lists:filter(fun(Props) ->
+ Resource = proplists:get_value(queue_name, Props),
+ QName == Resource#resource.name
+ end, Consumers)),
+ publish(Ch, QName, [<<"msg2">>, <<"msg3">>]),
+ wait_for_messages(Config, [[QName, <<"3">>, <<"2">>, <<"1">>]]),
+ amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag}),
+ wait_for_messages(Config, [[QName, <<"2">>, <<"2">>, <<"0">>]])
+ after 5000 ->
+ exit(basic_deliver_timeout)
+ end.
+
+purge(Config) ->
+ {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ QName = ?config(queue_name, Config),
+ declare_queue(Ch, Config, QName),
+
+ publish(Ch, QName, [<<"msg1">>, <<"msg2">>]),
+ wait_for_messages(Config, [[QName, <<"2">>, <<"2">>, <<"0">>]]),
+ [_] = consume(Ch, QName, [<<"msg1">>]),
+ wait_for_messages(Config, [[QName, <<"2">>, <<"1">>, <<"1">>]]),
+ {'queue.purge_ok', 1} = amqp_channel:call(Ch, #'queue.purge'{queue = QName}),
+ wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]).
+
+basic_recover(Config) ->
+ {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ QName = ?config(queue_name, Config),
+ declare_queue(Ch, Config, QName),
+
+ publish(Ch, QName, [<<"msg1">>]),
+ wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]),
+ [_] = consume(Ch, QName, [<<"msg1">>]),
+ wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]),
+ amqp_channel:cast(Ch, #'basic.recover'{requeue = true}),
+ wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]).
+
+delete_immediately_by_pid_fails(Config) ->
+ {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ Args = ?config(queue_args, Config),
+ Durable = ?config(queue_durable, Config),
+ QName = ?config(queue_name, Config),
+ declare_queue(Ch, Config, QName),
+
+ Cmd = ["eval", "{ok, Q} = rabbit_amqqueue:lookup(rabbit_misc:r(<<\"/\">>, queue, <<\"" ++ binary_to_list(QName) ++ "\">>)), Pid = rabbit_amqqueue:pid_of(Q), rabbit_amqqueue:delete_immediately([Pid])."],
+ {ok, Msg} = rabbit_ct_broker_helpers:rabbitmqctl(Config, 0, Cmd),
+ ?assertEqual(match, re:run(Msg, ".*error.*", [{capture, none}])),
+
+ ?assertEqual({'queue.declare_ok', QName, 0, 0},
+ amqp_channel:call(Ch, #'queue.declare'{queue = QName,
+ durable = Durable,
+ passive = true,
+ auto_delete = false,
+ arguments = Args})).
+
+delete_immediately_by_pid_succeeds(Config) ->
+ {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ Args = ?config(queue_args, Config),
+ Durable = ?config(queue_durable, Config),
+ QName = ?config(queue_name, Config),
+ declare_queue(Ch, Config, QName),
+
+ Cmd = ["eval", "{ok, Q} = rabbit_amqqueue:lookup(rabbit_misc:r(<<\"/\">>, queue, <<\"" ++ binary_to_list(QName) ++ "\">>)), Pid = rabbit_amqqueue:pid_of(Q), rabbit_amqqueue:delete_immediately([Pid])."],
+ {ok, Msg} = rabbit_ct_broker_helpers:rabbitmqctl(Config, 0, Cmd),
+ ?assertEqual(match, re:run(Msg, ".*ok.*", [{capture, none}])),
+
+ ?assertExit(
+ {{shutdown, {server_initiated_close, 404, _}}, _},
+ amqp_channel:call(Ch, #'queue.declare'{queue = QName,
+ durable = Durable,
+ passive = true,
+ auto_delete = false,
+ arguments = Args})).
+
+delete_immediately_by_resource(Config) ->
+ {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ Args = ?config(queue_args, Config),
+ Durable = ?config(queue_durable, Config),
+ QName = ?config(queue_name, Config),
+ declare_queue(Ch, Config, QName),
+
+ Cmd = ["eval", "rabbit_amqqueue:delete_immediately_by_resource([rabbit_misc:r(<<\"/\">>, queue, <<\"" ++ binary_to_list(QName) ++ "\">>)])."],
+ ?assertEqual({ok, "ok\n"}, rabbit_ct_broker_helpers:rabbitmqctl(Config, 0, Cmd)),
+
+ ?assertExit(
+ {{shutdown, {server_initiated_close, 404, _}}, _},
+ amqp_channel:call(Ch, #'queue.declare'{queue = QName,
+ durable = Durable,
+ passive = true,
+ auto_delete = false,
+ arguments = Args})).
+
+%%%%%%%%%%%%%%%%%%%%%%%%
+%% Test helpers
+%%%%%%%%%%%%%%%%%%%%%%%%
+declare_queue(Ch, Config, QName) ->
+ Args = ?config(queue_args, Config),
+ Durable = ?config(queue_durable, Config),
+ #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName,
+ arguments = Args,
+ durable = Durable}).
+
+publish(Ch, QName, Payloads) ->
+ [amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload})
+ || Payload <- Payloads].
+
+consume(Ch, QName, Payloads) ->
+ consume(Ch, QName, false, Payloads).
+
+consume(Ch, QName, NoAck, Payloads) ->
+ [begin
+ {#'basic.get_ok'{delivery_tag = DTag}, #amqp_msg{payload = Payload}} =
+ amqp_channel:call(Ch, #'basic.get'{queue = QName,
+ no_ack = NoAck}),
+ DTag
+ end || Payload <- Payloads].
+
+consume_empty(Ch, QName) ->
+ ?assertMatch(#'basic.get_empty'{},
+ amqp_channel:call(Ch, #'basic.get'{queue = QName})).
+
+subscribe(Ch, Queue, NoAck) ->
+ amqp_channel:subscribe(Ch, #'basic.consume'{queue = Queue,
+ no_ack = NoAck,
+ consumer_tag = <<"ctag">>},
+ self()),
+ receive
+ #'basic.consume_ok'{consumer_tag = <<"ctag">>} ->
+ ok
+ end.
+
+receive_basic_deliver(Redelivered) ->
+ receive
+ {#'basic.deliver'{redelivered = R}, _} when R == Redelivered ->
+ ok
+ end.
diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl
index 02afba97c5..e9f49039ed 100644
--- a/test/quorum_queue_SUITE.erl
+++ b/test/quorum_queue_SUITE.erl
@@ -93,26 +93,7 @@ all_tests() ->
restart_queue,
restart_all_types,
stop_start_rabbit_app,
- publish,
publish_and_restart,
- consume,
- consume_first_empty,
- consume_from_empty_queue,
- consume_and_autoack,
- subscribe,
- subscribe_with_autoack,
- consume_and_ack,
- consume_and_multiple_ack,
- subscribe_and_ack,
- subscribe_and_multiple_ack,
- consume_and_requeue_nack,
- consume_and_requeue_multiple_nack,
- subscribe_and_requeue_nack,
- subscribe_and_requeue_multiple_nack,
- consume_and_nack,
- consume_and_multiple_nack,
- subscribe_and_nack,
- subscribe_and_multiple_nack,
subscribe_should_fail_when_global_qos_true,
dead_letter_to_classic_queue,
dead_letter_to_quorum_queue,
@@ -120,14 +101,10 @@ all_tests() ->
dead_letter_policy,
cleanup_queue_state_on_channel_after_publish,
cleanup_queue_state_on_channel_after_subscribe,
- basic_cancel,
- purge,
sync_queue,
cancel_sync_queue,
- basic_recover,
idempotent_recover,
vhost_with_quorum_queue_is_deleted,
- delete_immediately,
delete_immediately_by_resource,
consume_redelivery_count,
subscribe_redelivery_count,
@@ -612,18 +589,6 @@ stop_start_rabbit_app(Config) ->
amqp_channel:call(Ch2, #'basic.get'{queue = CQ2, no_ack = false}),
delete_queues(Ch2, [QQ1, QQ2, CQ1, CQ2]).
-publish(Config) ->
- [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
-
- Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
- QQ = ?config(queue_name, Config),
- ?assertEqual({'queue.declare_ok', QQ, 0, 0},
- declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
- publish(Ch, QQ),
- Name = ra_name(QQ),
- wait_for_messages_ready(Servers, Name, 1),
- wait_for_messages_pending_ack(Servers, Name, 0).
-
publish_confirm(Ch, QName) ->
publish(Ch, QName),
amqp_channel:register_confirm_handler(Ch, self()),
@@ -660,41 +625,6 @@ publish_and_restart(Config) ->
wait_for_messages_ready(Servers, RaName, 2),
wait_for_messages_pending_ack(Servers, RaName, 0).
-consume(Config) ->
- [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
-
- Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
- QQ = ?config(queue_name, Config),
- ?assertEqual({'queue.declare_ok', QQ, 0, 0},
- declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
-
- RaName = ra_name(QQ),
- publish(Ch, QQ),
- wait_for_messages_ready(Servers, RaName, 1),
- wait_for_messages_pending_ack(Servers, RaName, 0),
- consume(Ch, QQ, false),
- wait_for_messages_ready(Servers, RaName, 0),
- wait_for_messages_pending_ack(Servers, RaName, 1),
- rabbit_ct_client_helpers:close_channel(Ch),
- wait_for_messages_ready(Servers, RaName, 1),
- wait_for_messages_pending_ack(Servers, RaName, 0).
-
-consume_first_empty(Config) ->
- [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
-
- Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
- QQ = ?config(queue_name, Config),
- ?assertEqual({'queue.declare_ok', QQ, 0, 0},
- declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
-
- RaName = ra_name(QQ),
- consume_empty(Ch, QQ, false),
- publish(Ch, QQ),
- wait_for_messages_ready(Servers, RaName, 1),
- wait_for_messages_pending_ack(Servers, RaName, 0),
- consume(Ch, QQ, false),
- rabbit_ct_client_helpers:close_channel(Ch).
-
consume_in_minority(Config) ->
[Server0, Server1, Server2] =
rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
@@ -711,63 +641,6 @@ consume_in_minority(Config) ->
amqp_channel:call(Ch, #'basic.get'{queue = QQ,
no_ack = false})).
-consume_and_autoack(Config) ->
- [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
-
- Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
- QQ = ?config(queue_name, Config),
- ?assertEqual({'queue.declare_ok', QQ, 0, 0},
- declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
- RaName = ra_name(QQ),
- publish(Ch, QQ),
- wait_for_messages_ready(Servers, RaName, 1),
- wait_for_messages_pending_ack(Servers, RaName, 0),
- consume(Ch, QQ, true),
- wait_for_messages_ready(Servers, RaName, 0),
- wait_for_messages_pending_ack(Servers, RaName, 0),
- rabbit_ct_client_helpers:close_channel(Ch),
- wait_for_messages_ready(Servers, RaName, 0),
- wait_for_messages_pending_ack(Servers, RaName, 0).
-
-consume_from_empty_queue(Config) ->
- Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
-
- Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
- QQ = ?config(queue_name, Config),
- ?assertEqual({'queue.declare_ok', QQ, 0, 0},
- declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
-
- consume_empty(Ch, QQ, false).
-
-subscribe(Config) ->
- [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
-
- Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
- QQ = ?config(queue_name, Config),
- ?assertEqual({'queue.declare_ok', QQ, 0, 0},
- declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
-
- qos(Ch, 10, false),
- RaName = ra_name(QQ),
- publish(Ch, QQ),
- wait_for_messages_ready(Servers, RaName, 1),
- wait_for_messages_pending_ack(Servers, RaName, 0),
- subscribe(Ch, QQ, false),
- receive_basic_deliver(false),
- wait_for_messages_ready(Servers, RaName, 0),
- wait_for_messages_pending_ack(Servers, RaName, 1),
- %% validate we can retrieve the consumers
- [Consumer] = rpc:call(Server, rabbit_amqqueue, consumers_all, [<<"/">>]),
- ct:pal("Consumer ~p", [Consumer]),
- ?assert(is_pid(proplists:get_value(channel_pid, Consumer))),
- ?assert(is_binary(proplists:get_value(consumer_tag, Consumer))),
- ?assertEqual(true, proplists:get_value(ack_required, Consumer)),
- ?assertEqual(10, proplists:get_value(prefetch_count, Consumer)),
- ?assertEqual([], proplists:get_value(arguments, Consumer)),
- rabbit_ct_client_helpers:close_channel(Ch),
- wait_for_messages_ready(Servers, RaName, 1),
- wait_for_messages_pending_ack(Servers, RaName, 0).
-
subscribe_should_fail_when_global_qos_true(Config) ->
[Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
@@ -789,338 +662,6 @@ subscribe_should_fail_when_global_qos_true(Config) ->
end,
ok.
-subscribe_with_autoack(Config) ->
- [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
-
- Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
- QQ = ?config(queue_name, Config),
- ?assertEqual({'queue.declare_ok', QQ, 0, 0},
- declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
-
- RaName = ra_name(QQ),
- publish(Ch, QQ),
- publish(Ch, QQ),
- wait_for_messages_ready(Servers, RaName, 2),
- wait_for_messages_pending_ack(Servers, RaName, 0),
- subscribe(Ch, QQ, true),
- receive_basic_deliver(false),
- receive_basic_deliver(false),
- wait_for_messages_ready(Servers, RaName, 0),
- wait_for_messages_pending_ack(Servers, RaName, 0),
- rabbit_ct_client_helpers:close_channel(Ch),
- wait_for_messages_ready(Servers, RaName, 0),
- wait_for_messages_pending_ack(Servers, RaName, 0).
-
-consume_and_ack(Config) ->
- [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
-
- Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
- QQ = ?config(queue_name, Config),
- ?assertEqual({'queue.declare_ok', QQ, 0, 0},
- declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
-
- RaName = ra_name(QQ),
- publish(Ch, QQ),
- wait_for_messages_ready(Servers, RaName, 1),
- wait_for_messages_pending_ack(Servers, RaName, 0),
- DeliveryTag = consume(Ch, QQ, false),
- wait_for_messages_ready(Servers, RaName, 0),
- wait_for_messages_pending_ack(Servers, RaName, 1),
- amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag}),
- wait_for_messages_ready(Servers, RaName, 0),
- wait_for_messages_pending_ack(Servers, RaName, 0).
-
-consume_and_multiple_ack(Config) ->
- [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
-
- Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
- QQ = ?config(queue_name, Config),
- ?assertEqual({'queue.declare_ok', QQ, 0, 0},
- declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
-
- RaName = ra_name(QQ),
- publish(Ch, QQ),
- publish(Ch, QQ),
- publish(Ch, QQ),
- wait_for_messages_ready(Servers, RaName, 3),
- wait_for_messages_pending_ack(Servers, RaName, 0),
- _ = consume(Ch, QQ, false),
- _ = consume(Ch, QQ, false),
- DeliveryTag = consume(Ch, QQ, false),
- wait_for_messages_ready(Servers, RaName, 0),
- wait_for_messages_pending_ack(Servers, RaName, 3),
- amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag,
- multiple = true}),
- wait_for_messages_ready(Servers, RaName, 0),
- wait_for_messages_pending_ack(Servers, RaName, 0).
-
-subscribe_and_ack(Config) ->
- [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
-
- Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
- QQ = ?config(queue_name, Config),
- ?assertEqual({'queue.declare_ok', QQ, 0, 0},
- declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
-
- RaName = ra_name(QQ),
- publish(Ch, QQ),
- wait_for_messages_ready(Servers, RaName, 1),
- wait_for_messages_pending_ack(Servers, RaName, 0),
- subscribe(Ch, QQ, false),
- receive
- {#'basic.deliver'{delivery_tag = DeliveryTag}, _} ->
- wait_for_messages_ready(Servers, RaName, 0),
- wait_for_messages_pending_ack(Servers, RaName, 1),
- amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag}),
- wait_for_messages_ready(Servers, RaName, 0),
- wait_for_messages_pending_ack(Servers, RaName, 0)
- end.
-
-subscribe_and_multiple_ack(Config) ->
- [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
-
- Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
- QQ = ?config(queue_name, Config),
- ?assertEqual({'queue.declare_ok', QQ, 0, 0},
- declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
-
- RaName = ra_name(QQ),
- publish(Ch, QQ),
- publish(Ch, QQ),
- publish(Ch, QQ),
- wait_for_messages_ready(Servers, RaName, 3),
- wait_for_messages_pending_ack(Servers, RaName, 0),
- subscribe(Ch, QQ, false),
- receive_basic_deliver(false),
- receive_basic_deliver(false),
- receive
- {#'basic.deliver'{delivery_tag = DeliveryTag}, _} ->
- wait_for_messages_ready(Servers, RaName, 0),
- wait_for_messages_pending_ack(Servers, RaName, 3),
- amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag,
- multiple = true}),
- wait_for_messages_ready(Servers, RaName, 0),
- wait_for_messages_pending_ack(Servers, RaName, 0)
- end.
-
-consume_and_requeue_nack(Config) ->
- [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
-
- Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
- QQ = ?config(queue_name, Config),
- ?assertEqual({'queue.declare_ok', QQ, 0, 0},
- declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
-
- RaName = ra_name(QQ),
- publish(Ch, QQ),
- publish(Ch, QQ),
- wait_for_messages_ready(Servers, RaName, 2),
- wait_for_messages_pending_ack(Servers, RaName, 0),
- DeliveryTag = consume(Ch, QQ, false),
- wait_for_messages_ready(Servers, RaName, 1),
- wait_for_messages_pending_ack(Servers, RaName, 1),
- amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag,
- multiple = false,
- requeue = true}),
- wait_for_messages_ready(Servers, RaName, 2),
- wait_for_messages_pending_ack(Servers, RaName, 0).
-
-consume_and_requeue_multiple_nack(Config) ->
- [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
-
- Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
- QQ = ?config(queue_name, Config),
- ?assertEqual({'queue.declare_ok', QQ, 0, 0},
- declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
-
- RaName = ra_name(QQ),
- publish(Ch, QQ),
- publish(Ch, QQ),
- publish(Ch, QQ),
- wait_for_messages_ready(Servers, RaName, 3),
- wait_for_messages_pending_ack(Servers, RaName, 0),
- _ = consume(Ch, QQ, false),
- _ = consume(Ch, QQ, false),
- DeliveryTag = consume(Ch, QQ, false),
- wait_for_messages_ready(Servers, RaName, 0),
- wait_for_messages_pending_ack(Servers, RaName, 3),
- amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag,
- multiple = true,
- requeue = true}),
- wait_for_messages_ready(Servers, RaName, 3),
- wait_for_messages_pending_ack(Servers, RaName, 0).
-
-consume_and_nack(Config) ->
- [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
-
- Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
- QQ = ?config(queue_name, Config),
- ?assertEqual({'queue.declare_ok', QQ, 0, 0},
- declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
-
- RaName = ra_name(QQ),
- publish(Ch, QQ),
- wait_for_messages_ready(Servers, RaName, 1),
- wait_for_messages_pending_ack(Servers, RaName, 0),
- DeliveryTag = consume(Ch, QQ, false),
- wait_for_messages_ready(Servers, RaName, 0),
- wait_for_messages_pending_ack(Servers, RaName, 1),
- amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag,
- multiple = false,
- requeue = false}),
- wait_for_messages_ready(Servers, RaName, 0),
- wait_for_messages_pending_ack(Servers, RaName, 0).
-
-consume_and_multiple_nack(Config) ->
- [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
-
- Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
- QQ = ?config(queue_name, Config),
- ?assertEqual({'queue.declare_ok', QQ, 0, 0},
- declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
-
- RaName = ra_name(QQ),
- publish(Ch, QQ),
- publish(Ch, QQ),
- publish(Ch, QQ),
- wait_for_messages_ready(Servers, RaName, 3),
- wait_for_messages_pending_ack(Servers, RaName, 0),
- _ = consume(Ch, QQ, false),
- _ = consume(Ch, QQ, false),
- DeliveryTag = consume(Ch, QQ, false),
- wait_for_messages_ready(Servers, RaName, 0),
- wait_for_messages_pending_ack(Servers, RaName, 3),
- amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag,
- multiple = true,
- requeue = false}),
- wait_for_messages_ready(Servers, RaName, 0),
- wait_for_messages_pending_ack(Servers, RaName, 0).
-
-subscribe_and_requeue_multiple_nack(Config) ->
- [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
-
- Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
- QQ = ?config(queue_name, Config),
- ?assertEqual({'queue.declare_ok', QQ, 0, 0},
- declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
-
- RaName = ra_name(QQ),
- publish(Ch, QQ),
- publish(Ch, QQ),
- publish(Ch, QQ),
- wait_for_messages_ready(Servers, RaName, 3),
- wait_for_messages_pending_ack(Servers, RaName, 0),
- subscribe(Ch, QQ, false),
- receive_basic_deliver(false),
- receive_basic_deliver(false),
- receive
- {#'basic.deliver'{delivery_tag = DeliveryTag,
- redelivered = false}, _} ->
- wait_for_messages_ready(Servers, RaName, 0),
- wait_for_messages_pending_ack(Servers, RaName, 3),
- amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag,
- multiple = true,
- requeue = true}),
- receive_basic_deliver(true),
- receive_basic_deliver(true),
- receive
- {#'basic.deliver'{delivery_tag = DeliveryTag1,
- redelivered = true}, _} ->
- wait_for_messages_ready(Servers, RaName, 0),
- wait_for_messages_pending_ack(Servers, RaName, 3),
- amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag1,
- multiple = true}),
- wait_for_messages_ready(Servers, RaName, 0),
- wait_for_messages_pending_ack(Servers, RaName, 0)
- end
- end.
-
-subscribe_and_requeue_nack(Config) ->
- [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
-
- Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
- QQ = ?config(queue_name, Config),
- ?assertEqual({'queue.declare_ok', QQ, 0, 0},
- declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
-
- RaName = ra_name(QQ),
- publish(Ch, QQ),
- wait_for_messages_ready(Servers, RaName, 1),
- wait_for_messages_pending_ack(Servers, RaName, 0),
- subscribe(Ch, QQ, false),
- receive
- {#'basic.deliver'{delivery_tag = DeliveryTag,
- redelivered = false}, _} ->
- wait_for_messages_ready(Servers, RaName, 0),
- wait_for_messages_pending_ack(Servers, RaName, 1),
- amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag,
- multiple = false,
- requeue = true}),
- receive
- {#'basic.deliver'{delivery_tag = DeliveryTag1,
- redelivered = true}, _} ->
- wait_for_messages_ready(Servers, RaName, 0),
- wait_for_messages_pending_ack(Servers, RaName, 1),
- amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag1}),
- wait_for_messages_ready(Servers, RaName, 0),
- wait_for_messages_pending_ack(Servers, RaName, 0)
- end
- end.
-
-subscribe_and_nack(Config) ->
- [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
-
- Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
- QQ = ?config(queue_name, Config),
- ?assertEqual({'queue.declare_ok', QQ, 0, 0},
- declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
-
- RaName = ra_name(QQ),
- publish(Ch, QQ),
- wait_for_messages_ready(Servers, RaName, 1),
- wait_for_messages_pending_ack(Servers, RaName, 0),
- subscribe(Ch, QQ, false),
- receive
- {#'basic.deliver'{delivery_tag = DeliveryTag,
- redelivered = false}, _} ->
- wait_for_messages_ready(Servers, RaName, 0),
- wait_for_messages_pending_ack(Servers, RaName, 1),
- amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag,
- multiple = false,
- requeue = false}),
- wait_for_messages_ready(Servers, RaName, 0),
- wait_for_messages_pending_ack(Servers, RaName, 0)
- end.
-
-subscribe_and_multiple_nack(Config) ->
- [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
-
- Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
- QQ = ?config(queue_name, Config),
- ?assertEqual({'queue.declare_ok', QQ, 0, 0},
- declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
-
- RaName = ra_name(QQ),
- publish(Ch, QQ),
- publish(Ch, QQ),
- publish(Ch, QQ),
- wait_for_messages_ready(Servers, RaName, 3),
- wait_for_messages_pending_ack(Servers, RaName, 0),
- subscribe(Ch, QQ, false),
- receive_basic_deliver(false),
- receive_basic_deliver(false),
- receive
- {#'basic.deliver'{delivery_tag = DeliveryTag,
- redelivered = false}, _} ->
- wait_for_messages_ready(Servers, RaName, 0),
- wait_for_messages_pending_ack(Servers, RaName, 3),
- amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag,
- multiple = true,
- requeue = false}),
- wait_for_messages_ready(Servers, RaName, 0),
- wait_for_messages_pending_ack(Servers, RaName, 0)
- end.
-
dead_letter_to_classic_queue(Config) ->
[Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
@@ -1500,51 +1041,6 @@ delete_declare(Config) ->
wait_for_messages_ready(Servers, RaName, 0),
wait_for_messages_pending_ack(Servers, RaName, 0).
-basic_cancel(Config) ->
- [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
-
- Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
- QQ = ?config(queue_name, Config),
- ?assertEqual({'queue.declare_ok', QQ, 0, 0},
- declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
-
- RaName = ra_name(QQ),
- publish(Ch, QQ),
- wait_for_messages_ready(Servers, RaName, 1),
- wait_for_messages_pending_ack(Servers, RaName, 0),
- subscribe(Ch, QQ, false),
- receive
- {#'basic.deliver'{}, _} ->
- wait_for_messages_ready(Servers, RaName, 0),
- wait_for_messages_pending_ack(Servers, RaName, 1),
- amqp_channel:call(Ch, #'basic.cancel'{consumer_tag = <<"ctag">>}),
- wait_for_messages_ready(Servers, RaName, 1),
- wait_for_messages_pending_ack(Servers, RaName, 0),
- [] = rpc:call(Server, ets, tab2list, [consumer_created])
- after 5000 ->
- exit(basic_deliver_timeout)
- end.
-
-purge(Config) ->
- [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
-
- Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
- QQ = ?config(queue_name, Config),
- ?assertEqual({'queue.declare_ok', QQ, 0, 0},
- declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
-
- RaName = ra_name(QQ),
- publish(Ch, QQ),
- publish(Ch, QQ),
- wait_for_messages_ready(Servers, RaName, 2),
- wait_for_messages_pending_ack(Servers, RaName, 0),
- _DeliveryTag = consume(Ch, QQ, false),
- wait_for_messages_ready(Servers, RaName, 1),
- wait_for_messages_pending_ack(Servers, RaName, 1),
- {'queue.purge_ok', 1} = amqp_channel:call(Ch, #'queue.purge'{queue = QQ}),
- wait_for_messages_pending_ack(Servers, RaName, 1),
- wait_for_messages_ready(Servers, RaName, 0).
-
sync_queue(Config) ->
[Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
@@ -1922,45 +1418,6 @@ reconnect_consumer_and_wait_channel_down(Config) ->
wait_for_messages_ready(Servers, RaName, 1),
wait_for_messages_pending_ack(Servers, RaName, 0).
-basic_recover(Config) ->
- [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
-
- Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
- QQ = ?config(queue_name, Config),
- ?assertEqual({'queue.declare_ok', QQ, 0, 0},
- declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
-
- RaName = ra_name(QQ),
- publish(Ch, QQ),
- wait_for_messages_ready(Servers, RaName, 1),
- wait_for_messages_pending_ack(Servers, RaName, 0),
- _ = consume(Ch, QQ, false),
- wait_for_messages_ready(Servers, RaName, 0),
- wait_for_messages_pending_ack(Servers, RaName, 1),
- amqp_channel:cast(Ch, #'basic.recover'{requeue = true}),
- wait_for_messages_ready(Servers, RaName, 1),
- wait_for_messages_pending_ack(Servers, RaName, 0).
-
-delete_immediately(Config) ->
- Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
-
- Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
- QQ = ?config(queue_name, Config),
- Args = [{<<"x-queue-type">>, longstr, <<"quorum">>}],
- ?assertEqual({'queue.declare_ok', QQ, 0, 0},
- declare(Ch, QQ, Args)),
-
- Cmd = ["eval", "{ok, Q} = rabbit_amqqueue:lookup(rabbit_misc:r(<<\"/\">>, queue, <<\"" ++ binary_to_list(QQ) ++ "\">>)), Pid = rabbit_amqqueue:pid_of(Q), rabbit_amqqueue:delete_immediately([Pid])."],
- {ok, Msg} = rabbit_ct_broker_helpers:rabbitmqctl(Config, 0, Cmd),
- ?assertEqual(match, re:run(Msg, ".*error.*", [{capture, none}])),
-
- ?assertEqual({'queue.declare_ok', QQ, 0, 0},
- amqp_channel:call(Ch, #'queue.declare'{queue = QQ,
- durable = true,
- passive = true,
- auto_delete = false,
- arguments = Args})).
-
delete_immediately_by_resource(Config) ->
Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
@@ -2046,7 +1503,7 @@ consume_redelivery_count(Config) ->
amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag,
multiple = false,
requeue = true}),
- %% wait for requeueing
+ %% wait for requeuing
timer:sleep(500),
{#'basic.get_ok'{delivery_tag = DeliveryTag1,
diff --git a/test/rabbit_fifo_SUITE.erl b/test/rabbit_fifo_SUITE.erl
index 293a597d14..60402b3a7b 100644
--- a/test/rabbit_fifo_SUITE.erl
+++ b/test/rabbit_fifo_SUITE.erl
@@ -104,7 +104,7 @@ basics(Config) ->
exit(await_msg_timeout)
end,
- % process settle applied notificaiton
+ % process settle applied notification
FState5b = process_ra_event(FState5, 250),
_ = ra:stop_server(ServerId),
_ = ra:restart_server(ServerId),
@@ -395,7 +395,8 @@ cancel_checkout(Config) ->
{ok, F2} = rabbit_fifo_client:checkout(<<"tag">>, 10, undefined, F1),
{_, _, F3} = process_ra_events0(F2, [], [], 250, fun (_, S) -> S end),
{ok, F4} = rabbit_fifo_client:cancel_checkout(<<"tag">>, F3),
- {ok, {{_, {_, m1}}, _}, _} = rabbit_fifo_client:dequeue(<<"d1">>, settled, F4),
+ {ok, F5} = rabbit_fifo_client:return(<<"tag">>, [0], F4),
+ {ok, {{_, {_, m1}}, _}, _} = rabbit_fifo_client:dequeue(<<"d1">>, settled, F5),
ok.
credit(Config) ->
diff --git a/test/rabbit_fifo_prop_SUITE.erl b/test/rabbit_fifo_prop_SUITE.erl
index c4f5690b72..437cd02e25 100644
--- a/test/rabbit_fifo_prop_SUITE.erl
+++ b/test/rabbit_fifo_prop_SUITE.erl
@@ -321,7 +321,7 @@ checkout_gen(Pid) ->
}).
expand(Ops) ->
- %% execute each command against a rabbit_fifo state and capture all releavant
+ %% execute each command against a rabbit_fifo state and capture all relevant
%% effects
T = #t{},
#t{effects = Effs} = T1 = lists:foldl(fun handle_op/2, T, Ops),
@@ -463,7 +463,7 @@ run_proper(Fun, Args, NumTests) ->
end}])).
run_snapshot_test(Conf, Commands) ->
- %% create every incremental permuation of the commands lists
+ %% create every incremental permutation of the commands lists
%% and run the snapshot tests against that
[begin
% ?debugFmt("~w running command to ~w~n", [?FUNCTION_NAME, lists:last(C)]),
diff --git a/test/rabbit_ha_test_consumer.erl b/test/rabbit_ha_test_consumer.erl
index 8d61903308..85f0a17b2a 100644
--- a/test/rabbit_ha_test_consumer.erl
+++ b/test/rabbit_ha_test_consumer.erl
@@ -64,7 +64,7 @@ run(TestPid, Channel, Queue, CancelOnFailover, LowestSeen, MsgsToConsume) ->
CancelOnFailover, MsgNum, MsgsToConsume - 1);
MsgNum >= LowestSeen ->
error_logger:info_msg(
- "consumer ~p on ~p ignoring redeliverd msg ~p~n",
+ "consumer ~p on ~p ignoring redelivered msg ~p~n",
[self(), Channel, MsgNum]),
true = Redelivered, %% ASSERTION
run(TestPid, Channel, Queue,
diff --git a/test/rabbitmqctl_shutdown_SUITE.erl b/test/rabbitmqctl_shutdown_SUITE.erl
index 0debfde2b6..b4279c6032 100644
--- a/test/rabbitmqctl_shutdown_SUITE.erl
+++ b/test/rabbitmqctl_shutdown_SUITE.erl
@@ -111,7 +111,7 @@ node_is_running(Node) ->
shutdown_ok(Node) ->
%% Start a command
{stream, Stream} = rabbit_ct_broker_helpers:control_action(shutdown, Node, []),
- %% Execute command steps. Each step will ouput a binary string
+ %% Execute command steps. Each step will output a binary string
Lines = 'Elixir.Enum':to_list(Stream),
ct:pal("Command output ~p ~n", [Lines]),
[true = is_binary(Line) || Line <- Lines],
diff --git a/test/single_active_consumer_SUITE.erl b/test/single_active_consumer_SUITE.erl
index 6071aeb5a5..0b12f54c0b 100644
--- a/test/single_active_consumer_SUITE.erl
+++ b/test/single_active_consumer_SUITE.erl
@@ -33,12 +33,14 @@ groups() ->
all_messages_go_to_one_consumer,
fallback_to_another_consumer_when_first_one_is_cancelled,
fallback_to_another_consumer_when_exclusive_consumer_channel_is_cancelled,
+ fallback_to_another_consumer_when_first_one_is_cancelled_manual_acks,
amqp_exclusive_consume_fails_on_exclusive_consumer_queue
]},
{quorum_queue, [], [
all_messages_go_to_one_consumer,
fallback_to_another_consumer_when_first_one_is_cancelled,
- fallback_to_another_consumer_when_exclusive_consumer_channel_is_cancelled
+ fallback_to_another_consumer_when_exclusive_consumer_channel_is_cancelled,
+ fallback_to_another_consumer_when_first_one_is_cancelled_manual_acks
%% amqp_exclusive_consume_fails_on_exclusive_consumer_queue % Exclusive consume not implemented in QQ
]}
].
@@ -131,7 +133,7 @@ fallback_to_another_consumer_when_first_one_is_cancelled(Config) ->
Publish = #'basic.publish'{exchange = <<>>, routing_key = Q},
[amqp_channel:cast(Ch, Publish, #amqp_msg{payload = <<"foobar">>}) || _X <- lists:seq(1, MessageCount div 2)],
- {MessagesPerConsumer1, _} = wait_for_messages(MessageCount div 2),
+ {ok, {MessagesPerConsumer1, _}} = wait_for_messages(MessageCount div 2),
FirstActiveConsumerInList = maps:keys(maps:filter(fun(_CTag, Count) -> Count > 0 end, MessagesPerConsumer1)),
?assertEqual(1, length(FirstActiveConsumerInList)),
@@ -141,8 +143,8 @@ fallback_to_another_consumer_when_first_one_is_cancelled(Config) ->
{cancel_ok, FirstActiveConsumer} = wait_for_cancel_ok(),
[amqp_channel:cast(Ch, Publish, #amqp_msg{payload = <<"foobar">>}) || _X <- lists:seq(MessageCount div 2 + 1, MessageCount - 1)],
-
- {MessagesPerConsumer2, _} = wait_for_messages(MessageCount div 2 - 1),
+
+ {ok, {MessagesPerConsumer2, _}} = wait_for_messages(MessageCount div 2 - 1),
SecondActiveConsumerInList = maps:keys(maps:filter(
fun(CTag, Count) -> Count > 0 andalso CTag /= FirstActiveConsumer end,
MessagesPerConsumer2)
@@ -153,7 +155,7 @@ fallback_to_another_consumer_when_first_one_is_cancelled(Config) ->
#'basic.cancel_ok'{} = amqp_channel:call(Ch, #'basic.cancel'{consumer_tag = SecondActiveConsumer}),
amqp_channel:cast(Ch, Publish, #amqp_msg{payload = <<"foobar">>}),
- wait_for_messages(1),
+ ?assertMatch({ok, _}, wait_for_messages(1)),
LastActiveConsumer = lists:nth(1, lists:delete(FirstActiveConsumer, lists:delete(SecondActiveConsumer, [CTag1, CTag2, CTag3]))),
@@ -171,6 +173,54 @@ fallback_to_another_consumer_when_first_one_is_cancelled(Config) ->
amqp_connection:close(C),
ok.
+fallback_to_another_consumer_when_first_one_is_cancelled_manual_acks(Config) ->
+ %% Let's ensure that although the consumer is cancelled we still keep the unacked
+ %% messages and accept acknowledgments on them.
+ [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ {C, Ch} = connection_and_channel(Config),
+ Q = queue_declare(Ch, Config),
+ #'basic.consume_ok'{} =
+ amqp_channel:subscribe(Ch, #'basic.consume'{queue = Q, no_ack = false}, self()),
+ #'basic.consume_ok'{} =
+ amqp_channel:subscribe(Ch, #'basic.consume'{queue = Q, no_ack = false}, self()),
+ #'basic.consume_ok'{} =
+ amqp_channel:subscribe(Ch, #'basic.consume'{queue = Q, no_ack = false}, self()),
+ Consumers0 = rpc:call(Server, rabbit_amqqueue, consumers_all, [<<"/">>]),
+ ?assertMatch([_, _, _], lists:filter(fun(Props) ->
+ Resource = proplists:get_value(queue_name, Props),
+ Q == Resource#resource.name
+ end, Consumers0)),
+
+ Publish = #'basic.publish'{exchange = <<>>, routing_key = Q},
+ [amqp_channel:cast(Ch, Publish, #amqp_msg{payload = P}) || P <- [<<"msg1">>, <<"msg2">>]],
+
+ {CTag, DTag1} = receive_deliver(),
+ {_CTag, DTag2} = receive_deliver(),
+
+ quorum_queue_utils:wait_for_messages(Config, [[Q, <<"2">>, <<"0">>, <<"2">>]]),
+ #'basic.cancel_ok'{} = amqp_channel:call(Ch, #'basic.cancel'{consumer_tag = CTag}),
+
+ receive
+ #'basic.cancel_ok'{consumer_tag = CTag} ->
+ ok
+ end,
+ Consumers1 = rpc:call(Server, rabbit_amqqueue, consumers_all, [<<"/">>]),
+ ?assertMatch([_, _], lists:filter(fun(Props) ->
+ Resource = proplists:get_value(queue_name, Props),
+ Q == Resource#resource.name
+ end, Consumers1)),
+ quorum_queue_utils:wait_for_messages(Config, [[Q, <<"2">>, <<"0">>, <<"2">>]]),
+
+ [amqp_channel:cast(Ch, Publish, #amqp_msg{payload = P}) || P <- [<<"msg3">>, <<"msg4">>]],
+
+ quorum_queue_utils:wait_for_messages(Config, [[Q, <<"4">>, <<"0">>, <<"4">>]]),
+ amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DTag1}),
+ amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DTag2}),
+ quorum_queue_utils:wait_for_messages(Config, [[Q, <<"2">>, <<"0">>, <<"2">>]]),
+
+ amqp_connection:close(C),
+ ok.
+
fallback_to_another_consumer_when_exclusive_consumer_channel_is_cancelled(Config) ->
{C, Ch} = connection_and_channel(Config),
{C1, Ch1} = connection_and_channel(Config),
@@ -276,13 +326,13 @@ wait_for_messages(ExpectedCount) ->
wait_for_messages(ExpectedCount, {}).
wait_for_messages(0, State) ->
- State;
-wait_for_messages(ExpectedCount, _) ->
+ {ok, State};
+wait_for_messages(ExpectedCount, State) ->
receive
{message, {MessagesPerConsumer, MessageCount}} ->
wait_for_messages(ExpectedCount - 1, {MessagesPerConsumer, MessageCount})
after 5000 ->
- throw(message_waiting_timeout)
+ {missing, ExpectedCount, State}
end.
wait_for_cancel_ok() ->
@@ -292,3 +342,12 @@ wait_for_cancel_ok() ->
after 5000 ->
throw(consumer_cancel_ok_timeout)
end.
+
+receive_deliver() ->
+ receive
+ {#'basic.deliver'{consumer_tag = CTag,
+ delivery_tag = DTag}, _} ->
+ {CTag, DTag}
+ after 5000 ->
+ exit(deliver_timeout)
+ end.
diff --git a/test/unit_inbroker_non_parallel_SUITE.erl b/test/unit_inbroker_non_parallel_SUITE.erl
index d2db382e30..aaded5fa99 100644
--- a/test/unit_inbroker_non_parallel_SUITE.erl
+++ b/test/unit_inbroker_non_parallel_SUITE.erl
@@ -338,7 +338,7 @@ log_management_during_startup1(_Config) ->
application:unset_env(lager, extra_sinks),
ok = try rabbit:start() of
ok -> exit({got_success_but_expected_failure,
- log_rotatation_parent_dirs_test})
+ log_rotation_parent_dirs_test})
catch
_:{error, {cannot_log_to_file, _, Reason2}}
when Reason2 =:= enoent orelse Reason2 =:= eacces -> ok;
diff --git a/test/unit_inbroker_parallel_SUITE.erl b/test/unit_inbroker_parallel_SUITE.erl
index 364bd5aabb..9518e06196 100644
--- a/test/unit_inbroker_parallel_SUITE.erl
+++ b/test/unit_inbroker_parallel_SUITE.erl
@@ -1384,7 +1384,7 @@ max_message_size(Config) ->
{_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
- %% Binary is whithin the max size limit
+ %% Binary is within the max size limit
amqp_channel:call(Ch, #'basic.publish'{routing_key = <<"none">>}, #amqp_msg{payload = Binary2M}),
%% The channel process is alive
assert_channel_alive(Ch),
diff --git a/test/unit_log_config_SUITE.erl b/test/unit_log_config_SUITE.erl
index 584aa76760..07134f309a 100644
--- a/test/unit_log_config_SUITE.erl
+++ b/test/unit_log_config_SUITE.erl
@@ -660,7 +660,7 @@ env_var_tty(_) ->
application:set_env(rabbit, lager_log_root, "/tmp/log_base"),
application:set_env(rabbit, lager_default_file, tty),
application:set_env(rabbit, lager_upgrade_file, tty),
- %% tty can only be set explicitely
+ %% tty can only be set explicitly
os:putenv("RABBITMQ_LOGS_source", "environment"),
rabbit_lager:configure_lager(),
diff --git a/test/worker_pool_SUITE.erl b/test/worker_pool_SUITE.erl
index 8c9c5fa366..e53abdfa05 100644
--- a/test/worker_pool_SUITE.erl
+++ b/test/worker_pool_SUITE.erl
@@ -142,7 +142,7 @@ cancel_timeout(_) ->
reuse),
timer:sleep(1000),
- receive {hello, Worker, Test} -> exit(timeout_is_not_canceleld)
+ receive {hello, Worker, Test} -> exit(timeout_is_not_cancelled)
after 0 -> ok
end.
@@ -179,7 +179,7 @@ cancel_timeout_by_setting(_) ->
reuse),
timer:sleep(1000),
- receive {hello, Worker, Test} -> exit(timeout_is_not_canceleld)
+ receive {hello, Worker, Test} -> exit(timeout_is_not_cancelled)
after 0 -> ok
end,