diff options
| author | Michael Klishin <michael@novemberain.com> | 2019-02-05 17:45:51 +0300 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2019-02-05 17:45:51 +0300 |
| commit | 32e5d3531909188b6ba64ae9f2dd2c88bc4fdb86 (patch) | |
| tree | 1b5b52176a97f877f4e9d64fdb7482c0f4057f43 | |
| parent | 4707e091f399d556ffc8467efd01303aa93c2598 (diff) | |
| parent | a938f418dc327b6b43bcb53180d4f31ce0b48862 (diff) | |
| download | rabbitmq-server-git-32e5d3531909188b6ba64ae9f2dd2c88bc4fdb86.tar.gz | |
Merge pull request #1867 from rabbitmq/publisher-confirms-testing
Publisher confirms test suite for all queue types
| -rw-r--r-- | test/publisher_confirms_parallel_SUITE.erl | 381 | ||||
| -rw-r--r-- | test/quorum_queue_SUITE.erl | 37 | ||||
| -rw-r--r-- | test/unit_inbroker_parallel_SUITE.erl | 66 |
3 files changed, 381 insertions, 103 deletions
diff --git a/test/publisher_confirms_parallel_SUITE.erl b/test/publisher_confirms_parallel_SUITE.erl new file mode 100644 index 0000000000..45484dd92b --- /dev/null +++ b/test/publisher_confirms_parallel_SUITE.erl @@ -0,0 +1,381 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2011-2019 Pivotal Software, Inc. All rights reserved. +%% +%% +%% For the full spec see: http://www.rabbitmq.com/confirms.html +%% +-module(publisher_confirms_parallel_SUITE). + +-include_lib("common_test/include/ct.hrl"). +-include_lib("kernel/include/file.hrl"). +-include_lib("amqp_client/include/amqp_client.hrl"). +-include_lib("eunit/include/eunit.hrl"). + +-compile(export_all). + +-define(TIMEOUT, 60000). + +-import(quorum_queue_utils, [wait_for_messages/2]). + +all() -> + [ + {group, publisher_confirm_tests} + ]. + +groups() -> + PublisherConfirmTests = [publisher_confirms, + publisher_confirms_with_deleted_queue, + confirm_select_ok, + confirm_nowait, + confirm_ack, + confirm_acks, + confirm_mandatory_unroutable, + confirm_unroutable_message], + [ + {publisher_confirm_tests, [], + [ + {classic_queue, [parallel], PublisherConfirmTests ++ [confirm_nack]}, + {mirrored_queue, [parallel], PublisherConfirmTests ++ [confirm_nack]}, + {quorum_queue, [parallel], PublisherConfirmTests ++ [confirm_minority]} + ]} + ]. + +suite() -> + [ + {timetrap, {minutes, 3}} + ]. + +%% ------------------------------------------------------------------- +%% Testsuite setup/teardown. +%% ------------------------------------------------------------------- + +init_per_suite(Config) -> + rabbit_ct_helpers:log_environment(), + rabbit_ct_helpers:run_setup_steps(Config). + +end_per_suite(Config) -> + rabbit_ct_helpers:run_teardown_steps(Config). + +init_per_group(classic_queue, Config) -> + rabbit_ct_helpers:set_config( + Config, + [{queue_args, [{<<"x-queue-type">>, longstr, <<"classic">>}]}, + {queue_durable, true}]); +init_per_group(quorum_queue, Config) -> + rabbit_ct_helpers:set_config( + Config, + [{queue_args, [{<<"x-queue-type">>, longstr, <<"quorum">>}]}, + {queue_durable, true}]); +init_per_group(mirrored_queue, Config) -> + rabbit_ct_broker_helpers:set_ha_policy(Config, 0, <<"^max_length.*queue">>, + <<"all">>, [{<<"ha-sync-mode">>, <<"automatic">>}]), + Config1 = rabbit_ct_helpers:set_config( + Config, [{is_mirrored, true}, + {queue_args, [{<<"x-queue-type">>, longstr, <<"classic">>}]}, + {queue_durable, true}]), + rabbit_ct_helpers:run_steps(Config1, []); +init_per_group(Group, Config) -> + case lists:member({group, Group}, all()) of + true -> + ClusterSize = 2, + Config1 = rabbit_ct_helpers:set_config(Config, [ + {rmq_nodename_suffix, Group}, + {rmq_nodes_count, ClusterSize} + ]), + rabbit_ct_helpers:run_steps(Config1, + rabbit_ct_broker_helpers:setup_steps() ++ + rabbit_ct_client_helpers:setup_steps()); + false -> + rabbit_ct_helpers:run_steps(Config, []) + end. + +end_per_group(Group, Config) -> + case lists:member({group, Group}, all()) of + true -> + rabbit_ct_helpers:run_steps(Config, + rabbit_ct_client_helpers:teardown_steps() ++ + rabbit_ct_broker_helpers:teardown_steps()); + false -> + Config + end. + +init_per_testcase(Testcase, Config) -> + Group = proplists:get_value(name, ?config(tc_group_properties, Config)), + Q = rabbit_data_coercion:to_binary(io_lib:format("~p_~p", [Group, Testcase])), + Q2 = rabbit_data_coercion:to_binary(io_lib:format("~p_~p_2", [Group, Testcase])), + Config1 = rabbit_ct_helpers:set_config(Config, [{queue_name, Q}, + {queue_name_2, Q2}]), + rabbit_ct_helpers:testcase_started(Config1, Testcase). + +end_per_testcase(Testcase, Config) -> + {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + amqp_channel:call(Ch, #'queue.delete'{queue = ?config(queue_name, Config)}), + amqp_channel:call(Ch, #'queue.delete'{queue = ?config(queue_name_2, Config)}), + rabbit_ct_helpers:testcase_finished(Config, Testcase). + +%% To enable confirms, a client sends the confirm.select method +publisher_confirms(Config) -> + {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + QName = ?config(queue_name, Config), + declare_queue(Ch, Config, QName), + amqp_channel:call(Ch, #'confirm.select'{}), + amqp_channel:register_confirm_handler(Ch, self()), + publish(Ch, QName, [<<"msg1">>]), + wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), + amqp_channel:wait_for_confirms(Ch, 5000), + amqp_channel:unregister_confirm_handler(Ch), + ok. + +publisher_confirms_with_deleted_queue(Config) -> + {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + QName = ?config(queue_name, Config), + declare_queue(Ch, Config, QName), + amqp_channel:call(Ch, #'confirm.select'{}), + amqp_channel:register_confirm_handler(Ch, self()), + publish(Ch, QName, [<<"msg1">>]), + amqp_channel:call(Ch, #'queue.delete'{queue = QName}), + amqp_channel:wait_for_confirms_or_die(Ch, 5000), + amqp_channel:unregister_confirm_handler(Ch). + +%% Depending on whether no-wait was set or not, the broker may respond with a confirm.select-ok +confirm_select_ok(Config) -> + {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + QName = ?config(queue_name, Config), + declare_queue(Ch, Config, QName), + ?assertEqual(#'confirm.select_ok'{}, amqp_channel:call(Ch, #'confirm.select'{nowait = false})). + +confirm_nowait(Config) -> + {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + QName = ?config(queue_name, Config), + declare_queue(Ch, Config, QName), + ?assertEqual(ok, amqp_channel:call(Ch, #'confirm.select'{nowait = true})). + +%% The broker then confirms messages as it handles them by sending a basic.ack on the same channel. +%% The delivery-tag field contains the sequence number of the confirmed message. +confirm_ack(Config) -> + %% Ensure we receive an ack and not a nack + {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + QName = ?config(queue_name, Config), + declare_queue(Ch, Config, QName), + amqp_channel:call(Ch, #'confirm.select'{}), + amqp_channel:register_confirm_handler(Ch, self()), + publish(Ch, QName, [<<"msg1">>]), + receive + #'basic.ack'{delivery_tag = 1} -> + ok + after 5000 -> + throw(missing_ack) + end. + +%% The broker may also set the multiple field in basic.ack to indicate that all messages up to +%% and including the one with the sequence number have been handled. +confirm_acks(Config) -> + {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + QName = ?config(queue_name, Config), + declare_queue(Ch, Config, QName), + amqp_channel:call(Ch, #'confirm.select'{}), + amqp_channel:register_confirm_handler(Ch, self()), + publish(Ch, QName, [<<"msg1">>, <<"msg2">>, <<"msg3">>, <<"msg4">>]), + receive_many(lists:seq(1, 4)). + +%% For unroutable messages, the broker will issue a confirm once the exchange verifies a message +%% won't route to any queue (returns an empty list of queues). +%% If the message is also published as mandatory, the basic.return is sent to the client before +%% basic.ack. +confirm_mandatory_unroutable(Config) -> + {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + QName = ?config(queue_name, Config), + amqp_channel:call(Ch, #'confirm.select'{}), + amqp_channel:register_confirm_handler(Ch, self()), + amqp_channel:register_return_handler(Ch, self()), + ok = amqp_channel:call(Ch, #'basic.publish'{routing_key = QName, + mandatory = true}, #amqp_msg{payload = <<"msg1">>}), + receive + {#'basic.return'{}, _} -> + ok + after 5000 -> + throw(missing_return) + end, + receive + #'basic.ack'{delivery_tag = 1} -> + ok + after 5000 -> + throw(missing_ack) + end. + +confirm_unroutable_message(Config) -> + %% Ensure we receive a nack for an unroutable message + {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + QName = ?config(queue_name, Config), + amqp_channel:call(Ch, #'confirm.select'{}), + amqp_channel:register_confirm_handler(Ch, self()), + publish(Ch, QName, [<<"msg1">>]), + receive + {#'basic.return'{}, _} -> + throw(unexpected_basic_return); + #'basic.ack'{delivery_tag = 1} -> + ok + after 5000 -> + throw(missing_ack) + end. + +%% In exceptional cases when the broker is unable to handle messages successfully, +%% instead of a basic.ack, the broker will send a basic.nack. +%% basic.nack will only be delivered if an internal error occurs in the Erlang process +%% responsible for a queue. +%% This test crashes the queue before it has time to answer, but it only works for classic +%% queues. On quorum queues the followers will take over and rabbit_fifo_client will resend +%% any pending messages. +confirm_nack(Config) -> + passed = rabbit_ct_broker_helpers:rpc(Config, 0, + ?MODULE, confirm_nack1, [Config]). + +confirm_nack1(Config) -> + {_Writer, _Limiter, Ch} = rabbit_ct_broker_helpers:test_channel(), + ok = rabbit_channel:do(Ch, #'channel.open'{}), + receive #'channel.open_ok'{} -> ok + after ?TIMEOUT -> throw(failed_to_receive_channel_open_ok) + end, + Args = ?config(queue_args, Config), + Durable = ?config(queue_durable, Config), + QName1 = ?config(queue_name, Config), + QName2 = ?config(queue_name_2, Config), + DeclareBindDurableQueue = + fun(QName) -> + rabbit_channel:do(Ch, #'queue.declare'{durable = Durable, + queue = QName, + arguments = Args}), + receive #'queue.declare_ok'{} -> + rabbit_channel:do(Ch, #'queue.bind'{ + queue = QName, + exchange = <<"amq.direct">>, + routing_key = "confirms-magic" }), + receive #'queue.bind_ok'{} -> ok + after ?TIMEOUT -> throw(failed_to_bind_queue) + end + after ?TIMEOUT -> throw(failed_to_declare_queue) + end + end, + %% Declare and bind two queues + DeclareBindDurableQueue(QName1), + DeclareBindDurableQueue(QName2), + %% Get the first one's pid (we'll crash it later) + {ok, Q1} = rabbit_amqqueue:lookup(rabbit_misc:r(<<"/">>, queue, QName1)), + QPid1 = amqqueue:get_pid(Q1), + %% Enable confirms + rabbit_channel:do(Ch, #'confirm.select'{}), + receive + #'confirm.select_ok'{} -> ok + after ?TIMEOUT -> throw(failed_to_enable_confirms) + end, + %% Publish a message + rabbit_channel:do(Ch, #'basic.publish'{exchange = <<"amq.direct">>, + routing_key = "confirms-magic" + }, + rabbit_basic:build_content( + #'P_basic'{delivery_mode = 2}, <<"">>)), + %% We must not kill the queue before the channel has processed the + %% 'publish'. + ok = rabbit_channel:flush(Ch), + %% Crash the queue + QPid1 ! boom, + %% Wait for a nack + receive + #'basic.nack'{} -> ok; + #'basic.ack'{} -> throw(received_ack_instead_of_nack) + after ?TIMEOUT-> throw(did_not_receive_nack) + end, + receive + #'basic.ack'{} -> throw(received_ack_when_none_expected) + after 1000 -> ok + end, + %% Cleanup + unlink(Ch), + ok = rabbit_channel:shutdown(Ch), + passed. + +%% The closest to a nack behaviour that we can get on quorum queues is not answering while +%% the cluster is in minority. Once the cluster recovers, a 'basic.ack' will be issued. +confirm_minority(Config) -> + [_A, B] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + QName = ?config(queue_name, Config), + declare_queue(Ch, Config, QName), + ok = rabbit_ct_broker_helpers:stop_node(Config, B), + amqp_channel:call(Ch, #'confirm.select'{}), + amqp_channel:register_confirm_handler(Ch, self()), + publish(Ch, QName, [<<"msg1">>]), + receive + #'basic.nack'{} -> throw(unexpected_nack); + #'basic.ack'{} -> throw(unexpected_ack) + after 30000 -> + ok + end, + ok = rabbit_ct_broker_helpers:start_node(Config, B), + receive + #'basic.nack'{} -> throw(unexpected_nack); + #'basic.ack'{} -> ok + after 60000 -> + throw(missing_ack) + end. + +%%%%%%%%%%%%%%%%%%%%%%%% +%% Test helpers +%%%%%%%%%%%%%%%%%%%%%%%% +declare_queue(Ch, Config, QName) -> + Args = ?config(queue_args, Config), + Durable = ?config(queue_durable, Config), + #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, + arguments = Args, + durable = Durable}). + +publish(Ch, QName, Payloads) -> + [amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload}) + || Payload <- Payloads]. + +publish(Ch, QName, Payloads, Headers) -> + [amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, + #amqp_msg{payload = Payload, + props = #'P_basic'{headers = Headers}}) + || Payload <- Payloads]. + +consume(Ch, QName, Payloads) -> + [begin + {#'basic.get_ok'{delivery_tag = DTag}, #amqp_msg{payload = Payload}} = + amqp_channel:call(Ch, #'basic.get'{queue = QName}), + DTag + end || Payload <- Payloads]. + +consume_empty(Ch, QName) -> + #'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}). + +sync_mirrors(QName, Config) -> + case ?config(is_mirrored, Config) of + true -> + rabbit_ct_broker_helpers:rabbitmqctl(Config, 0, [<<"sync_queue">>, QName]); + _ -> ok + end. + +receive_many([]) -> + ok; +receive_many(DTags) -> + receive + #'basic.ack'{delivery_tag = DTag, multiple = true} -> + receive_many(DTags -- lists:seq(1, DTag)); + #'basic.ack'{delivery_tag = DTag, multiple = false} -> + receive_many(DTags -- [DTag]) + after 5000 -> + throw(missing_ack) + end. diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl index fd58756fda..02afba97c5 100644 --- a/test/quorum_queue_SUITE.erl +++ b/test/quorum_queue_SUITE.erl @@ -114,8 +114,6 @@ all_tests() -> subscribe_and_nack, subscribe_and_multiple_nack, subscribe_should_fail_when_global_qos_true, - publisher_confirms, - publisher_confirms_with_deleted_queue, dead_letter_to_classic_queue, dead_letter_to_quorum_queue, dead_letter_from_classic_to_quorum_queue, @@ -1123,41 +1121,6 @@ subscribe_and_multiple_nack(Config) -> wait_for_messages_pending_ack(Servers, RaName, 0) end. -publisher_confirms(Config) -> - [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), - - Ch = rabbit_ct_client_helpers:open_channel(Config, Server), - QQ = ?config(queue_name, Config), - ?assertEqual({'queue.declare_ok', QQ, 0, 0}, - declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), - - RaName = ra_name(QQ), - amqp_channel:call(Ch, #'confirm.select'{}), - amqp_channel:register_confirm_handler(Ch, self()), - publish(Ch, QQ), - wait_for_messages_ready(Servers, RaName, 1), - wait_for_messages_pending_ack(Servers, RaName, 0), - ct:pal("WAIT FOR CONFIRMS ~n", []), - amqp_channel:wait_for_confirms(Ch, 5000), - amqp_channel:unregister_confirm_handler(Ch), - ok. - -publisher_confirms_with_deleted_queue(Config) -> - [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), - Ch = rabbit_ct_client_helpers:open_channel(Config, Server), - QQ = ?config(queue_name, Config), - ?assertEqual({'queue.declare_ok', QQ, 0, 0}, - declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), - amqp_channel:call(Ch, #'confirm.select'{}), - amqp_channel:register_confirm_handler(Ch, self()), - % subscribe(Ch, QQ, false), - publish(Ch, QQ), - delete_queues(Ch, [QQ]), - ct:pal("WAIT FOR CONFIRMS ~n", []), - amqp_channel:wait_for_confirms_or_die(Ch, 5000), - amqp_channel:unregister_confirm_handler(Ch), - ok. - dead_letter_to_classic_queue(Config) -> [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), diff --git a/test/unit_inbroker_parallel_SUITE.erl b/test/unit_inbroker_parallel_SUITE.erl index f4f4971517..364bd5aabb 100644 --- a/test/unit_inbroker_parallel_SUITE.erl +++ b/test/unit_inbroker_parallel_SUITE.erl @@ -48,7 +48,6 @@ groups() -> {parallel_tests, [parallel], [ amqp_connection_refusal, configurable_server_properties, - confirms, credit_flow_settings, dynamic_mirroring, gen_server2_with_state, @@ -931,71 +930,6 @@ test_topic_expect_match(X, List) -> %% --------------------------------------------------------------------------- %% Unordered tests (originally from rabbit_tests.erl). %% --------------------------------------------------------------------------- - -confirms(Config) -> - passed = rabbit_ct_broker_helpers:rpc(Config, 0, - ?MODULE, confirms1, [Config]). - -confirms1(_Config) -> - {_Writer, Ch} = test_spawn(), - DeclareBindDurableQueue = - fun() -> - rabbit_channel:do(Ch, #'queue.declare'{durable = true}), - receive #'queue.declare_ok'{queue = Q0} -> - rabbit_channel:do(Ch, #'queue.bind'{ - queue = Q0, - exchange = <<"amq.direct">>, - routing_key = "confirms-magic" }), - receive #'queue.bind_ok'{} -> Q0 - after ?TIMEOUT -> throw(failed_to_bind_queue) - end - after ?TIMEOUT -> throw(failed_to_declare_queue) - end - end, - %% Declare and bind two queues - QName1 = DeclareBindDurableQueue(), - QName2 = DeclareBindDurableQueue(), - %% Get the first one's pid (we'll crash it later) - {ok, Q1} = rabbit_amqqueue:lookup(rabbit_misc:r(<<"/">>, queue, QName1)), - QPid1 = amqqueue:get_pid(Q1), - %% Enable confirms - rabbit_channel:do(Ch, #'confirm.select'{}), - receive - #'confirm.select_ok'{} -> ok - after ?TIMEOUT -> throw(failed_to_enable_confirms) - end, - %% Publish a message - rabbit_channel:do(Ch, #'basic.publish'{exchange = <<"amq.direct">>, - routing_key = "confirms-magic" - }, - rabbit_basic:build_content( - #'P_basic'{delivery_mode = 2}, <<"">>)), - %% We must not kill the queue before the channel has processed the - %% 'publish'. - ok = rabbit_channel:flush(Ch), - %% Crash the queue - QPid1 ! boom, - %% Wait for a nack - receive - #'basic.nack'{} -> ok; - #'basic.ack'{} -> throw(received_ack_instead_of_nack) - after ?TIMEOUT-> throw(did_not_receive_nack) - end, - receive - #'basic.ack'{} -> throw(received_ack_when_none_expected) - after 1000 -> ok - end, - %% Cleanup - rabbit_channel:do(Ch, #'queue.delete'{queue = QName2}), - receive - #'queue.delete_ok'{} -> ok - after ?TIMEOUT -> throw(failed_to_cleanup_queue) - end, - unlink(Ch), - ok = rabbit_channel:shutdown(Ch), - - passed. - gen_server2_with_state(Config) -> passed = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, gen_server2_with_state1, [Config]). |
