summaryrefslogtreecommitdiff
path: root/test
diff options
context:
space:
mode:
authorKarl Nilsson <kjnilsson@gmail.com>2019-02-12 17:01:13 +0000
committerGitHub <noreply@github.com>2019-02-12 17:01:13 +0000
commitb0dfe9352f083607d5d7346bd3962ba6c50cc03a (patch)
tree86dd3fa9004f9c79eda929ce16ac2c0e02fff0cf /test
parent4e4887f48b9b92c68e85df232d7db29122d6845b (diff)
parentfb97086d7bd1c309803515789eb91b2e4e254de5 (diff)
downloadrabbitmq-server-git-b0dfe9352f083607d5d7346bd3962ba6c50cc03a.tar.gz
Merge pull request #1874 from rabbitmq/queues-testing
Generic queue testing
Diffstat (limited to 'test')
-rw-r--r--test/queue_parallel_SUITE.erl587
-rw-r--r--test/quorum_queue_SUITE.erl543
-rw-r--r--test/rabbit_fifo_SUITE.erl3
-rw-r--r--test/single_active_consumer_SUITE.erl75
4 files changed, 656 insertions, 552 deletions
diff --git a/test/queue_parallel_SUITE.erl b/test/queue_parallel_SUITE.erl
new file mode 100644
index 0000000000..1290c97b23
--- /dev/null
+++ b/test/queue_parallel_SUITE.erl
@@ -0,0 +1,587 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2011-2019 Pivotal Software, Inc. All rights reserved.
+%%
+%%
+-module(queue_parallel_SUITE).
+
+-include_lib("common_test/include/ct.hrl").
+-include_lib("kernel/include/file.hrl").
+-include_lib("amqp_client/include/amqp_client.hrl").
+-include_lib("eunit/include/eunit.hrl").
+
+-compile(export_all).
+
+-define(TIMEOUT, 30000).
+
+-import(quorum_queue_utils, [wait_for_messages/2]).
+
+all() ->
+ [
+ {group, parallel_tests}
+ ].
+
+groups() ->
+ AllTests = [publish,
+ consume,
+ consume_first_empty,
+ consume_from_empty_queue,
+ consume_and_autoack,
+ subscribe,
+ subscribe_with_autoack,
+ consume_and_ack,
+ consume_and_multiple_ack,
+ subscribe_and_ack,
+ subscribe_and_multiple_ack,
+ subscribe_and_requeue_multiple_nack,
+ subscribe_and_nack,
+ subscribe_and_requeue_nack,
+ subscribe_and_multiple_nack,
+ consume_and_requeue_nack,
+ consume_and_nack,
+ consume_and_requeue_multiple_nack,
+ consume_and_multiple_nack,
+ basic_cancel,
+ purge,
+ basic_recover,
+ delete_immediately_by_resource
+ ],
+ [
+ {parallel_tests, [],
+ [
+ {classic_queue, [parallel], AllTests ++ [delete_immediately_by_pid_succeeds]},
+ {mirrored_queue, [parallel], AllTests ++ [delete_immediately_by_pid_succeeds]},
+ {quorum_queue, [parallel], AllTests ++ [delete_immediately_by_pid_fails]}
+ ]}
+ ].
+
+suite() ->
+ [
+ {timetrap, {minutes, 3}}
+ ].
+
+%% -------------------------------------------------------------------
+%% Testsuite setup/teardown.
+%% -------------------------------------------------------------------
+
+init_per_suite(Config) ->
+ rabbit_ct_helpers:log_environment(),
+ rabbit_ct_helpers:run_setup_steps(Config).
+
+end_per_suite(Config) ->
+ rabbit_ct_helpers:run_teardown_steps(Config).
+
+init_per_group(classic_queue, Config) ->
+ rabbit_ct_helpers:set_config(
+ Config,
+ [{queue_args, [{<<"x-queue-type">>, longstr, <<"classic">>}]},
+ {queue_durable, true}]);
+init_per_group(quorum_queue, Config) ->
+ rabbit_ct_helpers:set_config(
+ Config,
+ [{queue_args, [{<<"x-queue-type">>, longstr, <<"quorum">>}]},
+ {queue_durable, true}]);
+init_per_group(mirrored_queue, Config) ->
+ rabbit_ct_broker_helpers:set_ha_policy(Config, 0, <<"^max_length.*queue">>,
+ <<"all">>, [{<<"ha-sync-mode">>, <<"automatic">>}]),
+ Config1 = rabbit_ct_helpers:set_config(
+ Config, [{is_mirrored, true},
+ {queue_args, [{<<"x-queue-type">>, longstr, <<"classic">>}]},
+ {queue_durable, true}]),
+ rabbit_ct_helpers:run_steps(Config1, []);
+init_per_group(Group, Config) ->
+ case lists:member({group, Group}, all()) of
+ true ->
+ ClusterSize = 2,
+ Config1 = rabbit_ct_helpers:set_config(Config, [
+ {rmq_nodename_suffix, Group},
+ {rmq_nodes_count, ClusterSize}
+ ]),
+ rabbit_ct_helpers:run_steps(Config1,
+ rabbit_ct_broker_helpers:setup_steps() ++
+ rabbit_ct_client_helpers:setup_steps());
+ false ->
+ rabbit_ct_helpers:run_steps(Config, [])
+ end.
+
+end_per_group(Group, Config) ->
+ case lists:member({group, Group}, all()) of
+ true ->
+ rabbit_ct_helpers:run_steps(Config,
+ rabbit_ct_client_helpers:teardown_steps() ++
+ rabbit_ct_broker_helpers:teardown_steps());
+ false ->
+ Config
+ end.
+
+init_per_testcase(Testcase, Config) ->
+ Group = proplists:get_value(name, ?config(tc_group_properties, Config)),
+ Q = rabbit_data_coercion:to_binary(io_lib:format("~p_~p", [Group, Testcase])),
+ Q2 = rabbit_data_coercion:to_binary(io_lib:format("~p_~p_2", [Group, Testcase])),
+ Config1 = rabbit_ct_helpers:set_config(Config, [{queue_name, Q},
+ {queue_name_2, Q2}]),
+ rabbit_ct_helpers:testcase_started(Config1, Testcase).
+
+end_per_testcase(Testcase, Config) ->
+ {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ amqp_channel:call(Ch, #'queue.delete'{queue = ?config(queue_name, Config)}),
+ amqp_channel:call(Ch, #'queue.delete'{queue = ?config(queue_name_2, Config)}),
+ rabbit_ct_helpers:testcase_finished(Config, Testcase).
+
+publish(Config) ->
+ {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ QName = ?config(queue_name, Config),
+ declare_queue(Ch, Config, QName),
+ publish(Ch, QName, [<<"msg1">>]),
+ wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]).
+
+consume(Config) ->
+ {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ QName = ?config(queue_name, Config),
+ declare_queue(Ch, Config, QName),
+ publish(Ch, QName, [<<"msg1">>]),
+ wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]),
+ consume(Ch, QName, [<<"msg1">>]),
+ wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]),
+ rabbit_ct_client_helpers:close_channel(Ch),
+ wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]).
+
+consume_first_empty(Config) ->
+ {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ QName = ?config(queue_name, Config),
+ declare_queue(Ch, Config, QName),
+ consume_empty(Ch, QName),
+ publish(Ch, QName, [<<"msg1">>]),
+ wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]),
+ consume(Ch, QName, [<<"msg1">>]),
+ rabbit_ct_client_helpers:close_channel(Ch).
+
+consume_from_empty_queue(Config) ->
+ {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ QName = ?config(queue_name, Config),
+ declare_queue(Ch, Config, QName),
+ consume_empty(Ch, QName).
+
+consume_and_autoack(Config) ->
+ {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ QName = ?config(queue_name, Config),
+ declare_queue(Ch, Config, QName),
+ publish(Ch, QName, [<<"msg1">>]),
+ wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]),
+ consume(Ch, QName, true, [<<"msg1">>]),
+ wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]),
+ rabbit_ct_client_helpers:close_channel(Ch),
+ wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]).
+
+subscribe(Config) ->
+ [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ QName = ?config(queue_name, Config),
+ declare_queue(Ch, Config, QName),
+
+ ?assertMatch(#'basic.qos_ok'{},
+ amqp_channel:call(Ch, #'basic.qos'{global = false,
+ prefetch_count = 10})),
+ publish(Ch, QName, [<<"msg1">>]),
+ wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]),
+
+ subscribe(Ch, QName, false),
+ receive_basic_deliver(false),
+ wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]),
+
+ %% validate we can retrieve the consumers
+ Consumers = rpc:call(Server, rabbit_amqqueue, consumers_all, [<<"/">>]),
+ [Consumer] = lists:filter(fun(Props) ->
+ Resource = proplists:get_value(queue_name, Props),
+ QName == Resource#resource.name
+ end, Consumers),
+ ?assert(is_pid(proplists:get_value(channel_pid, Consumer))),
+ ?assert(is_binary(proplists:get_value(consumer_tag, Consumer))),
+ ?assertEqual(true, proplists:get_value(ack_required, Consumer)),
+ ?assertEqual(10, proplists:get_value(prefetch_count, Consumer)),
+ ?assertEqual([], proplists:get_value(arguments, Consumer)),
+
+ rabbit_ct_client_helpers:close_channel(Ch),
+ wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]).
+
+subscribe_with_autoack(Config) ->
+ {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ QName = ?config(queue_name, Config),
+ declare_queue(Ch, Config, QName),
+
+ publish(Ch, QName, [<<"msg1">>, <<"msg2">>]),
+ wait_for_messages(Config, [[QName, <<"2">>, <<"2">>, <<"0">>]]),
+ subscribe(Ch, QName, true),
+ receive_basic_deliver(false),
+ receive_basic_deliver(false),
+ wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]),
+ rabbit_ct_client_helpers:close_channel(Ch),
+ wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]).
+
+consume_and_ack(Config) ->
+ {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ QName = ?config(queue_name, Config),
+ declare_queue(Ch, Config, QName),
+
+ publish(Ch, QName, [<<"msg1">>]),
+ wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]),
+ [DeliveryTag] = consume(Ch, QName, [<<"msg1">>]),
+ wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]),
+ amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag}),
+ wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]).
+
+consume_and_multiple_ack(Config) ->
+ {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ QName = ?config(queue_name, Config),
+ declare_queue(Ch, Config, QName),
+
+ publish(Ch, QName, [<<"msg1">>, <<"msg2">>, <<"msg3">>]),
+ wait_for_messages(Config, [[QName, <<"3">>, <<"3">>, <<"0">>]]),
+ [_, _, DeliveryTag] = consume(Ch, QName, [<<"msg1">>, <<"msg2">>, <<"msg3">>]),
+ wait_for_messages(Config, [[QName, <<"3">>, <<"0">>, <<"3">>]]),
+ amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag,
+ multiple = true}),
+ wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]).
+
+subscribe_and_ack(Config) ->
+ {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ QName = ?config(queue_name, Config),
+ declare_queue(Ch, Config, QName),
+
+ publish(Ch, QName, [<<"msg1">>]),
+ wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]),
+ subscribe(Ch, QName, false),
+ receive
+ {#'basic.deliver'{delivery_tag = DeliveryTag}, _} ->
+ wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]),
+ amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag}),
+ wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]])
+ end.
+
+subscribe_and_multiple_ack(Config) ->
+ {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ QName = ?config(queue_name, Config),
+ declare_queue(Ch, Config, QName),
+
+ publish(Ch, QName, [<<"msg1">>, <<"msg2">>, <<"msg3">>]),
+ wait_for_messages(Config, [[QName, <<"3">>, <<"3">>, <<"0">>]]),
+ subscribe(Ch, QName, false),
+ receive_basic_deliver(false),
+ receive_basic_deliver(false),
+ receive
+ {#'basic.deliver'{delivery_tag = DeliveryTag}, _} ->
+ wait_for_messages(Config, [[QName, <<"3">>, <<"0">>, <<"3">>]]),
+ amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag,
+ multiple = true}),
+ wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]])
+ end.
+
+subscribe_and_requeue_multiple_nack(Config) ->
+ {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ QName = ?config(queue_name, Config),
+ declare_queue(Ch, Config, QName),
+
+ publish(Ch, QName, [<<"msg1">>, <<"msg2">>, <<"msg3">>]),
+ wait_for_messages(Config, [[QName, <<"3">>, <<"3">>, <<"0">>]]),
+ subscribe(Ch, QName, false),
+ receive_basic_deliver(false),
+ receive_basic_deliver(false),
+ receive
+ {#'basic.deliver'{delivery_tag = DeliveryTag,
+ redelivered = false}, _} ->
+ wait_for_messages(Config, [[QName, <<"3">>, <<"0">>, <<"3">>]]),
+ amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag,
+ multiple = true,
+ requeue = true}),
+ receive_basic_deliver(true),
+ receive_basic_deliver(true),
+ receive
+ {#'basic.deliver'{delivery_tag = DeliveryTag1,
+ redelivered = true}, _} ->
+ wait_for_messages(Config, [[QName, <<"3">>, <<"0">>, <<"3">>]]),
+ amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag1,
+ multiple = true}),
+ wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]])
+ end
+ end.
+
+consume_and_requeue_nack(Config) ->
+ {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ QName = ?config(queue_name, Config),
+ declare_queue(Ch, Config, QName),
+
+ publish(Ch, QName, [<<"msg1">>, <<"msg2">>]),
+ wait_for_messages(Config, [[QName, <<"2">>, <<"2">>, <<"0">>]]),
+ [DeliveryTag] = consume(Ch, QName, [<<"msg1">>]),
+ wait_for_messages(Config, [[QName, <<"2">>, <<"1">>, <<"1">>]]),
+ amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag,
+ multiple = false,
+ requeue = true}),
+ wait_for_messages(Config, [[QName, <<"2">>, <<"2">>, <<"0">>]]).
+
+consume_and_nack(Config) ->
+ {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ QName = ?config(queue_name, Config),
+ declare_queue(Ch, Config, QName),
+
+ publish(Ch, QName, [<<"msg1">>]),
+ wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]),
+ [DeliveryTag] = consume(Ch, QName, [<<"msg1">>]),
+ wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]),
+ amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag,
+ multiple = false,
+ requeue = false}),
+ wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]).
+
+consume_and_requeue_multiple_nack(Config) ->
+ {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ QName = ?config(queue_name, Config),
+ declare_queue(Ch, Config, QName),
+
+ publish(Ch, QName, [<<"msg1">>, <<"msg2">>, <<"msg3">>]),
+ wait_for_messages(Config, [[QName, <<"3">>, <<"3">>, <<"0">>]]),
+ [_, _, DeliveryTag] = consume(Ch, QName, [<<"msg1">>, <<"msg2">>, <<"msg3">>]),
+ wait_for_messages(Config, [[QName, <<"3">>, <<"0">>, <<"3">>]]),
+ amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag,
+ multiple = true,
+ requeue = true}),
+ wait_for_messages(Config, [[QName, <<"3">>, <<"3">>, <<"0">>]]).
+
+consume_and_multiple_nack(Config) ->
+ {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ QName = ?config(queue_name, Config),
+ declare_queue(Ch, Config, QName),
+
+ publish(Ch, QName, [<<"msg1">>, <<"msg2">>, <<"msg3">>]),
+ wait_for_messages(Config, [[QName, <<"3">>, <<"3">>, <<"0">>]]),
+ [_, _, DeliveryTag] = consume(Ch, QName, [<<"msg1">>, <<"msg2">>, <<"msg3">>]),
+ wait_for_messages(Config, [[QName, <<"3">>, <<"0">>, <<"3">>]]),
+ amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag,
+ multiple = true,
+ requeue = false}),
+ wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]).
+
+subscribe_and_requeue_nack(Config) ->
+ {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ QName = ?config(queue_name, Config),
+ declare_queue(Ch, Config, QName),
+
+ publish(Ch, QName, [<<"msg1">>]),
+ wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]),
+ subscribe(Ch, QName, false),
+ receive
+ {#'basic.deliver'{delivery_tag = DeliveryTag,
+ redelivered = false}, _} ->
+ wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]),
+ amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag,
+ multiple = false,
+ requeue = true}),
+ receive
+ {#'basic.deliver'{delivery_tag = DeliveryTag1,
+ redelivered = true}, _} ->
+ wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]),
+ amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag1}),
+ wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]])
+ end
+ end.
+
+subscribe_and_nack(Config) ->
+ {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ QName = ?config(queue_name, Config),
+ declare_queue(Ch, Config, QName),
+
+ publish(Ch, QName, [<<"msg1">>]),
+ wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]),
+ subscribe(Ch, QName, false),
+ receive
+ {#'basic.deliver'{delivery_tag = DeliveryTag,
+ redelivered = false}, _} ->
+ wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]),
+ amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag,
+ multiple = false,
+ requeue = false}),
+ wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]])
+ end.
+
+subscribe_and_multiple_nack(Config) ->
+ {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ QName = ?config(queue_name, Config),
+ declare_queue(Ch, Config, QName),
+
+ publish(Ch, QName, [<<"msg1">>, <<"msg2">>, <<"msg3">>]),
+ wait_for_messages(Config, [[QName, <<"3">>, <<"3">>, <<"0">>]]),
+ subscribe(Ch, QName, false),
+ receive_basic_deliver(false),
+ receive_basic_deliver(false),
+ receive
+ {#'basic.deliver'{delivery_tag = DeliveryTag,
+ redelivered = false}, _} ->
+ wait_for_messages(Config, [[QName, <<"3">>, <<"0">>, <<"3">>]]),
+ amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag,
+ multiple = true,
+ requeue = false}),
+ wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]])
+ end.
+
+%% TODO test with single active
+basic_cancel(Config) ->
+ [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ QName = ?config(queue_name, Config),
+ declare_queue(Ch, Config, QName),
+
+ publish(Ch, QName, [<<"msg1">>]),
+ wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]),
+ subscribe(Ch, QName, false),
+ receive
+ {#'basic.deliver'{delivery_tag = DeliveryTag}, _} ->
+ wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]),
+ amqp_channel:call(Ch, #'basic.cancel'{consumer_tag = <<"ctag">>}),
+ Consumers = rpc:call(Server, rabbit_amqqueue, consumers_all, [<<"/">>]),
+ wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]),
+ ?assertEqual([], lists:filter(fun(Props) ->
+ Resource = proplists:get_value(queue_name, Props),
+ QName == Resource#resource.name
+ end, Consumers)),
+ publish(Ch, QName, [<<"msg2">>, <<"msg3">>]),
+ wait_for_messages(Config, [[QName, <<"3">>, <<"2">>, <<"1">>]]),
+ amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag}),
+ wait_for_messages(Config, [[QName, <<"2">>, <<"2">>, <<"0">>]])
+ after 5000 ->
+ exit(basic_deliver_timeout)
+ end.
+
+purge(Config) ->
+ {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ QName = ?config(queue_name, Config),
+ declare_queue(Ch, Config, QName),
+
+ publish(Ch, QName, [<<"msg1">>, <<"msg2">>]),
+ wait_for_messages(Config, [[QName, <<"2">>, <<"2">>, <<"0">>]]),
+ [_] = consume(Ch, QName, [<<"msg1">>]),
+ wait_for_messages(Config, [[QName, <<"2">>, <<"1">>, <<"1">>]]),
+ {'queue.purge_ok', 1} = amqp_channel:call(Ch, #'queue.purge'{queue = QName}),
+ wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]).
+
+basic_recover(Config) ->
+ {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ QName = ?config(queue_name, Config),
+ declare_queue(Ch, Config, QName),
+
+ publish(Ch, QName, [<<"msg1">>]),
+ wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]),
+ [_] = consume(Ch, QName, [<<"msg1">>]),
+ wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]),
+ amqp_channel:cast(Ch, #'basic.recover'{requeue = true}),
+ wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]).
+
+delete_immediately_by_pid_fails(Config) ->
+ {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ Args = ?config(queue_args, Config),
+ Durable = ?config(queue_durable, Config),
+ QName = ?config(queue_name, Config),
+ declare_queue(Ch, Config, QName),
+
+ Cmd = ["eval", "{ok, Q} = rabbit_amqqueue:lookup(rabbit_misc:r(<<\"/\">>, queue, <<\"" ++ binary_to_list(QName) ++ "\">>)), Pid = rabbit_amqqueue:pid_of(Q), rabbit_amqqueue:delete_immediately([Pid])."],
+ {ok, Msg} = rabbit_ct_broker_helpers:rabbitmqctl(Config, 0, Cmd),
+ ?assertEqual(match, re:run(Msg, ".*error.*", [{capture, none}])),
+
+ ?assertEqual({'queue.declare_ok', QName, 0, 0},
+ amqp_channel:call(Ch, #'queue.declare'{queue = QName,
+ durable = Durable,
+ passive = true,
+ auto_delete = false,
+ arguments = Args})).
+
+delete_immediately_by_pid_succeeds(Config) ->
+ {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ Args = ?config(queue_args, Config),
+ Durable = ?config(queue_durable, Config),
+ QName = ?config(queue_name, Config),
+ declare_queue(Ch, Config, QName),
+
+ Cmd = ["eval", "{ok, Q} = rabbit_amqqueue:lookup(rabbit_misc:r(<<\"/\">>, queue, <<\"" ++ binary_to_list(QName) ++ "\">>)), Pid = rabbit_amqqueue:pid_of(Q), rabbit_amqqueue:delete_immediately([Pid])."],
+ {ok, Msg} = rabbit_ct_broker_helpers:rabbitmqctl(Config, 0, Cmd),
+ ?assertEqual(match, re:run(Msg, ".*ok.*", [{capture, none}])),
+
+ ?assertExit(
+ {{shutdown, {server_initiated_close, 404, _}}, _},
+ amqp_channel:call(Ch, #'queue.declare'{queue = QName,
+ durable = Durable,
+ passive = true,
+ auto_delete = false,
+ arguments = Args})).
+
+delete_immediately_by_resource(Config) ->
+ {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ Args = ?config(queue_args, Config),
+ Durable = ?config(queue_durable, Config),
+ QName = ?config(queue_name, Config),
+ declare_queue(Ch, Config, QName),
+
+ Cmd = ["eval", "rabbit_amqqueue:delete_immediately_by_resource([rabbit_misc:r(<<\"/\">>, queue, <<\"" ++ binary_to_list(QName) ++ "\">>)])."],
+ ?assertEqual({ok, "ok\n"}, rabbit_ct_broker_helpers:rabbitmqctl(Config, 0, Cmd)),
+
+ ?assertExit(
+ {{shutdown, {server_initiated_close, 404, _}}, _},
+ amqp_channel:call(Ch, #'queue.declare'{queue = QName,
+ durable = Durable,
+ passive = true,
+ auto_delete = false,
+ arguments = Args})).
+
+%%%%%%%%%%%%%%%%%%%%%%%%
+%% Test helpers
+%%%%%%%%%%%%%%%%%%%%%%%%
+declare_queue(Ch, Config, QName) ->
+ Args = ?config(queue_args, Config),
+ Durable = ?config(queue_durable, Config),
+ #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName,
+ arguments = Args,
+ durable = Durable}).
+
+publish(Ch, QName, Payloads) ->
+ [amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload})
+ || Payload <- Payloads].
+
+consume(Ch, QName, Payloads) ->
+ consume(Ch, QName, false, Payloads).
+
+consume(Ch, QName, NoAck, Payloads) ->
+ [begin
+ {#'basic.get_ok'{delivery_tag = DTag}, #amqp_msg{payload = Payload}} =
+ amqp_channel:call(Ch, #'basic.get'{queue = QName,
+ no_ack = NoAck}),
+ DTag
+ end || Payload <- Payloads].
+
+consume_empty(Ch, QName) ->
+ ?assertMatch(#'basic.get_empty'{},
+ amqp_channel:call(Ch, #'basic.get'{queue = QName})).
+
+subscribe(Ch, Queue, NoAck) ->
+ amqp_channel:subscribe(Ch, #'basic.consume'{queue = Queue,
+ no_ack = NoAck,
+ consumer_tag = <<"ctag">>},
+ self()),
+ receive
+ #'basic.consume_ok'{consumer_tag = <<"ctag">>} ->
+ ok
+ end.
+
+receive_basic_deliver(Redelivered) ->
+ receive
+ {#'basic.deliver'{redelivered = R}, _} when R == Redelivered ->
+ ok
+ end.
diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl
index 02afba97c5..7d78558739 100644
--- a/test/quorum_queue_SUITE.erl
+++ b/test/quorum_queue_SUITE.erl
@@ -93,26 +93,7 @@ all_tests() ->
restart_queue,
restart_all_types,
stop_start_rabbit_app,
- publish,
publish_and_restart,
- consume,
- consume_first_empty,
- consume_from_empty_queue,
- consume_and_autoack,
- subscribe,
- subscribe_with_autoack,
- consume_and_ack,
- consume_and_multiple_ack,
- subscribe_and_ack,
- subscribe_and_multiple_ack,
- consume_and_requeue_nack,
- consume_and_requeue_multiple_nack,
- subscribe_and_requeue_nack,
- subscribe_and_requeue_multiple_nack,
- consume_and_nack,
- consume_and_multiple_nack,
- subscribe_and_nack,
- subscribe_and_multiple_nack,
subscribe_should_fail_when_global_qos_true,
dead_letter_to_classic_queue,
dead_letter_to_quorum_queue,
@@ -120,14 +101,10 @@ all_tests() ->
dead_letter_policy,
cleanup_queue_state_on_channel_after_publish,
cleanup_queue_state_on_channel_after_subscribe,
- basic_cancel,
- purge,
sync_queue,
cancel_sync_queue,
- basic_recover,
idempotent_recover,
vhost_with_quorum_queue_is_deleted,
- delete_immediately,
delete_immediately_by_resource,
consume_redelivery_count,
subscribe_redelivery_count,
@@ -612,18 +589,6 @@ stop_start_rabbit_app(Config) ->
amqp_channel:call(Ch2, #'basic.get'{queue = CQ2, no_ack = false}),
delete_queues(Ch2, [QQ1, QQ2, CQ1, CQ2]).
-publish(Config) ->
- [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
-
- Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
- QQ = ?config(queue_name, Config),
- ?assertEqual({'queue.declare_ok', QQ, 0, 0},
- declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
- publish(Ch, QQ),
- Name = ra_name(QQ),
- wait_for_messages_ready(Servers, Name, 1),
- wait_for_messages_pending_ack(Servers, Name, 0).
-
publish_confirm(Ch, QName) ->
publish(Ch, QName),
amqp_channel:register_confirm_handler(Ch, self()),
@@ -660,41 +625,6 @@ publish_and_restart(Config) ->
wait_for_messages_ready(Servers, RaName, 2),
wait_for_messages_pending_ack(Servers, RaName, 0).
-consume(Config) ->
- [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
-
- Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
- QQ = ?config(queue_name, Config),
- ?assertEqual({'queue.declare_ok', QQ, 0, 0},
- declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
-
- RaName = ra_name(QQ),
- publish(Ch, QQ),
- wait_for_messages_ready(Servers, RaName, 1),
- wait_for_messages_pending_ack(Servers, RaName, 0),
- consume(Ch, QQ, false),
- wait_for_messages_ready(Servers, RaName, 0),
- wait_for_messages_pending_ack(Servers, RaName, 1),
- rabbit_ct_client_helpers:close_channel(Ch),
- wait_for_messages_ready(Servers, RaName, 1),
- wait_for_messages_pending_ack(Servers, RaName, 0).
-
-consume_first_empty(Config) ->
- [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
-
- Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
- QQ = ?config(queue_name, Config),
- ?assertEqual({'queue.declare_ok', QQ, 0, 0},
- declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
-
- RaName = ra_name(QQ),
- consume_empty(Ch, QQ, false),
- publish(Ch, QQ),
- wait_for_messages_ready(Servers, RaName, 1),
- wait_for_messages_pending_ack(Servers, RaName, 0),
- consume(Ch, QQ, false),
- rabbit_ct_client_helpers:close_channel(Ch).
-
consume_in_minority(Config) ->
[Server0, Server1, Server2] =
rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
@@ -711,63 +641,6 @@ consume_in_minority(Config) ->
amqp_channel:call(Ch, #'basic.get'{queue = QQ,
no_ack = false})).
-consume_and_autoack(Config) ->
- [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
-
- Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
- QQ = ?config(queue_name, Config),
- ?assertEqual({'queue.declare_ok', QQ, 0, 0},
- declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
- RaName = ra_name(QQ),
- publish(Ch, QQ),
- wait_for_messages_ready(Servers, RaName, 1),
- wait_for_messages_pending_ack(Servers, RaName, 0),
- consume(Ch, QQ, true),
- wait_for_messages_ready(Servers, RaName, 0),
- wait_for_messages_pending_ack(Servers, RaName, 0),
- rabbit_ct_client_helpers:close_channel(Ch),
- wait_for_messages_ready(Servers, RaName, 0),
- wait_for_messages_pending_ack(Servers, RaName, 0).
-
-consume_from_empty_queue(Config) ->
- Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
-
- Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
- QQ = ?config(queue_name, Config),
- ?assertEqual({'queue.declare_ok', QQ, 0, 0},
- declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
-
- consume_empty(Ch, QQ, false).
-
-subscribe(Config) ->
- [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
-
- Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
- QQ = ?config(queue_name, Config),
- ?assertEqual({'queue.declare_ok', QQ, 0, 0},
- declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
-
- qos(Ch, 10, false),
- RaName = ra_name(QQ),
- publish(Ch, QQ),
- wait_for_messages_ready(Servers, RaName, 1),
- wait_for_messages_pending_ack(Servers, RaName, 0),
- subscribe(Ch, QQ, false),
- receive_basic_deliver(false),
- wait_for_messages_ready(Servers, RaName, 0),
- wait_for_messages_pending_ack(Servers, RaName, 1),
- %% validate we can retrieve the consumers
- [Consumer] = rpc:call(Server, rabbit_amqqueue, consumers_all, [<<"/">>]),
- ct:pal("Consumer ~p", [Consumer]),
- ?assert(is_pid(proplists:get_value(channel_pid, Consumer))),
- ?assert(is_binary(proplists:get_value(consumer_tag, Consumer))),
- ?assertEqual(true, proplists:get_value(ack_required, Consumer)),
- ?assertEqual(10, proplists:get_value(prefetch_count, Consumer)),
- ?assertEqual([], proplists:get_value(arguments, Consumer)),
- rabbit_ct_client_helpers:close_channel(Ch),
- wait_for_messages_ready(Servers, RaName, 1),
- wait_for_messages_pending_ack(Servers, RaName, 0).
-
subscribe_should_fail_when_global_qos_true(Config) ->
[Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
@@ -789,338 +662,6 @@ subscribe_should_fail_when_global_qos_true(Config) ->
end,
ok.
-subscribe_with_autoack(Config) ->
- [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
-
- Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
- QQ = ?config(queue_name, Config),
- ?assertEqual({'queue.declare_ok', QQ, 0, 0},
- declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
-
- RaName = ra_name(QQ),
- publish(Ch, QQ),
- publish(Ch, QQ),
- wait_for_messages_ready(Servers, RaName, 2),
- wait_for_messages_pending_ack(Servers, RaName, 0),
- subscribe(Ch, QQ, true),
- receive_basic_deliver(false),
- receive_basic_deliver(false),
- wait_for_messages_ready(Servers, RaName, 0),
- wait_for_messages_pending_ack(Servers, RaName, 0),
- rabbit_ct_client_helpers:close_channel(Ch),
- wait_for_messages_ready(Servers, RaName, 0),
- wait_for_messages_pending_ack(Servers, RaName, 0).
-
-consume_and_ack(Config) ->
- [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
-
- Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
- QQ = ?config(queue_name, Config),
- ?assertEqual({'queue.declare_ok', QQ, 0, 0},
- declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
-
- RaName = ra_name(QQ),
- publish(Ch, QQ),
- wait_for_messages_ready(Servers, RaName, 1),
- wait_for_messages_pending_ack(Servers, RaName, 0),
- DeliveryTag = consume(Ch, QQ, false),
- wait_for_messages_ready(Servers, RaName, 0),
- wait_for_messages_pending_ack(Servers, RaName, 1),
- amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag}),
- wait_for_messages_ready(Servers, RaName, 0),
- wait_for_messages_pending_ack(Servers, RaName, 0).
-
-consume_and_multiple_ack(Config) ->
- [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
-
- Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
- QQ = ?config(queue_name, Config),
- ?assertEqual({'queue.declare_ok', QQ, 0, 0},
- declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
-
- RaName = ra_name(QQ),
- publish(Ch, QQ),
- publish(Ch, QQ),
- publish(Ch, QQ),
- wait_for_messages_ready(Servers, RaName, 3),
- wait_for_messages_pending_ack(Servers, RaName, 0),
- _ = consume(Ch, QQ, false),
- _ = consume(Ch, QQ, false),
- DeliveryTag = consume(Ch, QQ, false),
- wait_for_messages_ready(Servers, RaName, 0),
- wait_for_messages_pending_ack(Servers, RaName, 3),
- amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag,
- multiple = true}),
- wait_for_messages_ready(Servers, RaName, 0),
- wait_for_messages_pending_ack(Servers, RaName, 0).
-
-subscribe_and_ack(Config) ->
- [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
-
- Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
- QQ = ?config(queue_name, Config),
- ?assertEqual({'queue.declare_ok', QQ, 0, 0},
- declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
-
- RaName = ra_name(QQ),
- publish(Ch, QQ),
- wait_for_messages_ready(Servers, RaName, 1),
- wait_for_messages_pending_ack(Servers, RaName, 0),
- subscribe(Ch, QQ, false),
- receive
- {#'basic.deliver'{delivery_tag = DeliveryTag}, _} ->
- wait_for_messages_ready(Servers, RaName, 0),
- wait_for_messages_pending_ack(Servers, RaName, 1),
- amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag}),
- wait_for_messages_ready(Servers, RaName, 0),
- wait_for_messages_pending_ack(Servers, RaName, 0)
- end.
-
-subscribe_and_multiple_ack(Config) ->
- [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
-
- Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
- QQ = ?config(queue_name, Config),
- ?assertEqual({'queue.declare_ok', QQ, 0, 0},
- declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
-
- RaName = ra_name(QQ),
- publish(Ch, QQ),
- publish(Ch, QQ),
- publish(Ch, QQ),
- wait_for_messages_ready(Servers, RaName, 3),
- wait_for_messages_pending_ack(Servers, RaName, 0),
- subscribe(Ch, QQ, false),
- receive_basic_deliver(false),
- receive_basic_deliver(false),
- receive
- {#'basic.deliver'{delivery_tag = DeliveryTag}, _} ->
- wait_for_messages_ready(Servers, RaName, 0),
- wait_for_messages_pending_ack(Servers, RaName, 3),
- amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag,
- multiple = true}),
- wait_for_messages_ready(Servers, RaName, 0),
- wait_for_messages_pending_ack(Servers, RaName, 0)
- end.
-
-consume_and_requeue_nack(Config) ->
- [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
-
- Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
- QQ = ?config(queue_name, Config),
- ?assertEqual({'queue.declare_ok', QQ, 0, 0},
- declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
-
- RaName = ra_name(QQ),
- publish(Ch, QQ),
- publish(Ch, QQ),
- wait_for_messages_ready(Servers, RaName, 2),
- wait_for_messages_pending_ack(Servers, RaName, 0),
- DeliveryTag = consume(Ch, QQ, false),
- wait_for_messages_ready(Servers, RaName, 1),
- wait_for_messages_pending_ack(Servers, RaName, 1),
- amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag,
- multiple = false,
- requeue = true}),
- wait_for_messages_ready(Servers, RaName, 2),
- wait_for_messages_pending_ack(Servers, RaName, 0).
-
-consume_and_requeue_multiple_nack(Config) ->
- [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
-
- Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
- QQ = ?config(queue_name, Config),
- ?assertEqual({'queue.declare_ok', QQ, 0, 0},
- declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
-
- RaName = ra_name(QQ),
- publish(Ch, QQ),
- publish(Ch, QQ),
- publish(Ch, QQ),
- wait_for_messages_ready(Servers, RaName, 3),
- wait_for_messages_pending_ack(Servers, RaName, 0),
- _ = consume(Ch, QQ, false),
- _ = consume(Ch, QQ, false),
- DeliveryTag = consume(Ch, QQ, false),
- wait_for_messages_ready(Servers, RaName, 0),
- wait_for_messages_pending_ack(Servers, RaName, 3),
- amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag,
- multiple = true,
- requeue = true}),
- wait_for_messages_ready(Servers, RaName, 3),
- wait_for_messages_pending_ack(Servers, RaName, 0).
-
-consume_and_nack(Config) ->
- [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
-
- Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
- QQ = ?config(queue_name, Config),
- ?assertEqual({'queue.declare_ok', QQ, 0, 0},
- declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
-
- RaName = ra_name(QQ),
- publish(Ch, QQ),
- wait_for_messages_ready(Servers, RaName, 1),
- wait_for_messages_pending_ack(Servers, RaName, 0),
- DeliveryTag = consume(Ch, QQ, false),
- wait_for_messages_ready(Servers, RaName, 0),
- wait_for_messages_pending_ack(Servers, RaName, 1),
- amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag,
- multiple = false,
- requeue = false}),
- wait_for_messages_ready(Servers, RaName, 0),
- wait_for_messages_pending_ack(Servers, RaName, 0).
-
-consume_and_multiple_nack(Config) ->
- [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
-
- Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
- QQ = ?config(queue_name, Config),
- ?assertEqual({'queue.declare_ok', QQ, 0, 0},
- declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
-
- RaName = ra_name(QQ),
- publish(Ch, QQ),
- publish(Ch, QQ),
- publish(Ch, QQ),
- wait_for_messages_ready(Servers, RaName, 3),
- wait_for_messages_pending_ack(Servers, RaName, 0),
- _ = consume(Ch, QQ, false),
- _ = consume(Ch, QQ, false),
- DeliveryTag = consume(Ch, QQ, false),
- wait_for_messages_ready(Servers, RaName, 0),
- wait_for_messages_pending_ack(Servers, RaName, 3),
- amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag,
- multiple = true,
- requeue = false}),
- wait_for_messages_ready(Servers, RaName, 0),
- wait_for_messages_pending_ack(Servers, RaName, 0).
-
-subscribe_and_requeue_multiple_nack(Config) ->
- [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
-
- Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
- QQ = ?config(queue_name, Config),
- ?assertEqual({'queue.declare_ok', QQ, 0, 0},
- declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
-
- RaName = ra_name(QQ),
- publish(Ch, QQ),
- publish(Ch, QQ),
- publish(Ch, QQ),
- wait_for_messages_ready(Servers, RaName, 3),
- wait_for_messages_pending_ack(Servers, RaName, 0),
- subscribe(Ch, QQ, false),
- receive_basic_deliver(false),
- receive_basic_deliver(false),
- receive
- {#'basic.deliver'{delivery_tag = DeliveryTag,
- redelivered = false}, _} ->
- wait_for_messages_ready(Servers, RaName, 0),
- wait_for_messages_pending_ack(Servers, RaName, 3),
- amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag,
- multiple = true,
- requeue = true}),
- receive_basic_deliver(true),
- receive_basic_deliver(true),
- receive
- {#'basic.deliver'{delivery_tag = DeliveryTag1,
- redelivered = true}, _} ->
- wait_for_messages_ready(Servers, RaName, 0),
- wait_for_messages_pending_ack(Servers, RaName, 3),
- amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag1,
- multiple = true}),
- wait_for_messages_ready(Servers, RaName, 0),
- wait_for_messages_pending_ack(Servers, RaName, 0)
- end
- end.
-
-subscribe_and_requeue_nack(Config) ->
- [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
-
- Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
- QQ = ?config(queue_name, Config),
- ?assertEqual({'queue.declare_ok', QQ, 0, 0},
- declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
-
- RaName = ra_name(QQ),
- publish(Ch, QQ),
- wait_for_messages_ready(Servers, RaName, 1),
- wait_for_messages_pending_ack(Servers, RaName, 0),
- subscribe(Ch, QQ, false),
- receive
- {#'basic.deliver'{delivery_tag = DeliveryTag,
- redelivered = false}, _} ->
- wait_for_messages_ready(Servers, RaName, 0),
- wait_for_messages_pending_ack(Servers, RaName, 1),
- amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag,
- multiple = false,
- requeue = true}),
- receive
- {#'basic.deliver'{delivery_tag = DeliveryTag1,
- redelivered = true}, _} ->
- wait_for_messages_ready(Servers, RaName, 0),
- wait_for_messages_pending_ack(Servers, RaName, 1),
- amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag1}),
- wait_for_messages_ready(Servers, RaName, 0),
- wait_for_messages_pending_ack(Servers, RaName, 0)
- end
- end.
-
-subscribe_and_nack(Config) ->
- [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
-
- Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
- QQ = ?config(queue_name, Config),
- ?assertEqual({'queue.declare_ok', QQ, 0, 0},
- declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
-
- RaName = ra_name(QQ),
- publish(Ch, QQ),
- wait_for_messages_ready(Servers, RaName, 1),
- wait_for_messages_pending_ack(Servers, RaName, 0),
- subscribe(Ch, QQ, false),
- receive
- {#'basic.deliver'{delivery_tag = DeliveryTag,
- redelivered = false}, _} ->
- wait_for_messages_ready(Servers, RaName, 0),
- wait_for_messages_pending_ack(Servers, RaName, 1),
- amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag,
- multiple = false,
- requeue = false}),
- wait_for_messages_ready(Servers, RaName, 0),
- wait_for_messages_pending_ack(Servers, RaName, 0)
- end.
-
-subscribe_and_multiple_nack(Config) ->
- [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
-
- Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
- QQ = ?config(queue_name, Config),
- ?assertEqual({'queue.declare_ok', QQ, 0, 0},
- declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
-
- RaName = ra_name(QQ),
- publish(Ch, QQ),
- publish(Ch, QQ),
- publish(Ch, QQ),
- wait_for_messages_ready(Servers, RaName, 3),
- wait_for_messages_pending_ack(Servers, RaName, 0),
- subscribe(Ch, QQ, false),
- receive_basic_deliver(false),
- receive_basic_deliver(false),
- receive
- {#'basic.deliver'{delivery_tag = DeliveryTag,
- redelivered = false}, _} ->
- wait_for_messages_ready(Servers, RaName, 0),
- wait_for_messages_pending_ack(Servers, RaName, 3),
- amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag,
- multiple = true,
- requeue = false}),
- wait_for_messages_ready(Servers, RaName, 0),
- wait_for_messages_pending_ack(Servers, RaName, 0)
- end.
-
dead_letter_to_classic_queue(Config) ->
[Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
@@ -1500,51 +1041,6 @@ delete_declare(Config) ->
wait_for_messages_ready(Servers, RaName, 0),
wait_for_messages_pending_ack(Servers, RaName, 0).
-basic_cancel(Config) ->
- [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
-
- Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
- QQ = ?config(queue_name, Config),
- ?assertEqual({'queue.declare_ok', QQ, 0, 0},
- declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
-
- RaName = ra_name(QQ),
- publish(Ch, QQ),
- wait_for_messages_ready(Servers, RaName, 1),
- wait_for_messages_pending_ack(Servers, RaName, 0),
- subscribe(Ch, QQ, false),
- receive
- {#'basic.deliver'{}, _} ->
- wait_for_messages_ready(Servers, RaName, 0),
- wait_for_messages_pending_ack(Servers, RaName, 1),
- amqp_channel:call(Ch, #'basic.cancel'{consumer_tag = <<"ctag">>}),
- wait_for_messages_ready(Servers, RaName, 1),
- wait_for_messages_pending_ack(Servers, RaName, 0),
- [] = rpc:call(Server, ets, tab2list, [consumer_created])
- after 5000 ->
- exit(basic_deliver_timeout)
- end.
-
-purge(Config) ->
- [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
-
- Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
- QQ = ?config(queue_name, Config),
- ?assertEqual({'queue.declare_ok', QQ, 0, 0},
- declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
-
- RaName = ra_name(QQ),
- publish(Ch, QQ),
- publish(Ch, QQ),
- wait_for_messages_ready(Servers, RaName, 2),
- wait_for_messages_pending_ack(Servers, RaName, 0),
- _DeliveryTag = consume(Ch, QQ, false),
- wait_for_messages_ready(Servers, RaName, 1),
- wait_for_messages_pending_ack(Servers, RaName, 1),
- {'queue.purge_ok', 1} = amqp_channel:call(Ch, #'queue.purge'{queue = QQ}),
- wait_for_messages_pending_ack(Servers, RaName, 1),
- wait_for_messages_ready(Servers, RaName, 0).
-
sync_queue(Config) ->
[Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
@@ -1922,45 +1418,6 @@ reconnect_consumer_and_wait_channel_down(Config) ->
wait_for_messages_ready(Servers, RaName, 1),
wait_for_messages_pending_ack(Servers, RaName, 0).
-basic_recover(Config) ->
- [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
-
- Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
- QQ = ?config(queue_name, Config),
- ?assertEqual({'queue.declare_ok', QQ, 0, 0},
- declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
-
- RaName = ra_name(QQ),
- publish(Ch, QQ),
- wait_for_messages_ready(Servers, RaName, 1),
- wait_for_messages_pending_ack(Servers, RaName, 0),
- _ = consume(Ch, QQ, false),
- wait_for_messages_ready(Servers, RaName, 0),
- wait_for_messages_pending_ack(Servers, RaName, 1),
- amqp_channel:cast(Ch, #'basic.recover'{requeue = true}),
- wait_for_messages_ready(Servers, RaName, 1),
- wait_for_messages_pending_ack(Servers, RaName, 0).
-
-delete_immediately(Config) ->
- Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
-
- Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
- QQ = ?config(queue_name, Config),
- Args = [{<<"x-queue-type">>, longstr, <<"quorum">>}],
- ?assertEqual({'queue.declare_ok', QQ, 0, 0},
- declare(Ch, QQ, Args)),
-
- Cmd = ["eval", "{ok, Q} = rabbit_amqqueue:lookup(rabbit_misc:r(<<\"/\">>, queue, <<\"" ++ binary_to_list(QQ) ++ "\">>)), Pid = rabbit_amqqueue:pid_of(Q), rabbit_amqqueue:delete_immediately([Pid])."],
- {ok, Msg} = rabbit_ct_broker_helpers:rabbitmqctl(Config, 0, Cmd),
- ?assertEqual(match, re:run(Msg, ".*error.*", [{capture, none}])),
-
- ?assertEqual({'queue.declare_ok', QQ, 0, 0},
- amqp_channel:call(Ch, #'queue.declare'{queue = QQ,
- durable = true,
- passive = true,
- auto_delete = false,
- arguments = Args})).
-
delete_immediately_by_resource(Config) ->
Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
diff --git a/test/rabbit_fifo_SUITE.erl b/test/rabbit_fifo_SUITE.erl
index 293a597d14..ab1f44ab27 100644
--- a/test/rabbit_fifo_SUITE.erl
+++ b/test/rabbit_fifo_SUITE.erl
@@ -395,7 +395,8 @@ cancel_checkout(Config) ->
{ok, F2} = rabbit_fifo_client:checkout(<<"tag">>, 10, undefined, F1),
{_, _, F3} = process_ra_events0(F2, [], [], 250, fun (_, S) -> S end),
{ok, F4} = rabbit_fifo_client:cancel_checkout(<<"tag">>, F3),
- {ok, {{_, {_, m1}}, _}, _} = rabbit_fifo_client:dequeue(<<"d1">>, settled, F4),
+ {ok, F5} = rabbit_fifo_client:return(<<"tag">>, [0], F4),
+ {ok, {{_, {_, m1}}, _}, _} = rabbit_fifo_client:dequeue(<<"d1">>, settled, F5),
ok.
credit(Config) ->
diff --git a/test/single_active_consumer_SUITE.erl b/test/single_active_consumer_SUITE.erl
index 6071aeb5a5..0b12f54c0b 100644
--- a/test/single_active_consumer_SUITE.erl
+++ b/test/single_active_consumer_SUITE.erl
@@ -33,12 +33,14 @@ groups() ->
all_messages_go_to_one_consumer,
fallback_to_another_consumer_when_first_one_is_cancelled,
fallback_to_another_consumer_when_exclusive_consumer_channel_is_cancelled,
+ fallback_to_another_consumer_when_first_one_is_cancelled_manual_acks,
amqp_exclusive_consume_fails_on_exclusive_consumer_queue
]},
{quorum_queue, [], [
all_messages_go_to_one_consumer,
fallback_to_another_consumer_when_first_one_is_cancelled,
- fallback_to_another_consumer_when_exclusive_consumer_channel_is_cancelled
+ fallback_to_another_consumer_when_exclusive_consumer_channel_is_cancelled,
+ fallback_to_another_consumer_when_first_one_is_cancelled_manual_acks
%% amqp_exclusive_consume_fails_on_exclusive_consumer_queue % Exclusive consume not implemented in QQ
]}
].
@@ -131,7 +133,7 @@ fallback_to_another_consumer_when_first_one_is_cancelled(Config) ->
Publish = #'basic.publish'{exchange = <<>>, routing_key = Q},
[amqp_channel:cast(Ch, Publish, #amqp_msg{payload = <<"foobar">>}) || _X <- lists:seq(1, MessageCount div 2)],
- {MessagesPerConsumer1, _} = wait_for_messages(MessageCount div 2),
+ {ok, {MessagesPerConsumer1, _}} = wait_for_messages(MessageCount div 2),
FirstActiveConsumerInList = maps:keys(maps:filter(fun(_CTag, Count) -> Count > 0 end, MessagesPerConsumer1)),
?assertEqual(1, length(FirstActiveConsumerInList)),
@@ -141,8 +143,8 @@ fallback_to_another_consumer_when_first_one_is_cancelled(Config) ->
{cancel_ok, FirstActiveConsumer} = wait_for_cancel_ok(),
[amqp_channel:cast(Ch, Publish, #amqp_msg{payload = <<"foobar">>}) || _X <- lists:seq(MessageCount div 2 + 1, MessageCount - 1)],
-
- {MessagesPerConsumer2, _} = wait_for_messages(MessageCount div 2 - 1),
+
+ {ok, {MessagesPerConsumer2, _}} = wait_for_messages(MessageCount div 2 - 1),
SecondActiveConsumerInList = maps:keys(maps:filter(
fun(CTag, Count) -> Count > 0 andalso CTag /= FirstActiveConsumer end,
MessagesPerConsumer2)
@@ -153,7 +155,7 @@ fallback_to_another_consumer_when_first_one_is_cancelled(Config) ->
#'basic.cancel_ok'{} = amqp_channel:call(Ch, #'basic.cancel'{consumer_tag = SecondActiveConsumer}),
amqp_channel:cast(Ch, Publish, #amqp_msg{payload = <<"foobar">>}),
- wait_for_messages(1),
+ ?assertMatch({ok, _}, wait_for_messages(1)),
LastActiveConsumer = lists:nth(1, lists:delete(FirstActiveConsumer, lists:delete(SecondActiveConsumer, [CTag1, CTag2, CTag3]))),
@@ -171,6 +173,54 @@ fallback_to_another_consumer_when_first_one_is_cancelled(Config) ->
amqp_connection:close(C),
ok.
+fallback_to_another_consumer_when_first_one_is_cancelled_manual_acks(Config) ->
+ %% Let's ensure that although the consumer is cancelled we still keep the unacked
+ %% messages and accept acknowledgments on them.
+ [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ {C, Ch} = connection_and_channel(Config),
+ Q = queue_declare(Ch, Config),
+ #'basic.consume_ok'{} =
+ amqp_channel:subscribe(Ch, #'basic.consume'{queue = Q, no_ack = false}, self()),
+ #'basic.consume_ok'{} =
+ amqp_channel:subscribe(Ch, #'basic.consume'{queue = Q, no_ack = false}, self()),
+ #'basic.consume_ok'{} =
+ amqp_channel:subscribe(Ch, #'basic.consume'{queue = Q, no_ack = false}, self()),
+ Consumers0 = rpc:call(Server, rabbit_amqqueue, consumers_all, [<<"/">>]),
+ ?assertMatch([_, _, _], lists:filter(fun(Props) ->
+ Resource = proplists:get_value(queue_name, Props),
+ Q == Resource#resource.name
+ end, Consumers0)),
+
+ Publish = #'basic.publish'{exchange = <<>>, routing_key = Q},
+ [amqp_channel:cast(Ch, Publish, #amqp_msg{payload = P}) || P <- [<<"msg1">>, <<"msg2">>]],
+
+ {CTag, DTag1} = receive_deliver(),
+ {_CTag, DTag2} = receive_deliver(),
+
+ quorum_queue_utils:wait_for_messages(Config, [[Q, <<"2">>, <<"0">>, <<"2">>]]),
+ #'basic.cancel_ok'{} = amqp_channel:call(Ch, #'basic.cancel'{consumer_tag = CTag}),
+
+ receive
+ #'basic.cancel_ok'{consumer_tag = CTag} ->
+ ok
+ end,
+ Consumers1 = rpc:call(Server, rabbit_amqqueue, consumers_all, [<<"/">>]),
+ ?assertMatch([_, _], lists:filter(fun(Props) ->
+ Resource = proplists:get_value(queue_name, Props),
+ Q == Resource#resource.name
+ end, Consumers1)),
+ quorum_queue_utils:wait_for_messages(Config, [[Q, <<"2">>, <<"0">>, <<"2">>]]),
+
+ [amqp_channel:cast(Ch, Publish, #amqp_msg{payload = P}) || P <- [<<"msg3">>, <<"msg4">>]],
+
+ quorum_queue_utils:wait_for_messages(Config, [[Q, <<"4">>, <<"0">>, <<"4">>]]),
+ amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DTag1}),
+ amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DTag2}),
+ quorum_queue_utils:wait_for_messages(Config, [[Q, <<"2">>, <<"0">>, <<"2">>]]),
+
+ amqp_connection:close(C),
+ ok.
+
fallback_to_another_consumer_when_exclusive_consumer_channel_is_cancelled(Config) ->
{C, Ch} = connection_and_channel(Config),
{C1, Ch1} = connection_and_channel(Config),
@@ -276,13 +326,13 @@ wait_for_messages(ExpectedCount) ->
wait_for_messages(ExpectedCount, {}).
wait_for_messages(0, State) ->
- State;
-wait_for_messages(ExpectedCount, _) ->
+ {ok, State};
+wait_for_messages(ExpectedCount, State) ->
receive
{message, {MessagesPerConsumer, MessageCount}} ->
wait_for_messages(ExpectedCount - 1, {MessagesPerConsumer, MessageCount})
after 5000 ->
- throw(message_waiting_timeout)
+ {missing, ExpectedCount, State}
end.
wait_for_cancel_ok() ->
@@ -292,3 +342,12 @@ wait_for_cancel_ok() ->
after 5000 ->
throw(consumer_cancel_ok_timeout)
end.
+
+receive_deliver() ->
+ receive
+ {#'basic.deliver'{consumer_tag = CTag,
+ delivery_tag = DTag}, _} ->
+ {CTag, DTag}
+ after 5000 ->
+ exit(deliver_timeout)
+ end.