diff options
| author | Karl Nilsson <kjnilsson@gmail.com> | 2019-02-12 17:01:13 +0000 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2019-02-12 17:01:13 +0000 |
| commit | b0dfe9352f083607d5d7346bd3962ba6c50cc03a (patch) | |
| tree | 86dd3fa9004f9c79eda929ce16ac2c0e02fff0cf /test | |
| parent | 4e4887f48b9b92c68e85df232d7db29122d6845b (diff) | |
| parent | fb97086d7bd1c309803515789eb91b2e4e254de5 (diff) | |
| download | rabbitmq-server-git-b0dfe9352f083607d5d7346bd3962ba6c50cc03a.tar.gz | |
Merge pull request #1874 from rabbitmq/queues-testing
Generic queue testing
Diffstat (limited to 'test')
| -rw-r--r-- | test/queue_parallel_SUITE.erl | 587 | ||||
| -rw-r--r-- | test/quorum_queue_SUITE.erl | 543 | ||||
| -rw-r--r-- | test/rabbit_fifo_SUITE.erl | 3 | ||||
| -rw-r--r-- | test/single_active_consumer_SUITE.erl | 75 |
4 files changed, 656 insertions, 552 deletions
diff --git a/test/queue_parallel_SUITE.erl b/test/queue_parallel_SUITE.erl new file mode 100644 index 0000000000..1290c97b23 --- /dev/null +++ b/test/queue_parallel_SUITE.erl @@ -0,0 +1,587 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2011-2019 Pivotal Software, Inc. All rights reserved. +%% +%% +-module(queue_parallel_SUITE). + +-include_lib("common_test/include/ct.hrl"). +-include_lib("kernel/include/file.hrl"). +-include_lib("amqp_client/include/amqp_client.hrl"). +-include_lib("eunit/include/eunit.hrl"). + +-compile(export_all). + +-define(TIMEOUT, 30000). + +-import(quorum_queue_utils, [wait_for_messages/2]). + +all() -> + [ + {group, parallel_tests} + ]. + +groups() -> + AllTests = [publish, + consume, + consume_first_empty, + consume_from_empty_queue, + consume_and_autoack, + subscribe, + subscribe_with_autoack, + consume_and_ack, + consume_and_multiple_ack, + subscribe_and_ack, + subscribe_and_multiple_ack, + subscribe_and_requeue_multiple_nack, + subscribe_and_nack, + subscribe_and_requeue_nack, + subscribe_and_multiple_nack, + consume_and_requeue_nack, + consume_and_nack, + consume_and_requeue_multiple_nack, + consume_and_multiple_nack, + basic_cancel, + purge, + basic_recover, + delete_immediately_by_resource + ], + [ + {parallel_tests, [], + [ + {classic_queue, [parallel], AllTests ++ [delete_immediately_by_pid_succeeds]}, + {mirrored_queue, [parallel], AllTests ++ [delete_immediately_by_pid_succeeds]}, + {quorum_queue, [parallel], AllTests ++ [delete_immediately_by_pid_fails]} + ]} + ]. + +suite() -> + [ + {timetrap, {minutes, 3}} + ]. + +%% ------------------------------------------------------------------- +%% Testsuite setup/teardown. +%% ------------------------------------------------------------------- + +init_per_suite(Config) -> + rabbit_ct_helpers:log_environment(), + rabbit_ct_helpers:run_setup_steps(Config). + +end_per_suite(Config) -> + rabbit_ct_helpers:run_teardown_steps(Config). + +init_per_group(classic_queue, Config) -> + rabbit_ct_helpers:set_config( + Config, + [{queue_args, [{<<"x-queue-type">>, longstr, <<"classic">>}]}, + {queue_durable, true}]); +init_per_group(quorum_queue, Config) -> + rabbit_ct_helpers:set_config( + Config, + [{queue_args, [{<<"x-queue-type">>, longstr, <<"quorum">>}]}, + {queue_durable, true}]); +init_per_group(mirrored_queue, Config) -> + rabbit_ct_broker_helpers:set_ha_policy(Config, 0, <<"^max_length.*queue">>, + <<"all">>, [{<<"ha-sync-mode">>, <<"automatic">>}]), + Config1 = rabbit_ct_helpers:set_config( + Config, [{is_mirrored, true}, + {queue_args, [{<<"x-queue-type">>, longstr, <<"classic">>}]}, + {queue_durable, true}]), + rabbit_ct_helpers:run_steps(Config1, []); +init_per_group(Group, Config) -> + case lists:member({group, Group}, all()) of + true -> + ClusterSize = 2, + Config1 = rabbit_ct_helpers:set_config(Config, [ + {rmq_nodename_suffix, Group}, + {rmq_nodes_count, ClusterSize} + ]), + rabbit_ct_helpers:run_steps(Config1, + rabbit_ct_broker_helpers:setup_steps() ++ + rabbit_ct_client_helpers:setup_steps()); + false -> + rabbit_ct_helpers:run_steps(Config, []) + end. + +end_per_group(Group, Config) -> + case lists:member({group, Group}, all()) of + true -> + rabbit_ct_helpers:run_steps(Config, + rabbit_ct_client_helpers:teardown_steps() ++ + rabbit_ct_broker_helpers:teardown_steps()); + false -> + Config + end. + +init_per_testcase(Testcase, Config) -> + Group = proplists:get_value(name, ?config(tc_group_properties, Config)), + Q = rabbit_data_coercion:to_binary(io_lib:format("~p_~p", [Group, Testcase])), + Q2 = rabbit_data_coercion:to_binary(io_lib:format("~p_~p_2", [Group, Testcase])), + Config1 = rabbit_ct_helpers:set_config(Config, [{queue_name, Q}, + {queue_name_2, Q2}]), + rabbit_ct_helpers:testcase_started(Config1, Testcase). + +end_per_testcase(Testcase, Config) -> + {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + amqp_channel:call(Ch, #'queue.delete'{queue = ?config(queue_name, Config)}), + amqp_channel:call(Ch, #'queue.delete'{queue = ?config(queue_name_2, Config)}), + rabbit_ct_helpers:testcase_finished(Config, Testcase). + +publish(Config) -> + {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + QName = ?config(queue_name, Config), + declare_queue(Ch, Config, QName), + publish(Ch, QName, [<<"msg1">>]), + wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]). + +consume(Config) -> + {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + QName = ?config(queue_name, Config), + declare_queue(Ch, Config, QName), + publish(Ch, QName, [<<"msg1">>]), + wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), + consume(Ch, QName, [<<"msg1">>]), + wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]), + rabbit_ct_client_helpers:close_channel(Ch), + wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]). + +consume_first_empty(Config) -> + {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + QName = ?config(queue_name, Config), + declare_queue(Ch, Config, QName), + consume_empty(Ch, QName), + publish(Ch, QName, [<<"msg1">>]), + wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), + consume(Ch, QName, [<<"msg1">>]), + rabbit_ct_client_helpers:close_channel(Ch). + +consume_from_empty_queue(Config) -> + {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + QName = ?config(queue_name, Config), + declare_queue(Ch, Config, QName), + consume_empty(Ch, QName). + +consume_and_autoack(Config) -> + {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + QName = ?config(queue_name, Config), + declare_queue(Ch, Config, QName), + publish(Ch, QName, [<<"msg1">>]), + wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), + consume(Ch, QName, true, [<<"msg1">>]), + wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]), + rabbit_ct_client_helpers:close_channel(Ch), + wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]). + +subscribe(Config) -> + [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + QName = ?config(queue_name, Config), + declare_queue(Ch, Config, QName), + + ?assertMatch(#'basic.qos_ok'{}, + amqp_channel:call(Ch, #'basic.qos'{global = false, + prefetch_count = 10})), + publish(Ch, QName, [<<"msg1">>]), + wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), + + subscribe(Ch, QName, false), + receive_basic_deliver(false), + wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]), + + %% validate we can retrieve the consumers + Consumers = rpc:call(Server, rabbit_amqqueue, consumers_all, [<<"/">>]), + [Consumer] = lists:filter(fun(Props) -> + Resource = proplists:get_value(queue_name, Props), + QName == Resource#resource.name + end, Consumers), + ?assert(is_pid(proplists:get_value(channel_pid, Consumer))), + ?assert(is_binary(proplists:get_value(consumer_tag, Consumer))), + ?assertEqual(true, proplists:get_value(ack_required, Consumer)), + ?assertEqual(10, proplists:get_value(prefetch_count, Consumer)), + ?assertEqual([], proplists:get_value(arguments, Consumer)), + + rabbit_ct_client_helpers:close_channel(Ch), + wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]). + +subscribe_with_autoack(Config) -> + {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + QName = ?config(queue_name, Config), + declare_queue(Ch, Config, QName), + + publish(Ch, QName, [<<"msg1">>, <<"msg2">>]), + wait_for_messages(Config, [[QName, <<"2">>, <<"2">>, <<"0">>]]), + subscribe(Ch, QName, true), + receive_basic_deliver(false), + receive_basic_deliver(false), + wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]), + rabbit_ct_client_helpers:close_channel(Ch), + wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]). + +consume_and_ack(Config) -> + {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + QName = ?config(queue_name, Config), + declare_queue(Ch, Config, QName), + + publish(Ch, QName, [<<"msg1">>]), + wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), + [DeliveryTag] = consume(Ch, QName, [<<"msg1">>]), + wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]), + amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag}), + wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]). + +consume_and_multiple_ack(Config) -> + {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + QName = ?config(queue_name, Config), + declare_queue(Ch, Config, QName), + + publish(Ch, QName, [<<"msg1">>, <<"msg2">>, <<"msg3">>]), + wait_for_messages(Config, [[QName, <<"3">>, <<"3">>, <<"0">>]]), + [_, _, DeliveryTag] = consume(Ch, QName, [<<"msg1">>, <<"msg2">>, <<"msg3">>]), + wait_for_messages(Config, [[QName, <<"3">>, <<"0">>, <<"3">>]]), + amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag, + multiple = true}), + wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]). + +subscribe_and_ack(Config) -> + {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + QName = ?config(queue_name, Config), + declare_queue(Ch, Config, QName), + + publish(Ch, QName, [<<"msg1">>]), + wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), + subscribe(Ch, QName, false), + receive + {#'basic.deliver'{delivery_tag = DeliveryTag}, _} -> + wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]), + amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag}), + wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]) + end. + +subscribe_and_multiple_ack(Config) -> + {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + QName = ?config(queue_name, Config), + declare_queue(Ch, Config, QName), + + publish(Ch, QName, [<<"msg1">>, <<"msg2">>, <<"msg3">>]), + wait_for_messages(Config, [[QName, <<"3">>, <<"3">>, <<"0">>]]), + subscribe(Ch, QName, false), + receive_basic_deliver(false), + receive_basic_deliver(false), + receive + {#'basic.deliver'{delivery_tag = DeliveryTag}, _} -> + wait_for_messages(Config, [[QName, <<"3">>, <<"0">>, <<"3">>]]), + amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag, + multiple = true}), + wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]) + end. + +subscribe_and_requeue_multiple_nack(Config) -> + {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + QName = ?config(queue_name, Config), + declare_queue(Ch, Config, QName), + + publish(Ch, QName, [<<"msg1">>, <<"msg2">>, <<"msg3">>]), + wait_for_messages(Config, [[QName, <<"3">>, <<"3">>, <<"0">>]]), + subscribe(Ch, QName, false), + receive_basic_deliver(false), + receive_basic_deliver(false), + receive + {#'basic.deliver'{delivery_tag = DeliveryTag, + redelivered = false}, _} -> + wait_for_messages(Config, [[QName, <<"3">>, <<"0">>, <<"3">>]]), + amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag, + multiple = true, + requeue = true}), + receive_basic_deliver(true), + receive_basic_deliver(true), + receive + {#'basic.deliver'{delivery_tag = DeliveryTag1, + redelivered = true}, _} -> + wait_for_messages(Config, [[QName, <<"3">>, <<"0">>, <<"3">>]]), + amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag1, + multiple = true}), + wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]) + end + end. + +consume_and_requeue_nack(Config) -> + {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + QName = ?config(queue_name, Config), + declare_queue(Ch, Config, QName), + + publish(Ch, QName, [<<"msg1">>, <<"msg2">>]), + wait_for_messages(Config, [[QName, <<"2">>, <<"2">>, <<"0">>]]), + [DeliveryTag] = consume(Ch, QName, [<<"msg1">>]), + wait_for_messages(Config, [[QName, <<"2">>, <<"1">>, <<"1">>]]), + amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag, + multiple = false, + requeue = true}), + wait_for_messages(Config, [[QName, <<"2">>, <<"2">>, <<"0">>]]). + +consume_and_nack(Config) -> + {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + QName = ?config(queue_name, Config), + declare_queue(Ch, Config, QName), + + publish(Ch, QName, [<<"msg1">>]), + wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), + [DeliveryTag] = consume(Ch, QName, [<<"msg1">>]), + wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]), + amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag, + multiple = false, + requeue = false}), + wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]). + +consume_and_requeue_multiple_nack(Config) -> + {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + QName = ?config(queue_name, Config), + declare_queue(Ch, Config, QName), + + publish(Ch, QName, [<<"msg1">>, <<"msg2">>, <<"msg3">>]), + wait_for_messages(Config, [[QName, <<"3">>, <<"3">>, <<"0">>]]), + [_, _, DeliveryTag] = consume(Ch, QName, [<<"msg1">>, <<"msg2">>, <<"msg3">>]), + wait_for_messages(Config, [[QName, <<"3">>, <<"0">>, <<"3">>]]), + amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag, + multiple = true, + requeue = true}), + wait_for_messages(Config, [[QName, <<"3">>, <<"3">>, <<"0">>]]). + +consume_and_multiple_nack(Config) -> + {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + QName = ?config(queue_name, Config), + declare_queue(Ch, Config, QName), + + publish(Ch, QName, [<<"msg1">>, <<"msg2">>, <<"msg3">>]), + wait_for_messages(Config, [[QName, <<"3">>, <<"3">>, <<"0">>]]), + [_, _, DeliveryTag] = consume(Ch, QName, [<<"msg1">>, <<"msg2">>, <<"msg3">>]), + wait_for_messages(Config, [[QName, <<"3">>, <<"0">>, <<"3">>]]), + amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag, + multiple = true, + requeue = false}), + wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]). + +subscribe_and_requeue_nack(Config) -> + {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + QName = ?config(queue_name, Config), + declare_queue(Ch, Config, QName), + + publish(Ch, QName, [<<"msg1">>]), + wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), + subscribe(Ch, QName, false), + receive + {#'basic.deliver'{delivery_tag = DeliveryTag, + redelivered = false}, _} -> + wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]), + amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag, + multiple = false, + requeue = true}), + receive + {#'basic.deliver'{delivery_tag = DeliveryTag1, + redelivered = true}, _} -> + wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]), + amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag1}), + wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]) + end + end. + +subscribe_and_nack(Config) -> + {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + QName = ?config(queue_name, Config), + declare_queue(Ch, Config, QName), + + publish(Ch, QName, [<<"msg1">>]), + wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), + subscribe(Ch, QName, false), + receive + {#'basic.deliver'{delivery_tag = DeliveryTag, + redelivered = false}, _} -> + wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]), + amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag, + multiple = false, + requeue = false}), + wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]) + end. + +subscribe_and_multiple_nack(Config) -> + {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + QName = ?config(queue_name, Config), + declare_queue(Ch, Config, QName), + + publish(Ch, QName, [<<"msg1">>, <<"msg2">>, <<"msg3">>]), + wait_for_messages(Config, [[QName, <<"3">>, <<"3">>, <<"0">>]]), + subscribe(Ch, QName, false), + receive_basic_deliver(false), + receive_basic_deliver(false), + receive + {#'basic.deliver'{delivery_tag = DeliveryTag, + redelivered = false}, _} -> + wait_for_messages(Config, [[QName, <<"3">>, <<"0">>, <<"3">>]]), + amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag, + multiple = true, + requeue = false}), + wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]) + end. + +%% TODO test with single active +basic_cancel(Config) -> + [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + QName = ?config(queue_name, Config), + declare_queue(Ch, Config, QName), + + publish(Ch, QName, [<<"msg1">>]), + wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), + subscribe(Ch, QName, false), + receive + {#'basic.deliver'{delivery_tag = DeliveryTag}, _} -> + wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]), + amqp_channel:call(Ch, #'basic.cancel'{consumer_tag = <<"ctag">>}), + Consumers = rpc:call(Server, rabbit_amqqueue, consumers_all, [<<"/">>]), + wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]), + ?assertEqual([], lists:filter(fun(Props) -> + Resource = proplists:get_value(queue_name, Props), + QName == Resource#resource.name + end, Consumers)), + publish(Ch, QName, [<<"msg2">>, <<"msg3">>]), + wait_for_messages(Config, [[QName, <<"3">>, <<"2">>, <<"1">>]]), + amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag}), + wait_for_messages(Config, [[QName, <<"2">>, <<"2">>, <<"0">>]]) + after 5000 -> + exit(basic_deliver_timeout) + end. + +purge(Config) -> + {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + QName = ?config(queue_name, Config), + declare_queue(Ch, Config, QName), + + publish(Ch, QName, [<<"msg1">>, <<"msg2">>]), + wait_for_messages(Config, [[QName, <<"2">>, <<"2">>, <<"0">>]]), + [_] = consume(Ch, QName, [<<"msg1">>]), + wait_for_messages(Config, [[QName, <<"2">>, <<"1">>, <<"1">>]]), + {'queue.purge_ok', 1} = amqp_channel:call(Ch, #'queue.purge'{queue = QName}), + wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]). + +basic_recover(Config) -> + {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + QName = ?config(queue_name, Config), + declare_queue(Ch, Config, QName), + + publish(Ch, QName, [<<"msg1">>]), + wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), + [_] = consume(Ch, QName, [<<"msg1">>]), + wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]), + amqp_channel:cast(Ch, #'basic.recover'{requeue = true}), + wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]). + +delete_immediately_by_pid_fails(Config) -> + {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + Args = ?config(queue_args, Config), + Durable = ?config(queue_durable, Config), + QName = ?config(queue_name, Config), + declare_queue(Ch, Config, QName), + + Cmd = ["eval", "{ok, Q} = rabbit_amqqueue:lookup(rabbit_misc:r(<<\"/\">>, queue, <<\"" ++ binary_to_list(QName) ++ "\">>)), Pid = rabbit_amqqueue:pid_of(Q), rabbit_amqqueue:delete_immediately([Pid])."], + {ok, Msg} = rabbit_ct_broker_helpers:rabbitmqctl(Config, 0, Cmd), + ?assertEqual(match, re:run(Msg, ".*error.*", [{capture, none}])), + + ?assertEqual({'queue.declare_ok', QName, 0, 0}, + amqp_channel:call(Ch, #'queue.declare'{queue = QName, + durable = Durable, + passive = true, + auto_delete = false, + arguments = Args})). + +delete_immediately_by_pid_succeeds(Config) -> + {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + Args = ?config(queue_args, Config), + Durable = ?config(queue_durable, Config), + QName = ?config(queue_name, Config), + declare_queue(Ch, Config, QName), + + Cmd = ["eval", "{ok, Q} = rabbit_amqqueue:lookup(rabbit_misc:r(<<\"/\">>, queue, <<\"" ++ binary_to_list(QName) ++ "\">>)), Pid = rabbit_amqqueue:pid_of(Q), rabbit_amqqueue:delete_immediately([Pid])."], + {ok, Msg} = rabbit_ct_broker_helpers:rabbitmqctl(Config, 0, Cmd), + ?assertEqual(match, re:run(Msg, ".*ok.*", [{capture, none}])), + + ?assertExit( + {{shutdown, {server_initiated_close, 404, _}}, _}, + amqp_channel:call(Ch, #'queue.declare'{queue = QName, + durable = Durable, + passive = true, + auto_delete = false, + arguments = Args})). + +delete_immediately_by_resource(Config) -> + {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + Args = ?config(queue_args, Config), + Durable = ?config(queue_durable, Config), + QName = ?config(queue_name, Config), + declare_queue(Ch, Config, QName), + + Cmd = ["eval", "rabbit_amqqueue:delete_immediately_by_resource([rabbit_misc:r(<<\"/\">>, queue, <<\"" ++ binary_to_list(QName) ++ "\">>)])."], + ?assertEqual({ok, "ok\n"}, rabbit_ct_broker_helpers:rabbitmqctl(Config, 0, Cmd)), + + ?assertExit( + {{shutdown, {server_initiated_close, 404, _}}, _}, + amqp_channel:call(Ch, #'queue.declare'{queue = QName, + durable = Durable, + passive = true, + auto_delete = false, + arguments = Args})). + +%%%%%%%%%%%%%%%%%%%%%%%% +%% Test helpers +%%%%%%%%%%%%%%%%%%%%%%%% +declare_queue(Ch, Config, QName) -> + Args = ?config(queue_args, Config), + Durable = ?config(queue_durable, Config), + #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, + arguments = Args, + durable = Durable}). + +publish(Ch, QName, Payloads) -> + [amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload}) + || Payload <- Payloads]. + +consume(Ch, QName, Payloads) -> + consume(Ch, QName, false, Payloads). + +consume(Ch, QName, NoAck, Payloads) -> + [begin + {#'basic.get_ok'{delivery_tag = DTag}, #amqp_msg{payload = Payload}} = + amqp_channel:call(Ch, #'basic.get'{queue = QName, + no_ack = NoAck}), + DTag + end || Payload <- Payloads]. + +consume_empty(Ch, QName) -> + ?assertMatch(#'basic.get_empty'{}, + amqp_channel:call(Ch, #'basic.get'{queue = QName})). + +subscribe(Ch, Queue, NoAck) -> + amqp_channel:subscribe(Ch, #'basic.consume'{queue = Queue, + no_ack = NoAck, + consumer_tag = <<"ctag">>}, + self()), + receive + #'basic.consume_ok'{consumer_tag = <<"ctag">>} -> + ok + end. + +receive_basic_deliver(Redelivered) -> + receive + {#'basic.deliver'{redelivered = R}, _} when R == Redelivered -> + ok + end. diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl index 02afba97c5..7d78558739 100644 --- a/test/quorum_queue_SUITE.erl +++ b/test/quorum_queue_SUITE.erl @@ -93,26 +93,7 @@ all_tests() -> restart_queue, restart_all_types, stop_start_rabbit_app, - publish, publish_and_restart, - consume, - consume_first_empty, - consume_from_empty_queue, - consume_and_autoack, - subscribe, - subscribe_with_autoack, - consume_and_ack, - consume_and_multiple_ack, - subscribe_and_ack, - subscribe_and_multiple_ack, - consume_and_requeue_nack, - consume_and_requeue_multiple_nack, - subscribe_and_requeue_nack, - subscribe_and_requeue_multiple_nack, - consume_and_nack, - consume_and_multiple_nack, - subscribe_and_nack, - subscribe_and_multiple_nack, subscribe_should_fail_when_global_qos_true, dead_letter_to_classic_queue, dead_letter_to_quorum_queue, @@ -120,14 +101,10 @@ all_tests() -> dead_letter_policy, cleanup_queue_state_on_channel_after_publish, cleanup_queue_state_on_channel_after_subscribe, - basic_cancel, - purge, sync_queue, cancel_sync_queue, - basic_recover, idempotent_recover, vhost_with_quorum_queue_is_deleted, - delete_immediately, delete_immediately_by_resource, consume_redelivery_count, subscribe_redelivery_count, @@ -612,18 +589,6 @@ stop_start_rabbit_app(Config) -> amqp_channel:call(Ch2, #'basic.get'{queue = CQ2, no_ack = false}), delete_queues(Ch2, [QQ1, QQ2, CQ1, CQ2]). -publish(Config) -> - [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), - - Ch = rabbit_ct_client_helpers:open_channel(Config, Server), - QQ = ?config(queue_name, Config), - ?assertEqual({'queue.declare_ok', QQ, 0, 0}, - declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), - publish(Ch, QQ), - Name = ra_name(QQ), - wait_for_messages_ready(Servers, Name, 1), - wait_for_messages_pending_ack(Servers, Name, 0). - publish_confirm(Ch, QName) -> publish(Ch, QName), amqp_channel:register_confirm_handler(Ch, self()), @@ -660,41 +625,6 @@ publish_and_restart(Config) -> wait_for_messages_ready(Servers, RaName, 2), wait_for_messages_pending_ack(Servers, RaName, 0). -consume(Config) -> - [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), - - Ch = rabbit_ct_client_helpers:open_channel(Config, Server), - QQ = ?config(queue_name, Config), - ?assertEqual({'queue.declare_ok', QQ, 0, 0}, - declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), - - RaName = ra_name(QQ), - publish(Ch, QQ), - wait_for_messages_ready(Servers, RaName, 1), - wait_for_messages_pending_ack(Servers, RaName, 0), - consume(Ch, QQ, false), - wait_for_messages_ready(Servers, RaName, 0), - wait_for_messages_pending_ack(Servers, RaName, 1), - rabbit_ct_client_helpers:close_channel(Ch), - wait_for_messages_ready(Servers, RaName, 1), - wait_for_messages_pending_ack(Servers, RaName, 0). - -consume_first_empty(Config) -> - [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), - - Ch = rabbit_ct_client_helpers:open_channel(Config, Server), - QQ = ?config(queue_name, Config), - ?assertEqual({'queue.declare_ok', QQ, 0, 0}, - declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), - - RaName = ra_name(QQ), - consume_empty(Ch, QQ, false), - publish(Ch, QQ), - wait_for_messages_ready(Servers, RaName, 1), - wait_for_messages_pending_ack(Servers, RaName, 0), - consume(Ch, QQ, false), - rabbit_ct_client_helpers:close_channel(Ch). - consume_in_minority(Config) -> [Server0, Server1, Server2] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), @@ -711,63 +641,6 @@ consume_in_minority(Config) -> amqp_channel:call(Ch, #'basic.get'{queue = QQ, no_ack = false})). -consume_and_autoack(Config) -> - [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), - - Ch = rabbit_ct_client_helpers:open_channel(Config, Server), - QQ = ?config(queue_name, Config), - ?assertEqual({'queue.declare_ok', QQ, 0, 0}, - declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), - RaName = ra_name(QQ), - publish(Ch, QQ), - wait_for_messages_ready(Servers, RaName, 1), - wait_for_messages_pending_ack(Servers, RaName, 0), - consume(Ch, QQ, true), - wait_for_messages_ready(Servers, RaName, 0), - wait_for_messages_pending_ack(Servers, RaName, 0), - rabbit_ct_client_helpers:close_channel(Ch), - wait_for_messages_ready(Servers, RaName, 0), - wait_for_messages_pending_ack(Servers, RaName, 0). - -consume_from_empty_queue(Config) -> - Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), - - Ch = rabbit_ct_client_helpers:open_channel(Config, Server), - QQ = ?config(queue_name, Config), - ?assertEqual({'queue.declare_ok', QQ, 0, 0}, - declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), - - consume_empty(Ch, QQ, false). - -subscribe(Config) -> - [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), - - Ch = rabbit_ct_client_helpers:open_channel(Config, Server), - QQ = ?config(queue_name, Config), - ?assertEqual({'queue.declare_ok', QQ, 0, 0}, - declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), - - qos(Ch, 10, false), - RaName = ra_name(QQ), - publish(Ch, QQ), - wait_for_messages_ready(Servers, RaName, 1), - wait_for_messages_pending_ack(Servers, RaName, 0), - subscribe(Ch, QQ, false), - receive_basic_deliver(false), - wait_for_messages_ready(Servers, RaName, 0), - wait_for_messages_pending_ack(Servers, RaName, 1), - %% validate we can retrieve the consumers - [Consumer] = rpc:call(Server, rabbit_amqqueue, consumers_all, [<<"/">>]), - ct:pal("Consumer ~p", [Consumer]), - ?assert(is_pid(proplists:get_value(channel_pid, Consumer))), - ?assert(is_binary(proplists:get_value(consumer_tag, Consumer))), - ?assertEqual(true, proplists:get_value(ack_required, Consumer)), - ?assertEqual(10, proplists:get_value(prefetch_count, Consumer)), - ?assertEqual([], proplists:get_value(arguments, Consumer)), - rabbit_ct_client_helpers:close_channel(Ch), - wait_for_messages_ready(Servers, RaName, 1), - wait_for_messages_pending_ack(Servers, RaName, 0). - subscribe_should_fail_when_global_qos_true(Config) -> [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), @@ -789,338 +662,6 @@ subscribe_should_fail_when_global_qos_true(Config) -> end, ok. -subscribe_with_autoack(Config) -> - [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), - - Ch = rabbit_ct_client_helpers:open_channel(Config, Server), - QQ = ?config(queue_name, Config), - ?assertEqual({'queue.declare_ok', QQ, 0, 0}, - declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), - - RaName = ra_name(QQ), - publish(Ch, QQ), - publish(Ch, QQ), - wait_for_messages_ready(Servers, RaName, 2), - wait_for_messages_pending_ack(Servers, RaName, 0), - subscribe(Ch, QQ, true), - receive_basic_deliver(false), - receive_basic_deliver(false), - wait_for_messages_ready(Servers, RaName, 0), - wait_for_messages_pending_ack(Servers, RaName, 0), - rabbit_ct_client_helpers:close_channel(Ch), - wait_for_messages_ready(Servers, RaName, 0), - wait_for_messages_pending_ack(Servers, RaName, 0). - -consume_and_ack(Config) -> - [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), - - Ch = rabbit_ct_client_helpers:open_channel(Config, Server), - QQ = ?config(queue_name, Config), - ?assertEqual({'queue.declare_ok', QQ, 0, 0}, - declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), - - RaName = ra_name(QQ), - publish(Ch, QQ), - wait_for_messages_ready(Servers, RaName, 1), - wait_for_messages_pending_ack(Servers, RaName, 0), - DeliveryTag = consume(Ch, QQ, false), - wait_for_messages_ready(Servers, RaName, 0), - wait_for_messages_pending_ack(Servers, RaName, 1), - amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag}), - wait_for_messages_ready(Servers, RaName, 0), - wait_for_messages_pending_ack(Servers, RaName, 0). - -consume_and_multiple_ack(Config) -> - [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), - - Ch = rabbit_ct_client_helpers:open_channel(Config, Server), - QQ = ?config(queue_name, Config), - ?assertEqual({'queue.declare_ok', QQ, 0, 0}, - declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), - - RaName = ra_name(QQ), - publish(Ch, QQ), - publish(Ch, QQ), - publish(Ch, QQ), - wait_for_messages_ready(Servers, RaName, 3), - wait_for_messages_pending_ack(Servers, RaName, 0), - _ = consume(Ch, QQ, false), - _ = consume(Ch, QQ, false), - DeliveryTag = consume(Ch, QQ, false), - wait_for_messages_ready(Servers, RaName, 0), - wait_for_messages_pending_ack(Servers, RaName, 3), - amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag, - multiple = true}), - wait_for_messages_ready(Servers, RaName, 0), - wait_for_messages_pending_ack(Servers, RaName, 0). - -subscribe_and_ack(Config) -> - [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), - - Ch = rabbit_ct_client_helpers:open_channel(Config, Server), - QQ = ?config(queue_name, Config), - ?assertEqual({'queue.declare_ok', QQ, 0, 0}, - declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), - - RaName = ra_name(QQ), - publish(Ch, QQ), - wait_for_messages_ready(Servers, RaName, 1), - wait_for_messages_pending_ack(Servers, RaName, 0), - subscribe(Ch, QQ, false), - receive - {#'basic.deliver'{delivery_tag = DeliveryTag}, _} -> - wait_for_messages_ready(Servers, RaName, 0), - wait_for_messages_pending_ack(Servers, RaName, 1), - amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag}), - wait_for_messages_ready(Servers, RaName, 0), - wait_for_messages_pending_ack(Servers, RaName, 0) - end. - -subscribe_and_multiple_ack(Config) -> - [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), - - Ch = rabbit_ct_client_helpers:open_channel(Config, Server), - QQ = ?config(queue_name, Config), - ?assertEqual({'queue.declare_ok', QQ, 0, 0}, - declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), - - RaName = ra_name(QQ), - publish(Ch, QQ), - publish(Ch, QQ), - publish(Ch, QQ), - wait_for_messages_ready(Servers, RaName, 3), - wait_for_messages_pending_ack(Servers, RaName, 0), - subscribe(Ch, QQ, false), - receive_basic_deliver(false), - receive_basic_deliver(false), - receive - {#'basic.deliver'{delivery_tag = DeliveryTag}, _} -> - wait_for_messages_ready(Servers, RaName, 0), - wait_for_messages_pending_ack(Servers, RaName, 3), - amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag, - multiple = true}), - wait_for_messages_ready(Servers, RaName, 0), - wait_for_messages_pending_ack(Servers, RaName, 0) - end. - -consume_and_requeue_nack(Config) -> - [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), - - Ch = rabbit_ct_client_helpers:open_channel(Config, Server), - QQ = ?config(queue_name, Config), - ?assertEqual({'queue.declare_ok', QQ, 0, 0}, - declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), - - RaName = ra_name(QQ), - publish(Ch, QQ), - publish(Ch, QQ), - wait_for_messages_ready(Servers, RaName, 2), - wait_for_messages_pending_ack(Servers, RaName, 0), - DeliveryTag = consume(Ch, QQ, false), - wait_for_messages_ready(Servers, RaName, 1), - wait_for_messages_pending_ack(Servers, RaName, 1), - amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag, - multiple = false, - requeue = true}), - wait_for_messages_ready(Servers, RaName, 2), - wait_for_messages_pending_ack(Servers, RaName, 0). - -consume_and_requeue_multiple_nack(Config) -> - [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), - - Ch = rabbit_ct_client_helpers:open_channel(Config, Server), - QQ = ?config(queue_name, Config), - ?assertEqual({'queue.declare_ok', QQ, 0, 0}, - declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), - - RaName = ra_name(QQ), - publish(Ch, QQ), - publish(Ch, QQ), - publish(Ch, QQ), - wait_for_messages_ready(Servers, RaName, 3), - wait_for_messages_pending_ack(Servers, RaName, 0), - _ = consume(Ch, QQ, false), - _ = consume(Ch, QQ, false), - DeliveryTag = consume(Ch, QQ, false), - wait_for_messages_ready(Servers, RaName, 0), - wait_for_messages_pending_ack(Servers, RaName, 3), - amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag, - multiple = true, - requeue = true}), - wait_for_messages_ready(Servers, RaName, 3), - wait_for_messages_pending_ack(Servers, RaName, 0). - -consume_and_nack(Config) -> - [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), - - Ch = rabbit_ct_client_helpers:open_channel(Config, Server), - QQ = ?config(queue_name, Config), - ?assertEqual({'queue.declare_ok', QQ, 0, 0}, - declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), - - RaName = ra_name(QQ), - publish(Ch, QQ), - wait_for_messages_ready(Servers, RaName, 1), - wait_for_messages_pending_ack(Servers, RaName, 0), - DeliveryTag = consume(Ch, QQ, false), - wait_for_messages_ready(Servers, RaName, 0), - wait_for_messages_pending_ack(Servers, RaName, 1), - amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag, - multiple = false, - requeue = false}), - wait_for_messages_ready(Servers, RaName, 0), - wait_for_messages_pending_ack(Servers, RaName, 0). - -consume_and_multiple_nack(Config) -> - [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), - - Ch = rabbit_ct_client_helpers:open_channel(Config, Server), - QQ = ?config(queue_name, Config), - ?assertEqual({'queue.declare_ok', QQ, 0, 0}, - declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), - - RaName = ra_name(QQ), - publish(Ch, QQ), - publish(Ch, QQ), - publish(Ch, QQ), - wait_for_messages_ready(Servers, RaName, 3), - wait_for_messages_pending_ack(Servers, RaName, 0), - _ = consume(Ch, QQ, false), - _ = consume(Ch, QQ, false), - DeliveryTag = consume(Ch, QQ, false), - wait_for_messages_ready(Servers, RaName, 0), - wait_for_messages_pending_ack(Servers, RaName, 3), - amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag, - multiple = true, - requeue = false}), - wait_for_messages_ready(Servers, RaName, 0), - wait_for_messages_pending_ack(Servers, RaName, 0). - -subscribe_and_requeue_multiple_nack(Config) -> - [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), - - Ch = rabbit_ct_client_helpers:open_channel(Config, Server), - QQ = ?config(queue_name, Config), - ?assertEqual({'queue.declare_ok', QQ, 0, 0}, - declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), - - RaName = ra_name(QQ), - publish(Ch, QQ), - publish(Ch, QQ), - publish(Ch, QQ), - wait_for_messages_ready(Servers, RaName, 3), - wait_for_messages_pending_ack(Servers, RaName, 0), - subscribe(Ch, QQ, false), - receive_basic_deliver(false), - receive_basic_deliver(false), - receive - {#'basic.deliver'{delivery_tag = DeliveryTag, - redelivered = false}, _} -> - wait_for_messages_ready(Servers, RaName, 0), - wait_for_messages_pending_ack(Servers, RaName, 3), - amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag, - multiple = true, - requeue = true}), - receive_basic_deliver(true), - receive_basic_deliver(true), - receive - {#'basic.deliver'{delivery_tag = DeliveryTag1, - redelivered = true}, _} -> - wait_for_messages_ready(Servers, RaName, 0), - wait_for_messages_pending_ack(Servers, RaName, 3), - amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag1, - multiple = true}), - wait_for_messages_ready(Servers, RaName, 0), - wait_for_messages_pending_ack(Servers, RaName, 0) - end - end. - -subscribe_and_requeue_nack(Config) -> - [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), - - Ch = rabbit_ct_client_helpers:open_channel(Config, Server), - QQ = ?config(queue_name, Config), - ?assertEqual({'queue.declare_ok', QQ, 0, 0}, - declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), - - RaName = ra_name(QQ), - publish(Ch, QQ), - wait_for_messages_ready(Servers, RaName, 1), - wait_for_messages_pending_ack(Servers, RaName, 0), - subscribe(Ch, QQ, false), - receive - {#'basic.deliver'{delivery_tag = DeliveryTag, - redelivered = false}, _} -> - wait_for_messages_ready(Servers, RaName, 0), - wait_for_messages_pending_ack(Servers, RaName, 1), - amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag, - multiple = false, - requeue = true}), - receive - {#'basic.deliver'{delivery_tag = DeliveryTag1, - redelivered = true}, _} -> - wait_for_messages_ready(Servers, RaName, 0), - wait_for_messages_pending_ack(Servers, RaName, 1), - amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag1}), - wait_for_messages_ready(Servers, RaName, 0), - wait_for_messages_pending_ack(Servers, RaName, 0) - end - end. - -subscribe_and_nack(Config) -> - [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), - - Ch = rabbit_ct_client_helpers:open_channel(Config, Server), - QQ = ?config(queue_name, Config), - ?assertEqual({'queue.declare_ok', QQ, 0, 0}, - declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), - - RaName = ra_name(QQ), - publish(Ch, QQ), - wait_for_messages_ready(Servers, RaName, 1), - wait_for_messages_pending_ack(Servers, RaName, 0), - subscribe(Ch, QQ, false), - receive - {#'basic.deliver'{delivery_tag = DeliveryTag, - redelivered = false}, _} -> - wait_for_messages_ready(Servers, RaName, 0), - wait_for_messages_pending_ack(Servers, RaName, 1), - amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag, - multiple = false, - requeue = false}), - wait_for_messages_ready(Servers, RaName, 0), - wait_for_messages_pending_ack(Servers, RaName, 0) - end. - -subscribe_and_multiple_nack(Config) -> - [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), - - Ch = rabbit_ct_client_helpers:open_channel(Config, Server), - QQ = ?config(queue_name, Config), - ?assertEqual({'queue.declare_ok', QQ, 0, 0}, - declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), - - RaName = ra_name(QQ), - publish(Ch, QQ), - publish(Ch, QQ), - publish(Ch, QQ), - wait_for_messages_ready(Servers, RaName, 3), - wait_for_messages_pending_ack(Servers, RaName, 0), - subscribe(Ch, QQ, false), - receive_basic_deliver(false), - receive_basic_deliver(false), - receive - {#'basic.deliver'{delivery_tag = DeliveryTag, - redelivered = false}, _} -> - wait_for_messages_ready(Servers, RaName, 0), - wait_for_messages_pending_ack(Servers, RaName, 3), - amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag, - multiple = true, - requeue = false}), - wait_for_messages_ready(Servers, RaName, 0), - wait_for_messages_pending_ack(Servers, RaName, 0) - end. - dead_letter_to_classic_queue(Config) -> [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), @@ -1500,51 +1041,6 @@ delete_declare(Config) -> wait_for_messages_ready(Servers, RaName, 0), wait_for_messages_pending_ack(Servers, RaName, 0). -basic_cancel(Config) -> - [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), - - Ch = rabbit_ct_client_helpers:open_channel(Config, Server), - QQ = ?config(queue_name, Config), - ?assertEqual({'queue.declare_ok', QQ, 0, 0}, - declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), - - RaName = ra_name(QQ), - publish(Ch, QQ), - wait_for_messages_ready(Servers, RaName, 1), - wait_for_messages_pending_ack(Servers, RaName, 0), - subscribe(Ch, QQ, false), - receive - {#'basic.deliver'{}, _} -> - wait_for_messages_ready(Servers, RaName, 0), - wait_for_messages_pending_ack(Servers, RaName, 1), - amqp_channel:call(Ch, #'basic.cancel'{consumer_tag = <<"ctag">>}), - wait_for_messages_ready(Servers, RaName, 1), - wait_for_messages_pending_ack(Servers, RaName, 0), - [] = rpc:call(Server, ets, tab2list, [consumer_created]) - after 5000 -> - exit(basic_deliver_timeout) - end. - -purge(Config) -> - [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), - - Ch = rabbit_ct_client_helpers:open_channel(Config, Server), - QQ = ?config(queue_name, Config), - ?assertEqual({'queue.declare_ok', QQ, 0, 0}, - declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), - - RaName = ra_name(QQ), - publish(Ch, QQ), - publish(Ch, QQ), - wait_for_messages_ready(Servers, RaName, 2), - wait_for_messages_pending_ack(Servers, RaName, 0), - _DeliveryTag = consume(Ch, QQ, false), - wait_for_messages_ready(Servers, RaName, 1), - wait_for_messages_pending_ack(Servers, RaName, 1), - {'queue.purge_ok', 1} = amqp_channel:call(Ch, #'queue.purge'{queue = QQ}), - wait_for_messages_pending_ack(Servers, RaName, 1), - wait_for_messages_ready(Servers, RaName, 0). - sync_queue(Config) -> [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), @@ -1922,45 +1418,6 @@ reconnect_consumer_and_wait_channel_down(Config) -> wait_for_messages_ready(Servers, RaName, 1), wait_for_messages_pending_ack(Servers, RaName, 0). -basic_recover(Config) -> - [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), - - Ch = rabbit_ct_client_helpers:open_channel(Config, Server), - QQ = ?config(queue_name, Config), - ?assertEqual({'queue.declare_ok', QQ, 0, 0}, - declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), - - RaName = ra_name(QQ), - publish(Ch, QQ), - wait_for_messages_ready(Servers, RaName, 1), - wait_for_messages_pending_ack(Servers, RaName, 0), - _ = consume(Ch, QQ, false), - wait_for_messages_ready(Servers, RaName, 0), - wait_for_messages_pending_ack(Servers, RaName, 1), - amqp_channel:cast(Ch, #'basic.recover'{requeue = true}), - wait_for_messages_ready(Servers, RaName, 1), - wait_for_messages_pending_ack(Servers, RaName, 0). - -delete_immediately(Config) -> - Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), - - Ch = rabbit_ct_client_helpers:open_channel(Config, Server), - QQ = ?config(queue_name, Config), - Args = [{<<"x-queue-type">>, longstr, <<"quorum">>}], - ?assertEqual({'queue.declare_ok', QQ, 0, 0}, - declare(Ch, QQ, Args)), - - Cmd = ["eval", "{ok, Q} = rabbit_amqqueue:lookup(rabbit_misc:r(<<\"/\">>, queue, <<\"" ++ binary_to_list(QQ) ++ "\">>)), Pid = rabbit_amqqueue:pid_of(Q), rabbit_amqqueue:delete_immediately([Pid])."], - {ok, Msg} = rabbit_ct_broker_helpers:rabbitmqctl(Config, 0, Cmd), - ?assertEqual(match, re:run(Msg, ".*error.*", [{capture, none}])), - - ?assertEqual({'queue.declare_ok', QQ, 0, 0}, - amqp_channel:call(Ch, #'queue.declare'{queue = QQ, - durable = true, - passive = true, - auto_delete = false, - arguments = Args})). - delete_immediately_by_resource(Config) -> Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), Ch = rabbit_ct_client_helpers:open_channel(Config, Server), diff --git a/test/rabbit_fifo_SUITE.erl b/test/rabbit_fifo_SUITE.erl index 293a597d14..ab1f44ab27 100644 --- a/test/rabbit_fifo_SUITE.erl +++ b/test/rabbit_fifo_SUITE.erl @@ -395,7 +395,8 @@ cancel_checkout(Config) -> {ok, F2} = rabbit_fifo_client:checkout(<<"tag">>, 10, undefined, F1), {_, _, F3} = process_ra_events0(F2, [], [], 250, fun (_, S) -> S end), {ok, F4} = rabbit_fifo_client:cancel_checkout(<<"tag">>, F3), - {ok, {{_, {_, m1}}, _}, _} = rabbit_fifo_client:dequeue(<<"d1">>, settled, F4), + {ok, F5} = rabbit_fifo_client:return(<<"tag">>, [0], F4), + {ok, {{_, {_, m1}}, _}, _} = rabbit_fifo_client:dequeue(<<"d1">>, settled, F5), ok. credit(Config) -> diff --git a/test/single_active_consumer_SUITE.erl b/test/single_active_consumer_SUITE.erl index 6071aeb5a5..0b12f54c0b 100644 --- a/test/single_active_consumer_SUITE.erl +++ b/test/single_active_consumer_SUITE.erl @@ -33,12 +33,14 @@ groups() -> all_messages_go_to_one_consumer, fallback_to_another_consumer_when_first_one_is_cancelled, fallback_to_another_consumer_when_exclusive_consumer_channel_is_cancelled, + fallback_to_another_consumer_when_first_one_is_cancelled_manual_acks, amqp_exclusive_consume_fails_on_exclusive_consumer_queue ]}, {quorum_queue, [], [ all_messages_go_to_one_consumer, fallback_to_another_consumer_when_first_one_is_cancelled, - fallback_to_another_consumer_when_exclusive_consumer_channel_is_cancelled + fallback_to_another_consumer_when_exclusive_consumer_channel_is_cancelled, + fallback_to_another_consumer_when_first_one_is_cancelled_manual_acks %% amqp_exclusive_consume_fails_on_exclusive_consumer_queue % Exclusive consume not implemented in QQ ]} ]. @@ -131,7 +133,7 @@ fallback_to_another_consumer_when_first_one_is_cancelled(Config) -> Publish = #'basic.publish'{exchange = <<>>, routing_key = Q}, [amqp_channel:cast(Ch, Publish, #amqp_msg{payload = <<"foobar">>}) || _X <- lists:seq(1, MessageCount div 2)], - {MessagesPerConsumer1, _} = wait_for_messages(MessageCount div 2), + {ok, {MessagesPerConsumer1, _}} = wait_for_messages(MessageCount div 2), FirstActiveConsumerInList = maps:keys(maps:filter(fun(_CTag, Count) -> Count > 0 end, MessagesPerConsumer1)), ?assertEqual(1, length(FirstActiveConsumerInList)), @@ -141,8 +143,8 @@ fallback_to_another_consumer_when_first_one_is_cancelled(Config) -> {cancel_ok, FirstActiveConsumer} = wait_for_cancel_ok(), [amqp_channel:cast(Ch, Publish, #amqp_msg{payload = <<"foobar">>}) || _X <- lists:seq(MessageCount div 2 + 1, MessageCount - 1)], - - {MessagesPerConsumer2, _} = wait_for_messages(MessageCount div 2 - 1), + + {ok, {MessagesPerConsumer2, _}} = wait_for_messages(MessageCount div 2 - 1), SecondActiveConsumerInList = maps:keys(maps:filter( fun(CTag, Count) -> Count > 0 andalso CTag /= FirstActiveConsumer end, MessagesPerConsumer2) @@ -153,7 +155,7 @@ fallback_to_another_consumer_when_first_one_is_cancelled(Config) -> #'basic.cancel_ok'{} = amqp_channel:call(Ch, #'basic.cancel'{consumer_tag = SecondActiveConsumer}), amqp_channel:cast(Ch, Publish, #amqp_msg{payload = <<"foobar">>}), - wait_for_messages(1), + ?assertMatch({ok, _}, wait_for_messages(1)), LastActiveConsumer = lists:nth(1, lists:delete(FirstActiveConsumer, lists:delete(SecondActiveConsumer, [CTag1, CTag2, CTag3]))), @@ -171,6 +173,54 @@ fallback_to_another_consumer_when_first_one_is_cancelled(Config) -> amqp_connection:close(C), ok. +fallback_to_another_consumer_when_first_one_is_cancelled_manual_acks(Config) -> + %% Let's ensure that although the consumer is cancelled we still keep the unacked + %% messages and accept acknowledgments on them. + [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + {C, Ch} = connection_and_channel(Config), + Q = queue_declare(Ch, Config), + #'basic.consume_ok'{} = + amqp_channel:subscribe(Ch, #'basic.consume'{queue = Q, no_ack = false}, self()), + #'basic.consume_ok'{} = + amqp_channel:subscribe(Ch, #'basic.consume'{queue = Q, no_ack = false}, self()), + #'basic.consume_ok'{} = + amqp_channel:subscribe(Ch, #'basic.consume'{queue = Q, no_ack = false}, self()), + Consumers0 = rpc:call(Server, rabbit_amqqueue, consumers_all, [<<"/">>]), + ?assertMatch([_, _, _], lists:filter(fun(Props) -> + Resource = proplists:get_value(queue_name, Props), + Q == Resource#resource.name + end, Consumers0)), + + Publish = #'basic.publish'{exchange = <<>>, routing_key = Q}, + [amqp_channel:cast(Ch, Publish, #amqp_msg{payload = P}) || P <- [<<"msg1">>, <<"msg2">>]], + + {CTag, DTag1} = receive_deliver(), + {_CTag, DTag2} = receive_deliver(), + + quorum_queue_utils:wait_for_messages(Config, [[Q, <<"2">>, <<"0">>, <<"2">>]]), + #'basic.cancel_ok'{} = amqp_channel:call(Ch, #'basic.cancel'{consumer_tag = CTag}), + + receive + #'basic.cancel_ok'{consumer_tag = CTag} -> + ok + end, + Consumers1 = rpc:call(Server, rabbit_amqqueue, consumers_all, [<<"/">>]), + ?assertMatch([_, _], lists:filter(fun(Props) -> + Resource = proplists:get_value(queue_name, Props), + Q == Resource#resource.name + end, Consumers1)), + quorum_queue_utils:wait_for_messages(Config, [[Q, <<"2">>, <<"0">>, <<"2">>]]), + + [amqp_channel:cast(Ch, Publish, #amqp_msg{payload = P}) || P <- [<<"msg3">>, <<"msg4">>]], + + quorum_queue_utils:wait_for_messages(Config, [[Q, <<"4">>, <<"0">>, <<"4">>]]), + amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DTag1}), + amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DTag2}), + quorum_queue_utils:wait_for_messages(Config, [[Q, <<"2">>, <<"0">>, <<"2">>]]), + + amqp_connection:close(C), + ok. + fallback_to_another_consumer_when_exclusive_consumer_channel_is_cancelled(Config) -> {C, Ch} = connection_and_channel(Config), {C1, Ch1} = connection_and_channel(Config), @@ -276,13 +326,13 @@ wait_for_messages(ExpectedCount) -> wait_for_messages(ExpectedCount, {}). wait_for_messages(0, State) -> - State; -wait_for_messages(ExpectedCount, _) -> + {ok, State}; +wait_for_messages(ExpectedCount, State) -> receive {message, {MessagesPerConsumer, MessageCount}} -> wait_for_messages(ExpectedCount - 1, {MessagesPerConsumer, MessageCount}) after 5000 -> - throw(message_waiting_timeout) + {missing, ExpectedCount, State} end. wait_for_cancel_ok() -> @@ -292,3 +342,12 @@ wait_for_cancel_ok() -> after 5000 -> throw(consumer_cancel_ok_timeout) end. + +receive_deliver() -> + receive + {#'basic.deliver'{consumer_tag = CTag, + delivery_tag = DTag}, _} -> + {CTag, DTag} + after 5000 -> + exit(deliver_timeout) + end. |
