diff options
| author | Matteo Cafasso <noxdafox@gmail.com> | 2018-11-24 17:20:23 +0200 |
|---|---|---|
| committer | Matteo Cafasso <noxdafox@gmail.com> | 2018-11-24 17:20:23 +0200 |
| commit | 11002abb8cbf71cea43519fde9efed7cbb87c4ef (patch) | |
| tree | 6429a72c9e26bcf84a3dc4f493a3251794ee6059 /test | |
| parent | eb1b636645bf5a02361ade0821c7c96e77ecf966 (diff) | |
| parent | 0c7c7d960c8ca54b5b29c06d295acef0bc9f3c7a (diff) | |
| download | rabbitmq-server-git-11002abb8cbf71cea43519fde9efed7cbb87c4ef.tar.gz | |
Merge branch 'master' of github.com:rabbitmq/rabbitmq-server
Diffstat (limited to 'test')
| -rw-r--r-- | test/backing_queue_SUITE.erl | 7 | ||||
| -rw-r--r-- | test/cluster_SUITE.erl | 44 | ||||
| -rw-r--r-- | test/config_schema_SUITE_data/rabbit.snippets | 39 | ||||
| -rw-r--r-- | test/list_consumers_sanity_check_SUITE.erl | 10 | ||||
| -rw-r--r-- | test/list_queues_online_and_offline_SUITE.erl | 6 | ||||
| -rw-r--r-- | test/policy_SUITE.erl | 3 | ||||
| -rw-r--r-- | test/proxy_protocol_SUITE.erl | 2 | ||||
| -rw-r--r-- | test/quorum_queue_SUITE.erl | 1812 | ||||
| -rw-r--r-- | test/rabbit_fifo_SUITE.erl | 624 | ||||
| -rw-r--r-- | test/rabbitmqctl_integration_SUITE.erl | 4 |
10 files changed, 2494 insertions, 57 deletions
diff --git a/test/backing_queue_SUITE.erl b/test/backing_queue_SUITE.erl index 60f86e0542..94cbd48e8c 100644 --- a/test/backing_queue_SUITE.erl +++ b/test/backing_queue_SUITE.erl @@ -701,7 +701,9 @@ bq_variable_queue_delete_msg_store_files_callback1(Config) -> CountMinusOne = Count - 1, {ok, CountMinusOne, {QName, QPid, _AckTag, false, _Msg}} = - rabbit_amqqueue:basic_get(Q, self(), true, Limiter), + rabbit_amqqueue:basic_get(Q, self(), true, Limiter, + <<"bq_variable_queue_delete_msg_store_files_callback1">>, + #{}), {ok, CountMinusOne} = rabbit_amqqueue:purge(Q), %% give the queue a second to receive the close_fds callback msg @@ -737,7 +739,8 @@ bq_queue_recover1(Config) -> fun (Q1 = #amqqueue { pid = QPid1 }) -> CountMinusOne = Count - 1, {ok, CountMinusOne, {QName, QPid1, _AckTag, true, _Msg}} = - rabbit_amqqueue:basic_get(Q1, self(), false, Limiter), + rabbit_amqqueue:basic_get(Q1, self(), false, Limiter, + <<"bq_queue_recover1">>, #{}), exit(QPid1, shutdown), VQ1 = variable_queue_init(Q, true), {{_Msg1, true, _AckTag1}, VQ2} = diff --git a/test/cluster_SUITE.erl b/test/cluster_SUITE.erl index 4864989b6a..62928aae9f 100644 --- a/test/cluster_SUITE.erl +++ b/test/cluster_SUITE.erl @@ -29,8 +29,7 @@ delegates_async, delegates_sync, queue_cleanup, - declare_on_dead_queue, - refresh_events + declare_on_dead_queue ]). all() -> @@ -240,34 +239,6 @@ declare_on_dead_queue1(_Config, SecondaryNode) -> after ?TIMEOUT -> throw(failed_to_create_and_kill_queue) end. -refresh_events(Config) -> - {I, J} = ?config(test_direction, Config), - From = rabbit_ct_broker_helpers:get_node_config(Config, I, nodename), - To = rabbit_ct_broker_helpers:get_node_config(Config, J, nodename), - rabbit_ct_broker_helpers:add_code_path_to_node(To, ?MODULE), - passed = rabbit_ct_broker_helpers:rpc(Config, - From, ?MODULE, refresh_events1, [Config, To]). - -refresh_events1(Config, SecondaryNode) -> - dummy_event_receiver:start(self(), [node(), SecondaryNode], - [channel_created, queue_created]), - - {_Writer, Ch} = test_spawn(), - expect_events(pid, Ch, channel_created), - rabbit_channel:shutdown(Ch), - - {_Writer2, Ch2} = test_spawn(SecondaryNode), - expect_events(pid, Ch2, channel_created), - rabbit_channel:shutdown(Ch2), - - {new, #amqqueue{name = QName} = Q} = - rabbit_amqqueue:declare(queue_name(Config, <<"refresh_events-q">>), - false, false, [], none, <<"acting-user">>), - expect_events(name, QName, queue_created), - rabbit_amqqueue:delete(Q, false, false, <<"acting-user">>), - - dummy_event_receiver:stop(), - passed. make_responder(FMsg) -> make_responder(FMsg, timeout). make_responder(FMsg, Throw) -> @@ -307,19 +278,6 @@ dead_queue_loop(QueueName, OldPid) -> Q end. -expect_events(Tag, Key, Type) -> - expect_event(Tag, Key, Type), - rabbit:force_event_refresh(make_ref()), - expect_event(Tag, Key, Type). - -expect_event(Tag, Key, Type) -> - receive #event{type = Type, props = Props} -> - case rabbit_misc:pget(Tag, Props) of - Key -> ok; - _ -> expect_event(Tag, Key, Type) - end - after ?TIMEOUT -> throw({failed_to_receive_event, Type}) - end. test_spawn() -> {Writer, _Limiter, Ch} = rabbit_ct_broker_helpers:test_channel(), diff --git a/test/config_schema_SUITE_data/rabbit.snippets b/test/config_schema_SUITE_data/rabbit.snippets index 685b05b3dc..625fcd93a9 100644 --- a/test/config_schema_SUITE_data/rabbit.snippets +++ b/test/config_schema_SUITE_data/rabbit.snippets @@ -302,6 +302,45 @@ tcp_listen_options.exit_on_close = false", {keyfile,"test/config_schema_SUITE_data/certs/key.pem"}, {versions,['tlsv1.2','tlsv1.1']}]}]}], []}, + + {ssl_options_ciphers, + "listeners.ssl.1 = 5671 + ssl_options.cacertfile = test/config_schema_SUITE_data/certs/cacert.pem + ssl_options.certfile = test/config_schema_SUITE_data/certs/cert.pem + ssl_options.keyfile = test/config_schema_SUITE_data/certs/key.pem + ssl_options.versions.1 = tlsv1.2 + ssl_options.versions.2 = tlsv1.1 + ssl_options.ciphers.1 = ECDHE-ECDSA-AES256-GCM-SHA384 + ssl_options.ciphers.2 = ECDHE-RSA-AES256-GCM-SHA384 + ssl_options.ciphers.3 = ECDHE-ECDSA-AES256-SHA384 + ssl_options.ciphers.4 = ECDHE-RSA-AES256-SHA384 + ssl_options.ciphers.5 = ECDH-ECDSA-AES256-GCM-SHA384 + ssl_options.ciphers.6 = ECDH-RSA-AES256-GCM-SHA384 + ssl_options.ciphers.7 = ECDH-ECDSA-AES256-SHA384 + ssl_options.ciphers.8 = ECDH-RSA-AES256-SHA384 + ssl_options.ciphers.9 = DHE-RSA-AES256-GCM-SHA384", + [{ssl,[{versions,['tlsv1.2','tlsv1.1']}]}], + [{ssl,[{versions,['tlsv1.2','tlsv1.1']}]}, + {rabbit, + [{ssl_listeners,[5671]}, + {ssl_options, + [{cacertfile,"test/config_schema_SUITE_data/certs/cacert.pem"}, + {ciphers, [ + "DHE-RSA-AES256-GCM-SHA384", + "ECDH-ECDSA-AES256-GCM-SHA384", + "ECDH-ECDSA-AES256-SHA384", + "ECDH-RSA-AES256-GCM-SHA384", + "ECDH-RSA-AES256-SHA384", + "ECDHE-ECDSA-AES256-GCM-SHA384", + "ECDHE-ECDSA-AES256-SHA384", + "ECDHE-RSA-AES256-GCM-SHA384", + "ECDHE-RSA-AES256-SHA384" + ]}, + {certfile,"test/config_schema_SUITE_data/certs/cert.pem"}, + {keyfile,"test/config_schema_SUITE_data/certs/key.pem"}, + {versions,['tlsv1.2','tlsv1.1']}]}]}], + []}, + {ssl_options_allow_poodle, "listeners.ssl.1 = 5671 ssl_allow_poodle_attack = true diff --git a/test/list_consumers_sanity_check_SUITE.erl b/test/list_consumers_sanity_check_SUITE.erl index 8f2fb8e57f..3fa02b1de6 100644 --- a/test/list_consumers_sanity_check_SUITE.erl +++ b/test/list_consumers_sanity_check_SUITE.erl @@ -91,7 +91,7 @@ list_consumers_sanity_check(Config) -> %% `rabbitmqctl report` shares some code with `list_consumers`, so %% check that it also reports both channels {ok, ReportStdOut} = rabbit_ct_broker_helpers:rabbitmqctl(Config, A, - ["list_consumers"]), + ["list_consumers", "--no-table-headers"]), ReportLines = re:split(ReportStdOut, <<"\n">>, [trim]), ReportCTags = [lists:nth(3, re:split(Row, <<"\t">>)) || <<"list_consumers_q", _/binary>> = Row <- ReportLines], true = (lists:sort([CTag1, CTag2]) =:= @@ -99,7 +99,7 @@ list_consumers_sanity_check(Config) -> rabbitmqctl_list_consumers(Config, Node) -> {ok, StdOut} = rabbit_ct_broker_helpers:rabbitmqctl(Config, Node, - ["list_consumers"]), + ["list_consumers", "--no-table-headers"]), [<<"Listing consumers", _/binary>> | ConsumerRows] = re:split(StdOut, <<"\n">>, [trim]), CTags = [ lists:nth(3, re:split(Row, <<"\t">>)) || Row <- ConsumerRows ], CTags. @@ -117,17 +117,17 @@ list_queues_online_and_offline(Config) -> rabbit_ct_broker_helpers:rabbitmqctl(Config, B, ["stop"]), GotUp = lists:sort(rabbit_ct_broker_helpers:rabbitmqctl_list(Config, A, - ["list_queues", "--online", "name"])), + ["list_queues", "--online", "name", "--no-table-headers"])), ExpectUp = [[<<"q_a_1">>], [<<"q_a_2">>]], ExpectUp = GotUp, GotDown = lists:sort(rabbit_ct_broker_helpers:rabbitmqctl_list(Config, A, - ["list_queues", "--offline", "name"])), + ["list_queues", "--offline", "name", "--no-table-headers"])), ExpectDown = [[<<"q_b_1">>], [<<"q_b_2">>]], ExpectDown = GotDown, GotAll = lists:sort(rabbit_ct_broker_helpers:rabbitmqctl_list(Config, A, - ["list_queues", "name"])), + ["list_queues", "name", "--no-table-headers"])), ExpectAll = ExpectUp ++ ExpectDown, ExpectAll = GotAll, diff --git a/test/list_queues_online_and_offline_SUITE.erl b/test/list_queues_online_and_offline_SUITE.erl index 96dc988006..4b56012a26 100644 --- a/test/list_queues_online_and_offline_SUITE.erl +++ b/test/list_queues_online_and_offline_SUITE.erl @@ -86,17 +86,17 @@ list_queues_online_and_offline(Config) -> rabbit_ct_broker_helpers:rabbitmqctl(Config, B, ["stop"]), GotUp = lists:sort(rabbit_ct_broker_helpers:rabbitmqctl_list(Config, A, - ["list_queues", "--online", "name"])), + ["list_queues", "--online", "name", "--no-table-headers"])), ExpectUp = [[<<"q_a_1">>], [<<"q_a_2">>]], ExpectUp = GotUp, GotDown = lists:sort(rabbit_ct_broker_helpers:rabbitmqctl_list(Config, A, - ["list_queues", "--offline", "name"])), + ["list_queues", "--offline", "name", "--no-table-headers"])), ExpectDown = [[<<"q_b_1">>], [<<"q_b_2">>]], ExpectDown = GotDown, GotAll = lists:sort(rabbit_ct_broker_helpers:rabbitmqctl_list(Config, A, - ["list_queues", "name"])), + ["list_queues", "name", "--no-table-headers"])), ExpectAll = ExpectUp ++ ExpectDown, ExpectAll = GotAll, diff --git a/test/policy_SUITE.erl b/test/policy_SUITE.erl index 2c41433a30..7cf3427db0 100644 --- a/test/policy_SUITE.erl +++ b/test/policy_SUITE.erl @@ -147,7 +147,8 @@ operator_retroactive_policy_publish_ttl(Config) -> %% the queue publish(Ch, Q, lists:seq(1, 25)), timer:sleep(50), - [[<<"policy_ttl-queue">>, <<"75">>]] = rabbit_ct_broker_helpers:rabbitmqctl_list(Config, 0, ["list_queues"]), + [[<<"policy_ttl-queue">>, <<"75">>]] = + rabbit_ct_broker_helpers:rabbitmqctl_list(Config, 0, ["list_queues", "--no-table-headers"]), get_messages(50, Ch, Q), delete(Ch, Q), diff --git a/test/proxy_protocol_SUITE.erl b/test/proxy_protocol_SUITE.erl index 136d2bb980..84b94d72a8 100644 --- a/test/proxy_protocol_SUITE.erl +++ b/test/proxy_protocol_SUITE.erl @@ -97,4 +97,4 @@ connection_name() -> Pid = lists:nth(1, Pids), {dictionary, Dict} = process_info(Pid, dictionary), {process_name, {rabbit_reader, ConnectionName}} = lists:keyfind(process_name, 1, Dict), - ConnectionName.
\ No newline at end of file + ConnectionName. diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl new file mode 100644 index 0000000000..19f3a66a1d --- /dev/null +++ b/test/quorum_queue_SUITE.erl @@ -0,0 +1,1812 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2018 Pivotal Software, Inc. All rights reserved. +%% + +-module(quorum_queue_SUITE). + +-include_lib("common_test/include/ct.hrl"). +-include_lib("eunit/include/eunit.hrl"). +-include_lib("amqp_client/include/amqp_client.hrl"). + +-compile(export_all). + +all() -> + [ + {group, single_node}, + {group, unclustered}, + {group, clustered} + ]. + +groups() -> + [ + {single_node, [], all_tests()}, + {unclustered, [], [ + {cluster_size_2, [], [add_member]} + ]}, + {clustered, [], [ + {cluster_size_2, [], [cleanup_data_dir]}, + {cluster_size_2, [], [add_member_not_running, + add_member_classic, + add_member_already_a_member, + add_member_not_found, + delete_member_not_running, + delete_member_classic, + delete_member_not_found, + delete_member] + ++ all_tests()}, + {cluster_size_3, [], [ + declare_during_node_down, + recover_from_single_failure, + recover_from_multiple_failures, + leadership_takeover, + delete_declare, + metrics_cleanup_on_leadership_takeover, + metrics_cleanup_on_leader_crash, + consume_in_minority + ]}, + {cluster_size_5, [], [start_queue, + start_queue_concurrent, + quorum_cluster_size_3, + quorum_cluster_size_7 + ]} + ]} + ]. + +all_tests() -> + [ + declare_args, + declare_invalid_args, + declare_invalid_properties, + start_queue, + stop_queue, + restart_queue, + restart_all_types, + stop_start_rabbit_app, + publish, + publish_and_restart, + consume, + consume_first_empty, + consume_from_empty_queue, + consume_and_autoack, + subscribe, + subscribe_with_autoack, + consume_and_ack, + consume_and_multiple_ack, + subscribe_and_ack, + subscribe_and_multiple_ack, + consume_and_requeue_nack, + consume_and_requeue_multiple_nack, + subscribe_and_requeue_nack, + subscribe_and_requeue_multiple_nack, + consume_and_nack, + consume_and_multiple_nack, + subscribe_and_nack, + subscribe_and_multiple_nack, + subscribe_should_fail_when_global_qos_true, + publisher_confirms, + publisher_confirms_with_deleted_queue, + dead_letter_to_classic_queue, + dead_letter_to_quorum_queue, + dead_letter_from_classic_to_quorum_queue, + cleanup_queue_state_on_channel_after_publish, + cleanup_queue_state_on_channel_after_subscribe, + basic_cancel, + purge, + sync_queue, + cancel_sync_queue, + basic_recover, + idempotent_recover, + vhost_with_quorum_queue_is_deleted + ]. + +%% ------------------------------------------------------------------- +%% Testsuite setup/teardown. +%% ------------------------------------------------------------------- + +init_per_suite(Config) -> + rabbit_ct_helpers:log_environment(), + rabbit_ct_helpers:run_setup_steps(Config). + +end_per_suite(Config) -> + rabbit_ct_helpers:run_teardown_steps(Config). + +init_per_group(clustered, Config) -> + rabbit_ct_helpers:set_config(Config, [{rmq_nodes_clustered, true}]); +init_per_group(unclustered, Config) -> + rabbit_ct_helpers:set_config(Config, [{rmq_nodes_clustered, false}]); +init_per_group(Group, Config) -> + ClusterSize = case Group of + single_node -> 1; + cluster_size_2 -> 2; + cluster_size_3 -> 3; + cluster_size_5 -> 5 + end, + Config1 = rabbit_ct_helpers:set_config(Config, + [{rmq_nodes_count, ClusterSize}, + {rmq_nodename_suffix, Group}, + {tcp_ports_base}]), + Config2 = rabbit_ct_helpers:run_steps(Config1, + [fun merge_app_env/1 ] ++ + rabbit_ct_broker_helpers:setup_steps()), + ok = rabbit_ct_broker_helpers:rpc( + Config2, 0, application, set_env, + [rabbit, channel_queue_cleanup_interval, 100]), + %% HACK: the larger cluster sizes benefit for a bit more time + %% after clustering before running the tests. + case Group of + cluster_size_5 -> + timer:sleep(5000), + Config2; + _ -> + Config2 + end. + +end_per_group(clustered, Config) -> + Config; +end_per_group(unclustered, Config) -> + Config; +end_per_group(_, Config) -> + rabbit_ct_helpers:run_steps(Config, + rabbit_ct_broker_helpers:teardown_steps()). + +init_per_testcase(Testcase, Config) -> + Config1 = rabbit_ct_helpers:testcase_started(Config, Testcase), + rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_queues, []), + Q = rabbit_data_coercion:to_binary(Testcase), + Config2 = rabbit_ct_helpers:set_config(Config1, + [{queue_name, Q} + ]), + rabbit_ct_helpers:run_steps(Config2, + rabbit_ct_client_helpers:setup_steps()). + +merge_app_env(Config) -> + rabbit_ct_helpers:merge_app_env(Config, {rabbit, [{core_metrics_gc_interval, 100}]}). + +end_per_testcase(Testcase, Config) -> + catch delete_queues(), + Config1 = rabbit_ct_helpers:run_steps( + Config, + rabbit_ct_client_helpers:teardown_steps()), + rabbit_ct_helpers:testcase_finished(Config1, Testcase). + +%% ------------------------------------------------------------------- +%% Testcases. +%% ------------------------------------------------------------------- + +declare_args(Config) -> + Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + LQ = ?config(queue_name, Config), + declare(Ch, LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}]), + assert_queue_type(Server, LQ, quorum), + + DQ = <<"classic-declare-args-q">>, + declare(Ch, DQ, [{<<"x-queue-type">>, longstr, <<"classic">>}]), + assert_queue_type(Server, DQ, classic), + + DQ2 = <<"classic-q2">>, + declare(Ch, DQ2), + assert_queue_type(Server, DQ2, classic). + +declare_invalid_properties(Config) -> + Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), + LQ = ?config(queue_name, Config), + + ?assertExit( + {{shutdown, {server_initiated_close, 406, _}}, _}, + amqp_channel:call( + rabbit_ct_client_helpers:open_channel(Config, Server), + #'queue.declare'{queue = LQ, + auto_delete = true, + durable = true, + arguments = [{<<"x-queue-type">>, longstr, <<"quorum">>}]})), + ?assertExit( + {{shutdown, {server_initiated_close, 406, _}}, _}, + amqp_channel:call( + rabbit_ct_client_helpers:open_channel(Config, Server), + #'queue.declare'{queue = LQ, + exclusive = true, + durable = true, + arguments = [{<<"x-queue-type">>, longstr, <<"quorum">>}]})), + ?assertExit( + {{shutdown, {server_initiated_close, 406, _}}, _}, + amqp_channel:call( + rabbit_ct_client_helpers:open_channel(Config, Server), + #'queue.declare'{queue = LQ, + durable = false, + arguments = [{<<"x-queue-type">>, longstr, <<"quorum">>}]})). + +declare_invalid_args(Config) -> + Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), + LQ = ?config(queue_name, Config), + + ?assertExit( + {{shutdown, {server_initiated_close, 406, _}}, _}, + declare(rabbit_ct_client_helpers:open_channel(Config, Server), + LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}, + {<<"x-expires">>, long, 2000}])), + ?assertExit( + {{shutdown, {server_initiated_close, 406, _}}, _}, + declare(rabbit_ct_client_helpers:open_channel(Config, Server), + LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}, + {<<"x-message-ttl">>, long, 2000}])), + ?assertExit( + {{shutdown, {server_initiated_close, 406, _}}, _}, + declare(rabbit_ct_client_helpers:open_channel(Config, Server), + LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}, + {<<"x-max-length">>, long, 2000}])), + ?assertExit( + {{shutdown, {server_initiated_close, 406, _}}, _}, + declare(rabbit_ct_client_helpers:open_channel(Config, Server), + LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}, + {<<"x-max-length-bytes">>, long, 2000}])), + + ?assertExit( + {{shutdown, {server_initiated_close, 406, _}}, _}, + declare(rabbit_ct_client_helpers:open_channel(Config, Server), + LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}, + {<<"x-max-priority">>, long, 2000}])), + + ?assertExit( + {{shutdown, {server_initiated_close, 406, _}}, _}, + declare(rabbit_ct_client_helpers:open_channel(Config, Server), + LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}, + {<<"x-overflow">>, longstr, <<"drop-head">>}])), + + ?assertExit( + {{shutdown, {server_initiated_close, 406, _}}, _}, + declare(rabbit_ct_client_helpers:open_channel(Config, Server), + LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}, + {<<"x-queue-mode">>, longstr, <<"lazy">>}])), + + ?assertExit( + {{shutdown, {server_initiated_close, 406, _}}, _}, + declare(rabbit_ct_client_helpers:open_channel(Config, Server), + LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}, + {<<"x-quorum-initial-group-size">>, longstr, <<"hop">>}])), + + ?assertExit( + {{shutdown, {server_initiated_close, 406, _}}, _}, + declare(rabbit_ct_client_helpers:open_channel(Config, Server), + LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}, + {<<"x-quorum-initial-group-size">>, long, 0}])). + +start_queue(Config) -> + Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + LQ = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', LQ, 0, 0}, + declare(Ch, LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + + %% Check that the application and one ra node are up + ?assertMatch({ra, _, _}, lists:keyfind(ra, 1, + rpc:call(Server, application, which_applications, []))), + ?assertMatch([_], rpc:call(Server, supervisor, which_children, [ra_server_sup])), + + %% Test declare an existing queue + ?assertEqual({'queue.declare_ok', LQ, 0, 0}, + declare(Ch, LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + + %% Test declare with same arguments + ?assertEqual({'queue.declare_ok', LQ, 0, 0}, + declare(Ch, LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + + %% Test declare an existing queue with different arguments + ?assertExit(_, declare(Ch, LQ, [])), + + %% Check that the application and process are still up + ?assertMatch({ra, _, _}, lists:keyfind(ra, 1, + rpc:call(Server, application, which_applications, []))), + ?assertMatch([_], rpc:call(Server, supervisor, which_children, [ra_server_sup])). + +start_queue_concurrent(Config) -> + Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + LQ = ?config(queue_name, Config), + Self = self(), + [begin + _ = spawn_link(fun () -> + {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, Server), + %% Test declare an existing queue + ?assertEqual({'queue.declare_ok', LQ, 0, 0}, + declare(Ch, LQ, + [{<<"x-queue-type">>, + longstr, + <<"quorum">>}])), + Self ! {done, Server} + end) + end || Server <- Servers], + + [begin + receive {done, Server} -> ok + after 5000 -> exit({await_done_timeout, Server}) + end + end || Server <- Servers], + + + ok. + +quorum_cluster_size_3(Config) -> + quorum_cluster_size_x(Config, 3, 3). + +quorum_cluster_size_7(Config) -> + quorum_cluster_size_x(Config, 7, 5). + +quorum_cluster_size_x(Config, Max, Expected) -> + Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + QQ = ?config(queue_name, Config), + RaName = ra_name(QQ), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}, + {<<"x-quorum-initial-group-size">>, long, Max}])), + {ok, Members, _} = ra:members({RaName, Server}), + ?assertEqual(Expected, length(Members)), + Info = rpc:call(Server, rabbit_quorum_queue, infos, + [rabbit_misc:r(<<"/">>, queue, QQ)]), + MembersQ = proplists:get_value(members, Info), + ?assertEqual(Expected, length(MembersQ)). + +stop_queue(Config) -> + Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + LQ = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', LQ, 0, 0}, + declare(Ch, LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + + %% Check that the application and one ra node are up + ?assertMatch({ra, _, _}, lists:keyfind(ra, 1, + rpc:call(Server, application, which_applications, []))), + ?assertMatch([_], rpc:call(Server, supervisor, which_children, [ra_server_sup])), + + %% Delete the quorum queue + ?assertMatch(#'queue.delete_ok'{}, amqp_channel:call(Ch, #'queue.delete'{queue = LQ})), + %% Check that the application and process are down + wait_until(fun() -> + [] == rpc:call(Server, supervisor, which_children, [ra_server_sup]) + end), + ?assertMatch({ra, _, _}, lists:keyfind(ra, 1, + rpc:call(Server, application, which_applications, []))). + +restart_queue(Config) -> + Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + LQ = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', LQ, 0, 0}, + declare(Ch, LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + + ok = rabbit_ct_broker_helpers:stop_node(Config, Server), + ok = rabbit_ct_broker_helpers:start_node(Config, Server), + + %% Check that the application and one ra node are up + ?assertMatch({ra, _, _}, lists:keyfind(ra, 1, + rpc:call(Server, application, which_applications, []))), + ?assertMatch([_], rpc:call(Server, supervisor, which_children, [ra_server_sup])). + +idempotent_recover(Config) -> + Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + LQ = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', LQ, 0, 0}, + declare(Ch, LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + + %% kill default vhost to trigger recovery + [{_, SupWrapperPid, _, _} | _] = rpc:call(Server, supervisor, + which_children, + [rabbit_vhost_sup_sup]), + [{_, Pid, _, _} | _] = rpc:call(Server, supervisor, + which_children, + [SupWrapperPid]), + %% kill the vhost process to trigger recover + rpc:call(Server, erlang, exit, [Pid, kill]), + + timer:sleep(1000), + %% validate quorum queue is still functional + RaName = ra_name(LQ), + {ok, _, _} = ra:members({RaName, Server}), + %% validate vhosts are running - or rather validate that at least one + %% vhost per cluster is running + [begin + #{cluster_state := ServerStatuses} = maps:from_list(I), + ?assertMatch(#{Server := running}, maps:from_list(ServerStatuses)) + end || I <- rpc:call(Server, rabbit_vhost,info_all, [])], + ok. + +vhost_with_quorum_queue_is_deleted(Config) -> + Node = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), + VHost = <<"vhost2">>, + QName = atom_to_binary(?FUNCTION_NAME, utf8), + RaName = binary_to_atom(<<VHost/binary, "_", QName/binary>>, utf8), + User = ?config(rmq_username, Config), + ok = rabbit_ct_broker_helpers:add_vhost(Config, Node, VHost, User), + ok = rabbit_ct_broker_helpers:set_full_permissions(Config, User, VHost), + Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config, Node, + VHost), + {ok, Ch} = amqp_connection:open_channel(Conn), + ?assertEqual({'queue.declare_ok', QName, 0, 0}, + declare(Ch, QName, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + + UId = rpc:call(Node, ra_directory, where_is, [RaName]), + ?assert(UId =/= undefined), + ok = rabbit_ct_broker_helpers:delete_vhost(Config, VHost), + %% validate quorum queues got deleted + undefined = rpc:call(Node, ra_directory, where_is, [RaName]), + ok. + +restart_all_types(Config) -> + %% Test the node restart with both types of queues (quorum and classic) to + %% ensure there are no regressions + Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + QQ1 = <<"restart_all_types-qq1">>, + ?assertEqual({'queue.declare_ok', QQ1, 0, 0}, + declare(Ch, QQ1, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + QQ2 = <<"restart_all_types-qq2">>, + ?assertEqual({'queue.declare_ok', QQ2, 0, 0}, + declare(Ch, QQ2, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + + CQ1 = <<"restart_all_types-classic1">>, + ?assertEqual({'queue.declare_ok', CQ1, 0, 0}, declare(Ch, CQ1, [])), + rabbit_ct_client_helpers:publish(Ch, CQ1, 1), + CQ2 = <<"restart_all_types-classic2">>, + ?assertEqual({'queue.declare_ok', CQ2, 0, 0}, declare(Ch, CQ2, [])), + rabbit_ct_client_helpers:publish(Ch, CQ2, 1), + + ok = rabbit_ct_broker_helpers:stop_node(Config, Server), + ok = rabbit_ct_broker_helpers:start_node(Config, Server), + + %% Check that the application and two ra nodes are up + ?assertMatch({ra, _, _}, lists:keyfind(ra, 1, + rpc:call(Server, application, which_applications, []))), + ?assertMatch([_,_], rpc:call(Server, supervisor, which_children, [ra_server_sup])), + %% Check the classic queues restarted correctly + Ch2 = rabbit_ct_client_helpers:open_channel(Config, Server), + {#'basic.get_ok'{}, #amqp_msg{}} = + amqp_channel:call(Ch2, #'basic.get'{queue = CQ1, no_ack = false}), + {#'basic.get_ok'{}, #amqp_msg{}} = + amqp_channel:call(Ch2, #'basic.get'{queue = CQ2, no_ack = false}), + delete_queues(Ch2, [QQ1, QQ2, CQ1, CQ2]). + +delete_queues(Ch, Queues) -> + [amqp_channel:call(Ch, #'queue.delete'{queue = Q}) || Q <- Queues]. + +stop_start_rabbit_app(Config) -> + %% Test start/stop of rabbit app with both types of queues (quorum and + %% classic) to ensure there are no regressions + Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + QQ1 = <<"stop_start_rabbit_app-qq">>, + ?assertEqual({'queue.declare_ok', QQ1, 0, 0}, + declare(Ch, QQ1, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + QQ2 = <<"quorum-q2">>, + ?assertEqual({'queue.declare_ok', QQ2, 0, 0}, + declare(Ch, QQ2, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + + CQ1 = <<"stop_start_rabbit_app-classic">>, + ?assertEqual({'queue.declare_ok', CQ1, 0, 0}, declare(Ch, CQ1, [])), + rabbit_ct_client_helpers:publish(Ch, CQ1, 1), + CQ2 = <<"stop_start_rabbit_app-classic2">>, + ?assertEqual({'queue.declare_ok', CQ2, 0, 0}, declare(Ch, CQ2, [])), + rabbit_ct_client_helpers:publish(Ch, CQ2, 1), + + rabbit_control_helper:command(stop_app, Server), + %% Check the ra application has stopped (thus its supervisor and queues) + ?assertMatch(false, lists:keyfind(ra, 1, + rpc:call(Server, application, which_applications, []))), + + rabbit_control_helper:command(start_app, Server), + + %% Check that the application and two ra nodes are up + ?assertMatch({ra, _, _}, lists:keyfind(ra, 1, + rpc:call(Server, application, which_applications, []))), + ?assertMatch([_,_], rpc:call(Server, supervisor, which_children, [ra_server_sup])), + %% Check the classic queues restarted correctly + Ch2 = rabbit_ct_client_helpers:open_channel(Config, Server), + {#'basic.get_ok'{}, #amqp_msg{}} = + amqp_channel:call(Ch2, #'basic.get'{queue = CQ1, no_ack = false}), + {#'basic.get_ok'{}, #amqp_msg{}} = + amqp_channel:call(Ch2, #'basic.get'{queue = CQ2, no_ack = false}), + delete_queues(Ch2, [QQ1, QQ2, CQ1, CQ2]). + +publish(Config) -> + [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + QQ = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + publish(Ch, QQ), + Name = ra_name(QQ), + wait_for_messages_ready(Servers, Name, 1), + wait_for_messages_pending_ack(Servers, Name, 0). + +ra_name(Q) -> + binary_to_atom(<<"%2F_", Q/binary>>, utf8). + +publish_and_restart(Config) -> + %% Test the node restart with both types of queues (quorum and classic) to + %% ensure there are no regressions + [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + QQ = ?config(queue_name, Config), + RaName = ra_name(QQ), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + publish(Ch, QQ), + wait_for_messages_ready(Servers, RaName, 1), + wait_for_messages_pending_ack(Servers, RaName, 0), + + ok = rabbit_ct_broker_helpers:stop_node(Config, Server), + ok = rabbit_ct_broker_helpers:start_node(Config, Server), + + wait_for_messages_ready(Servers, RaName, 1), + wait_for_messages_pending_ack(Servers, RaName, 0), + publish(rabbit_ct_client_helpers:open_channel(Config, Server), QQ), + wait_for_messages_ready(Servers, RaName, 2), + wait_for_messages_pending_ack(Servers, RaName, 0). + +consume(Config) -> + [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + QQ = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + + RaName = ra_name(QQ), + publish(Ch, QQ), + wait_for_messages_ready(Servers, RaName, 1), + wait_for_messages_pending_ack(Servers, RaName, 0), + consume(Ch, QQ, false), + wait_for_messages_ready(Servers, RaName, 0), + wait_for_messages_pending_ack(Servers, RaName, 1), + rabbit_ct_client_helpers:close_channel(Ch), + wait_for_messages_ready(Servers, RaName, 1), + wait_for_messages_pending_ack(Servers, RaName, 0). + +consume_first_empty(Config) -> + [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + QQ = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + + RaName = ra_name(QQ), + consume_empty(Ch, QQ, false), + publish(Ch, QQ), + wait_for_messages_ready(Servers, RaName, 1), + wait_for_messages_pending_ack(Servers, RaName, 0), + consume(Ch, QQ, false), + rabbit_ct_client_helpers:close_channel(Ch). + +consume_in_minority(Config) -> + [Server0, Server1, Server2] = + rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server0), + QQ = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + + ok = rabbit_ct_broker_helpers:stop_node(Config, Server1), + ok = rabbit_ct_broker_helpers:stop_node(Config, Server2), + + ?assertExit({{shutdown, {connection_closing, {server_initiated_close, 541, _}}}, _}, + amqp_channel:call(Ch, #'basic.get'{queue = QQ, + no_ack = false})). + +consume_and_autoack(Config) -> + [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + QQ = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + RaName = ra_name(QQ), + publish(Ch, QQ), + wait_for_messages_ready(Servers, RaName, 1), + wait_for_messages_pending_ack(Servers, RaName, 0), + consume(Ch, QQ, true), + wait_for_messages_ready(Servers, RaName, 0), + wait_for_messages_pending_ack(Servers, RaName, 0), + rabbit_ct_client_helpers:close_channel(Ch), + wait_for_messages_ready(Servers, RaName, 0), + wait_for_messages_pending_ack(Servers, RaName, 0). + +consume_from_empty_queue(Config) -> + Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + QQ = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + + consume_empty(Ch, QQ, false). + +subscribe(Config) -> + [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + QQ = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + + RaName = ra_name(QQ), + publish(Ch, QQ), + wait_for_messages_ready(Servers, RaName, 1), + wait_for_messages_pending_ack(Servers, RaName, 0), + subscribe(Ch, QQ, false), + receive_basic_deliver(false), + wait_for_messages_ready(Servers, RaName, 0), + wait_for_messages_pending_ack(Servers, RaName, 1), + rabbit_ct_client_helpers:close_channel(Ch), + wait_for_messages_ready(Servers, RaName, 1), + wait_for_messages_pending_ack(Servers, RaName, 0). + +subscribe_should_fail_when_global_qos_true(Config) -> + [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + QQ = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + + RaName = ra_name(QQ), + qos(Ch, 10, true), + publish(Ch, QQ), + wait_for_messages_ready(Servers, RaName, 1), + wait_for_messages_pending_ack(Servers, RaName, 0), + try subscribe(Ch, QQ, false) of + _ -> exit(subscribe_should_not_pass) + catch + _:_ = Err -> + ct:pal("Err ~p", [Err]) + end, + ok. + +subscribe_with_autoack(Config) -> + [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + QQ = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + + RaName = ra_name(QQ), + publish(Ch, QQ), + publish(Ch, QQ), + wait_for_messages_ready(Servers, RaName, 2), + wait_for_messages_pending_ack(Servers, RaName, 0), + subscribe(Ch, QQ, true), + receive_basic_deliver(false), + receive_basic_deliver(false), + wait_for_messages_ready(Servers, RaName, 0), + wait_for_messages_pending_ack(Servers, RaName, 0), + rabbit_ct_client_helpers:close_channel(Ch), + wait_for_messages_ready(Servers, RaName, 0), + wait_for_messages_pending_ack(Servers, RaName, 0). + +consume_and_ack(Config) -> + [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + QQ = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + + RaName = ra_name(QQ), + publish(Ch, QQ), + wait_for_messages_ready(Servers, RaName, 1), + wait_for_messages_pending_ack(Servers, RaName, 0), + DeliveryTag = consume(Ch, QQ, false), + wait_for_messages_ready(Servers, RaName, 0), + wait_for_messages_pending_ack(Servers, RaName, 1), + amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag}), + wait_for_messages_ready(Servers, RaName, 0), + wait_for_messages_pending_ack(Servers, RaName, 0). + +consume_and_multiple_ack(Config) -> + [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + QQ = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + + RaName = ra_name(QQ), + publish(Ch, QQ), + publish(Ch, QQ), + publish(Ch, QQ), + wait_for_messages_ready(Servers, RaName, 3), + wait_for_messages_pending_ack(Servers, RaName, 0), + _ = consume(Ch, QQ, false), + _ = consume(Ch, QQ, false), + DeliveryTag = consume(Ch, QQ, false), + wait_for_messages_ready(Servers, RaName, 0), + wait_for_messages_pending_ack(Servers, RaName, 3), + amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag, + multiple = true}), + wait_for_messages_ready(Servers, RaName, 0), + wait_for_messages_pending_ack(Servers, RaName, 0). + +subscribe_and_ack(Config) -> + [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + QQ = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + + RaName = ra_name(QQ), + publish(Ch, QQ), + wait_for_messages_ready(Servers, RaName, 1), + wait_for_messages_pending_ack(Servers, RaName, 0), + subscribe(Ch, QQ, false), + receive + {#'basic.deliver'{delivery_tag = DeliveryTag}, _} -> + wait_for_messages_ready(Servers, RaName, 0), + wait_for_messages_pending_ack(Servers, RaName, 1), + amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag}), + wait_for_messages_ready(Servers, RaName, 0), + wait_for_messages_pending_ack(Servers, RaName, 0) + end. + +subscribe_and_multiple_ack(Config) -> + [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + QQ = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + + RaName = ra_name(QQ), + publish(Ch, QQ), + publish(Ch, QQ), + publish(Ch, QQ), + wait_for_messages_ready(Servers, RaName, 3), + wait_for_messages_pending_ack(Servers, RaName, 0), + subscribe(Ch, QQ, false), + receive_basic_deliver(false), + receive_basic_deliver(false), + receive + {#'basic.deliver'{delivery_tag = DeliveryTag}, _} -> + wait_for_messages_ready(Servers, RaName, 0), + wait_for_messages_pending_ack(Servers, RaName, 3), + amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag, + multiple = true}), + wait_for_messages_ready(Servers, RaName, 0), + wait_for_messages_pending_ack(Servers, RaName, 0) + end. + +consume_and_requeue_nack(Config) -> + [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + QQ = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + + RaName = ra_name(QQ), + publish(Ch, QQ), + publish(Ch, QQ), + wait_for_messages_ready(Servers, RaName, 2), + wait_for_messages_pending_ack(Servers, RaName, 0), + DeliveryTag = consume(Ch, QQ, false), + wait_for_messages_ready(Servers, RaName, 1), + wait_for_messages_pending_ack(Servers, RaName, 1), + amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag, + multiple = false, + requeue = true}), + wait_for_messages_ready(Servers, RaName, 2), + wait_for_messages_pending_ack(Servers, RaName, 0). + +consume_and_requeue_multiple_nack(Config) -> + [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + QQ = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + + RaName = ra_name(QQ), + publish(Ch, QQ), + publish(Ch, QQ), + publish(Ch, QQ), + wait_for_messages_ready(Servers, RaName, 3), + wait_for_messages_pending_ack(Servers, RaName, 0), + _ = consume(Ch, QQ, false), + _ = consume(Ch, QQ, false), + DeliveryTag = consume(Ch, QQ, false), + wait_for_messages_ready(Servers, RaName, 0), + wait_for_messages_pending_ack(Servers, RaName, 3), + amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag, + multiple = true, + requeue = true}), + wait_for_messages_ready(Servers, RaName, 3), + wait_for_messages_pending_ack(Servers, RaName, 0). + +consume_and_nack(Config) -> + [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + QQ = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + + RaName = ra_name(QQ), + publish(Ch, QQ), + wait_for_messages_ready(Servers, RaName, 1), + wait_for_messages_pending_ack(Servers, RaName, 0), + DeliveryTag = consume(Ch, QQ, false), + wait_for_messages_ready(Servers, RaName, 0), + wait_for_messages_pending_ack(Servers, RaName, 1), + amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag, + multiple = false, + requeue = false}), + wait_for_messages_ready(Servers, RaName, 0), + wait_for_messages_pending_ack(Servers, RaName, 0). + +consume_and_multiple_nack(Config) -> + [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + QQ = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + + RaName = ra_name(QQ), + publish(Ch, QQ), + publish(Ch, QQ), + publish(Ch, QQ), + wait_for_messages_ready(Servers, RaName, 3), + wait_for_messages_pending_ack(Servers, RaName, 0), + _ = consume(Ch, QQ, false), + _ = consume(Ch, QQ, false), + DeliveryTag = consume(Ch, QQ, false), + wait_for_messages_ready(Servers, RaName, 0), + wait_for_messages_pending_ack(Servers, RaName, 3), + amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag, + multiple = true, + requeue = false}), + wait_for_messages_ready(Servers, RaName, 0), + wait_for_messages_pending_ack(Servers, RaName, 0). + +subscribe_and_requeue_multiple_nack(Config) -> + [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + QQ = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + + RaName = ra_name(QQ), + publish(Ch, QQ), + publish(Ch, QQ), + publish(Ch, QQ), + wait_for_messages_ready(Servers, RaName, 3), + wait_for_messages_pending_ack(Servers, RaName, 0), + subscribe(Ch, QQ, false), + receive_basic_deliver(false), + receive_basic_deliver(false), + receive + {#'basic.deliver'{delivery_tag = DeliveryTag, + redelivered = false}, _} -> + wait_for_messages_ready(Servers, RaName, 0), + wait_for_messages_pending_ack(Servers, RaName, 3), + amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag, + multiple = true, + requeue = true}), + receive_basic_deliver(true), + receive_basic_deliver(true), + receive + {#'basic.deliver'{delivery_tag = DeliveryTag1, + redelivered = true}, _} -> + wait_for_messages_ready(Servers, RaName, 0), + wait_for_messages_pending_ack(Servers, RaName, 3), + amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag1, + multiple = true}), + wait_for_messages_ready(Servers, RaName, 0), + wait_for_messages_pending_ack(Servers, RaName, 0) + end + end. + +subscribe_and_requeue_nack(Config) -> + [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + QQ = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + + RaName = ra_name(QQ), + publish(Ch, QQ), + wait_for_messages_ready(Servers, RaName, 1), + wait_for_messages_pending_ack(Servers, RaName, 0), + subscribe(Ch, QQ, false), + receive + {#'basic.deliver'{delivery_tag = DeliveryTag, + redelivered = false}, _} -> + wait_for_messages_ready(Servers, RaName, 0), + wait_for_messages_pending_ack(Servers, RaName, 1), + amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag, + multiple = false, + requeue = true}), + receive + {#'basic.deliver'{delivery_tag = DeliveryTag1, + redelivered = true}, _} -> + wait_for_messages_ready(Servers, RaName, 0), + wait_for_messages_pending_ack(Servers, RaName, 1), + amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag1}), + wait_for_messages_ready(Servers, RaName, 0), + wait_for_messages_pending_ack(Servers, RaName, 0) + end + end. + +subscribe_and_nack(Config) -> + [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + QQ = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + + RaName = ra_name(QQ), + publish(Ch, QQ), + wait_for_messages_ready(Servers, RaName, 1), + wait_for_messages_pending_ack(Servers, RaName, 0), + subscribe(Ch, QQ, false), + receive + {#'basic.deliver'{delivery_tag = DeliveryTag, + redelivered = false}, _} -> + wait_for_messages_ready(Servers, RaName, 0), + wait_for_messages_pending_ack(Servers, RaName, 1), + amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag, + multiple = false, + requeue = false}), + wait_for_messages_ready(Servers, RaName, 0), + wait_for_messages_pending_ack(Servers, RaName, 0) + end. + +subscribe_and_multiple_nack(Config) -> + [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + QQ = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + + RaName = ra_name(QQ), + publish(Ch, QQ), + publish(Ch, QQ), + publish(Ch, QQ), + wait_for_messages_ready(Servers, RaName, 3), + wait_for_messages_pending_ack(Servers, RaName, 0), + subscribe(Ch, QQ, false), + receive_basic_deliver(false), + receive_basic_deliver(false), + receive + {#'basic.deliver'{delivery_tag = DeliveryTag, + redelivered = false}, _} -> + wait_for_messages_ready(Servers, RaName, 0), + wait_for_messages_pending_ack(Servers, RaName, 3), + amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag, + multiple = true, + requeue = false}), + wait_for_messages_ready(Servers, RaName, 0), + wait_for_messages_pending_ack(Servers, RaName, 0) + end. + +publisher_confirms(Config) -> + [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + QQ = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + + RaName = ra_name(QQ), + amqp_channel:call(Ch, #'confirm.select'{}), + amqp_channel:register_confirm_handler(Ch, self()), + publish(Ch, QQ), + wait_for_messages_ready(Servers, RaName, 1), + wait_for_messages_pending_ack(Servers, RaName, 0), + ct:pal("WAIT FOR CONFIRMS ~n", []), + amqp_channel:wait_for_confirms(Ch, 5000), + amqp_channel:unregister_confirm_handler(Ch), + ok. + +publisher_confirms_with_deleted_queue(Config) -> + [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + QQ = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + amqp_channel:call(Ch, #'confirm.select'{}), + amqp_channel:register_confirm_handler(Ch, self()), + % subscribe(Ch, QQ, false), + publish(Ch, QQ), + delete_queues(Ch, [QQ]), + ct:pal("WAIT FOR CONFIRMS ~n", []), + amqp_channel:wait_for_confirms_or_die(Ch, 5000), + amqp_channel:unregister_confirm_handler(Ch), + ok. + +dead_letter_to_classic_queue(Config) -> + [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + QQ = ?config(queue_name, Config), + CQ = <<"classic-dead_letter_to_classic_queue">>, + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}, + {<<"x-dead-letter-exchange">>, longstr, <<>>}, + {<<"x-dead-letter-routing-key">>, longstr, CQ} + ])), + ?assertEqual({'queue.declare_ok', CQ, 0, 0}, declare(Ch, CQ, [])), + RaName = ra_name(QQ), + publish(Ch, QQ), + wait_for_messages_ready(Servers, RaName, 1), + wait_for_messages_pending_ack(Servers, RaName, 0), + wait_for_messages(Config, [[CQ, <<"0">>, <<"0">>, <<"0">>]]), + DeliveryTag = consume(Ch, QQ, false), + wait_for_messages_ready(Servers, RaName, 0), + wait_for_messages_pending_ack(Servers, RaName, 1), + wait_for_messages(Config, [[CQ, <<"0">>, <<"0">>, <<"0">>]]), + amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag, + multiple = false, + requeue = false}), + wait_for_messages_ready(Servers, RaName, 0), + wait_for_messages_pending_ack(Servers, RaName, 0), + wait_for_messages(Config, [[CQ, <<"1">>, <<"1">>, <<"0">>]]), + _ = consume(Ch, CQ, false). + +dead_letter_to_quorum_queue(Config) -> + [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + QQ = ?config(queue_name, Config), + QQ2 = <<"dead_letter_to_quorum_queue-q2">>, + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}, + {<<"x-dead-letter-exchange">>, longstr, <<>>}, + {<<"x-dead-letter-routing-key">>, longstr, QQ2} + ])), + ?assertEqual({'queue.declare_ok', QQ2, 0, 0}, + declare(Ch, QQ2, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + RaName = ra_name(QQ), + RaName2 = ra_name(QQ2), + publish(Ch, QQ), + wait_for_messages_ready(Servers, RaName, 1), + wait_for_messages_pending_ack(Servers, RaName, 0), + wait_for_messages_ready(Servers, RaName2, 0), + wait_for_messages_pending_ack(Servers, RaName2, 0), + DeliveryTag = consume(Ch, QQ, false), + wait_for_messages_ready(Servers, RaName, 0), + wait_for_messages_pending_ack(Servers, RaName, 1), + wait_for_messages_ready(Servers, RaName2, 0), + wait_for_messages_pending_ack(Servers, RaName2, 0), + amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag, + multiple = false, + requeue = false}), + wait_for_messages_ready(Servers, RaName, 0), + wait_for_messages_pending_ack(Servers, RaName, 0), + wait_for_messages_ready(Servers, RaName2, 1), + wait_for_messages_pending_ack(Servers, RaName2, 0), + _ = consume(Ch, QQ2, false). + +dead_letter_from_classic_to_quorum_queue(Config) -> + [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + CQ = <<"classic-q-dead_letter_from_classic_to_quorum_queue">>, + QQ = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', CQ, 0, 0}, + declare(Ch, CQ, [{<<"x-dead-letter-exchange">>, longstr, <<>>}, + {<<"x-dead-letter-routing-key">>, longstr, QQ} + ])), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + RaName = ra_name(QQ), + publish(Ch, CQ), + wait_for_messages_ready(Servers, RaName, 0), + wait_for_messages_pending_ack(Servers, RaName, 0), + wait_for_messages(Config, [[CQ, <<"1">>, <<"1">>, <<"0">>]]), + DeliveryTag = consume(Ch, CQ, false), + wait_for_messages_ready(Servers, RaName, 0), + wait_for_messages_pending_ack(Servers, RaName, 0), + wait_for_messages(Config, [[CQ, <<"1">>, <<"0">>, <<"1">>]]), + amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag, + multiple = false, + requeue = false}), + wait_for_messages_ready(Servers, RaName, 1), + wait_for_messages_pending_ack(Servers, RaName, 0), + wait_for_messages(Config, [[CQ, <<"0">>, <<"0">>, <<"0">>]]), + _ = consume(Ch, QQ, false), + rabbit_ct_client_helpers:close_channel(Ch). + +cleanup_queue_state_on_channel_after_publish(Config) -> + %% Declare/delete the queue in one channel and publish on a different one, + %% to verify that the cleanup is propagated through channels + [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server), + Ch2 = rabbit_ct_client_helpers:open_channel(Config, Server), + QQ = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch1, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + RaName = ra_name(QQ), + publish(Ch2, QQ), + Res = dirty_query(Servers, RaName, fun rabbit_fifo:query_consumer_count/1), + ct:pal ("Res ~p", [Res]), + wait_for_messages_pending_ack(Servers, RaName, 0), + wait_for_messages_ready(Servers, RaName, 1), + [NCh1, NCh2] = rpc:call(Server, rabbit_channel, list, []), + %% Check the channel state contains the state for the quorum queue on + %% channel 1 and 2 + wait_for_cleanup(Server, NCh1, 0), + wait_for_cleanup(Server, NCh2, 1), + %% then delete the queue and wait for the process to terminate + ?assertMatch(#'queue.delete_ok'{}, + amqp_channel:call(Ch1, #'queue.delete'{queue = QQ})), + wait_until(fun() -> + [] == rpc:call(Server, supervisor, which_children, + [ra_server_sup]) + end), + %% Check that all queue states have been cleaned + wait_for_cleanup(Server, NCh1, 0), + wait_for_cleanup(Server, NCh2, 0). + +cleanup_queue_state_on_channel_after_subscribe(Config) -> + %% Declare/delete the queue and publish in one channel, while consuming on a + %% different one to verify that the cleanup is propagated through channels + [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server), + Ch2 = rabbit_ct_client_helpers:open_channel(Config, Server), + QQ = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch1, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + RaName = ra_name(QQ), + publish(Ch1, QQ), + wait_for_messages_ready(Servers, RaName, 1), + wait_for_messages_pending_ack(Servers, RaName, 0), + subscribe(Ch2, QQ, false), + receive + {#'basic.deliver'{delivery_tag = DeliveryTag, + redelivered = false}, _} -> + wait_for_messages_ready(Servers, RaName, 0), + wait_for_messages_pending_ack(Servers, RaName, 1), + amqp_channel:cast(Ch2, #'basic.ack'{delivery_tag = DeliveryTag, + multiple = true}), + wait_for_messages_ready(Servers, RaName, 0), + wait_for_messages_pending_ack(Servers, RaName, 0) + end, + [NCh1, NCh2] = rpc:call(Server, rabbit_channel, list, []), + %% Check the channel state contains the state for the quorum queue on channel 1 and 2 + wait_for_cleanup(Server, NCh1, 1), + wait_for_cleanup(Server, NCh2, 1), + ?assertMatch(#'queue.delete_ok'{}, amqp_channel:call(Ch1, #'queue.delete'{queue = QQ})), + wait_until(fun() -> + [] == rpc:call(Server, supervisor, which_children, [ra_server_sup]) + end), + %% Check that all queue states have been cleaned + wait_for_cleanup(Server, NCh1, 0), + wait_for_cleanup(Server, NCh2, 0). + +recover_from_single_failure(Config) -> + [Server, Server1, Server2] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + QQ = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + + ok = rabbit_ct_broker_helpers:stop_node(Config, Server2), + RaName = ra_name(QQ), + + publish(Ch, QQ), + publish(Ch, QQ), + publish(Ch, QQ), + wait_for_messages_ready([Server, Server1], RaName, 3), + wait_for_messages_pending_ack([Server, Server1], RaName, 0), + + ok = rabbit_ct_broker_helpers:start_node(Config, Server2), + wait_for_messages_ready(Servers, RaName, 3), + wait_for_messages_pending_ack(Servers, RaName, 0). + +recover_from_multiple_failures(Config) -> + [Server, Server1, Server2] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + QQ = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + + ok = rabbit_ct_broker_helpers:stop_node(Config, Server1), + RaName = ra_name(QQ), + + publish(Ch, QQ), + publish(Ch, QQ), + publish(Ch, QQ), + + ok = rabbit_ct_broker_helpers:stop_node(Config, Server2), + + publish(Ch, QQ), + publish(Ch, QQ), + publish(Ch, QQ), + + wait_for_messages_ready([Server], RaName, 3), + wait_for_messages_pending_ack([Server], RaName, 0), + + ok = rabbit_ct_broker_helpers:start_node(Config, Server1), + ok = rabbit_ct_broker_helpers:start_node(Config, Server2), + + %% there is an assumption here that the messages were not lost and were + %% recovered when a quorum was restored. Not the best test perhaps. + wait_for_messages_ready(Servers, RaName, 6), + wait_for_messages_pending_ack(Servers, RaName, 0). + +leadership_takeover(Config) -> + %% Kill nodes in succession forcing the takeover of leadership, and all messages that + %% are in the queue. + [Server, Server1, Server2] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + QQ = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + + ok = rabbit_ct_broker_helpers:stop_node(Config, Server1), + RaName = ra_name(QQ), + + publish(Ch, QQ), + publish(Ch, QQ), + publish(Ch, QQ), + + wait_for_messages_ready([Server], RaName, 3), + wait_for_messages_pending_ack([Server], RaName, 0), + + ok = rabbit_ct_broker_helpers:stop_node(Config, Server2), + + ok = rabbit_ct_broker_helpers:start_node(Config, Server1), + ok = rabbit_ct_broker_helpers:stop_node(Config, Server), + ok = rabbit_ct_broker_helpers:start_node(Config, Server2), + ok = rabbit_ct_broker_helpers:stop_node(Config, Server1), + ok = rabbit_ct_broker_helpers:start_node(Config, Server), + + wait_for_messages_ready([Server2, Server], RaName, 3), + wait_for_messages_pending_ack([Server2, Server], RaName, 0), + + ok = rabbit_ct_broker_helpers:start_node(Config, Server1), + wait_for_messages_ready(Servers, RaName, 3), + wait_for_messages_pending_ack(Servers, RaName, 0). + +metrics_cleanup_on_leadership_takeover(Config) -> + %% Queue core metrics should be deleted from a node once the leadership is transferred + %% to another follower + [Server, _, _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + QQ = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + + RaName = ra_name(QQ), + publish(Ch, QQ), + publish(Ch, QQ), + publish(Ch, QQ), + + wait_for_messages_ready([Server], RaName, 3), + wait_for_messages_pending_ack([Server], RaName, 0), + {ok, _, {_, Leader}} = ra:members({RaName, Server}), + QRes = rabbit_misc:r(<<"/">>, queue, QQ), + wait_until( + fun() -> + case rpc:call(Leader, ets, lookup, [queue_coarse_metrics, QRes]) of + [{QRes, 3, 0, 3, _}] -> true; + _ -> false + end + end), + force_leader_change(Leader, Servers, QQ), + wait_until(fun () -> + [] =:= rpc:call(Leader, ets, lookup, [queue_coarse_metrics, QRes]) andalso + [] =:= rpc:call(Leader, ets, lookup, [queue_metrics, QRes]) + end), + ok. + +metrics_cleanup_on_leader_crash(Config) -> + %% Queue core metrics should be deleted from a node once the leadership is transferred + %% to another follower + [Server | _] = Servers = + rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + QQ = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + + RaName = ra_name(QQ), + publish(Ch, QQ), + publish(Ch, QQ), + publish(Ch, QQ), + + wait_for_messages_ready([Server], RaName, 3), + wait_for_messages_pending_ack([Server], RaName, 0), + {ok, _, {Name, Leader}} = ra:members({RaName, Server}), + QRes = rabbit_misc:r(<<"/">>, queue, QQ), + wait_until( + fun() -> + case rpc:call(Leader, ets, lookup, [queue_coarse_metrics, QRes]) of + [{QRes, 3, 0, 3, _}] -> true; + _ -> false + end + end), + Pid = rpc:call(Leader, erlang, whereis, [Name]), + rpc:call(Leader, erlang, exit, [Pid, kill]), + [Other | _] = lists:delete(Leader, Servers), + catch ra:trigger_election(Other), + %% kill it again just in case it came straight back up again + catch rpc:call(Leader, erlang, exit, [Pid, kill]), + + %% this isn't a reliable test as the leader can be restarted so quickly + %% after a crash it is elected leader of the next term as well. + wait_until( + fun() -> + [] == rpc:call(Leader, ets, lookup, [queue_coarse_metrics, QRes]) + end), + ok. + +delete_declare(Config) -> + %% Delete cluster in ra is asynchronous, we have to ensure that we handle that in rmq + [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, + nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + QQ = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + + RaName = ra_name(QQ), + publish(Ch, QQ), + publish(Ch, QQ), + publish(Ch, QQ), + wait_for_messages_ready(Servers, RaName, 3), + + ?assertMatch(#'queue.delete_ok'{}, + amqp_channel:call(Ch, #'queue.delete'{queue = QQ})), + %% the actual data deletions happen after the call has returned as a quorum + %% queue leader waits for all nodes to confirm they replicated the poison + %% pill before terminating itself. + timer:sleep(1000), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + + %% Ensure that is a new queue and it's empty + wait_for_messages_ready(Servers, RaName, 0), + wait_for_messages_pending_ack(Servers, RaName, 0). + +basic_cancel(Config) -> + [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + QQ = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + + RaName = ra_name(QQ), + publish(Ch, QQ), + wait_for_messages_ready(Servers, RaName, 1), + wait_for_messages_pending_ack(Servers, RaName, 0), + subscribe(Ch, QQ, false), + receive + {#'basic.deliver'{}, _} -> + wait_for_messages_ready(Servers, RaName, 0), + wait_for_messages_pending_ack(Servers, RaName, 1), + amqp_channel:call(Ch, #'basic.cancel'{consumer_tag = <<"ctag">>}), + wait_for_messages_ready(Servers, RaName, 1), + wait_for_messages_pending_ack(Servers, RaName, 0) + end. + +purge(Config) -> + [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + QQ = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + + RaName = ra_name(QQ), + publish(Ch, QQ), + publish(Ch, QQ), + wait_for_messages_ready(Servers, RaName, 2), + wait_for_messages_pending_ack(Servers, RaName, 0), + _DeliveryTag = consume(Ch, QQ, false), + wait_for_messages_ready(Servers, RaName, 1), + wait_for_messages_pending_ack(Servers, RaName, 1), + {'queue.purge_ok', 2} = amqp_channel:call(Ch, #'queue.purge'{queue = QQ}), + wait_for_messages_pending_ack(Servers, RaName, 0), + wait_for_messages_ready(Servers, RaName, 0). + +sync_queue(Config) -> + [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + QQ = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + {error, _, _} = + rabbit_ct_broker_helpers:rabbitmqctl(Config, 0, [<<"sync_queue">>, QQ]), + ok. + +cancel_sync_queue(Config) -> + [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + QQ = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + {error, _, _} = + rabbit_ct_broker_helpers:rabbitmqctl(Config, 0, [<<"cancel_sync_queue">>, QQ]), + ok. + +declare_during_node_down(Config) -> + [Server, DownServer, _] = Servers = rabbit_ct_broker_helpers:get_node_configs( + Config, nodename), + + stop_node(Config, DownServer), + % rabbit_ct_broker_helpers:stop_node(Config, DownServer), + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + QQ = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + + RaName = ra_name(QQ), + timer:sleep(2000), + rabbit_ct_broker_helpers:start_node(Config, DownServer), + publish(Ch, QQ), + wait_for_messages_ready(Servers, RaName, 1), + ok. + +add_member_not_running(Config) -> + [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + ct:pal("add_member_not_running config ~p", [Config]), + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + QQ = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + ?assertEqual({error, node_not_running}, + rpc:call(Server, rabbit_quorum_queue, add_member, + [<<"/">>, QQ, 'rabbit@burrow'])). + +add_member_classic(Config) -> + [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + CQ = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', CQ, 0, 0}, declare(Ch, CQ, [])), + ?assertEqual({error, classic_queue_not_supported}, + rpc:call(Server, rabbit_quorum_queue, add_member, + [<<"/">>, CQ, Server])). + +add_member_already_a_member(Config) -> + [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + QQ = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + ?assertEqual({error, already_a_member}, + rpc:call(Server, rabbit_quorum_queue, add_member, + [<<"/">>, QQ, Server])). + +add_member_not_found(Config) -> + [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + QQ = ?config(queue_name, Config), + ?assertEqual({error, not_found}, + rpc:call(Server, rabbit_quorum_queue, add_member, + [<<"/">>, QQ, Server])). + +add_member(Config) -> + [Server0, Server1] = Servers0 = + rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + Ch = rabbit_ct_client_helpers:open_channel(Config, Server0), + QQ = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + ?assertEqual({error, node_not_running}, + rpc:call(Server0, rabbit_quorum_queue, add_member, + [<<"/">>, QQ, Server1])), + ok = rabbit_control_helper:command(stop_app, Server1), + ok = rabbit_control_helper:command(join_cluster, Server1, [atom_to_list(Server0)], []), + rabbit_control_helper:command(start_app, Server1), + ?assertEqual(ok, rpc:call(Server0, rabbit_quorum_queue, add_member, + [<<"/">>, QQ, Server1])), + Info = rpc:call(Server0, rabbit_quorum_queue, infos, + [rabbit_misc:r(<<"/">>, queue, QQ)]), + Servers = lists:sort(Servers0), + ?assertEqual(Servers, lists:sort(proplists:get_value(online, Info, []))). + +delete_member_not_running(Config) -> + [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + QQ = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + ?assertEqual({error, node_not_running}, + rpc:call(Server, rabbit_quorum_queue, delete_member, + [<<"/">>, QQ, 'rabbit@burrow'])). + +delete_member_classic(Config) -> + [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + CQ = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', CQ, 0, 0}, declare(Ch, CQ, [])), + ?assertEqual({error, classic_queue_not_supported}, + rpc:call(Server, rabbit_quorum_queue, delete_member, + [<<"/">>, CQ, Server])). + +delete_member_not_found(Config) -> + [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + QQ = ?config(queue_name, Config), + ?assertEqual({error, not_found}, + rpc:call(Server, rabbit_quorum_queue, delete_member, + [<<"/">>, QQ, Server])). + +delete_member(Config) -> + [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + QQ = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + timer:sleep(100), + ?assertEqual(ok, + rpc:call(Server, rabbit_quorum_queue, delete_member, + [<<"/">>, QQ, Server])), + ?assertEqual({error, not_a_member}, + rpc:call(Server, rabbit_quorum_queue, delete_member, + [<<"/">>, QQ, Server])). + +cleanup_data_dir(Config) -> + %% This test is slow, but also checks that we handle properly errors when + %% trying to delete a queue in minority. A case clause there had gone + %% previously unnoticed. + + [Server1, Server2] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + Ch = rabbit_ct_client_helpers:open_channel(Config, Server1), + QQ = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + timer:sleep(100), + + [{_, UId}] = rpc:call(Server1, ra_directory, list_registered, []), + DataDir = rpc:call(Server1, ra_env, server_data_dir, [UId]), + ?assert(filelib:is_dir(DataDir)), + + ok = rabbit_ct_broker_helpers:stop_node(Config, Server2), + + ?assertExit({{shutdown, + {connection_closing, {server_initiated_close, 541, _}}}, _}, + amqp_channel:call(Ch, #'queue.delete'{queue = QQ})), + ?assert(filelib:is_dir(DataDir)), + + ?assertEqual(ok, + rpc:call(Server1, rabbit_quorum_queue, cleanup_data_dir, + [])), + ?assert(not filelib:is_dir(DataDir)). + +basic_recover(Config) -> + [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + QQ = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + + RaName = ra_name(QQ), + publish(Ch, QQ), + wait_for_messages_ready(Servers, RaName, 1), + wait_for_messages_pending_ack(Servers, RaName, 0), + _ = consume(Ch, QQ, false), + wait_for_messages_ready(Servers, RaName, 0), + wait_for_messages_pending_ack(Servers, RaName, 1), + amqp_channel:cast(Ch, #'basic.recover'{requeue = true}), + wait_for_messages_ready(Servers, RaName, 1), + wait_for_messages_pending_ack(Servers, RaName, 0). +%%---------------------------------------------------------------------------- + +declare(Ch, Q) -> + declare(Ch, Q, []). + +declare(Ch, Q, Args) -> + amqp_channel:call(Ch, #'queue.declare'{queue = Q, + durable = true, + auto_delete = false, + arguments = Args}). + +assert_queue_type(Server, Q, Expected) -> + Actual = get_queue_type(Server, Q), + Expected = Actual. + +get_queue_type(Server, Q) -> + QNameRes = rabbit_misc:r(<<"/">>, queue, Q), + {ok, AMQQueue} = + rpc:call(Server, rabbit_amqqueue, lookup, [QNameRes]), + AMQQueue#amqqueue.type. + +wait_for_messages(Config, Stats) -> + wait_for_messages(Config, lists:sort(Stats), 60). + +wait_for_messages(Config, Stats, 0) -> + ?assertEqual(Stats, + lists:sort( + filter_queues(Stats, + rabbit_ct_broker_helpers:rabbitmqctl_list( + Config, 0, ["list_queues", "name", "messages", "messages_ready", + "messages_unacknowledged"])))); +wait_for_messages(Config, Stats, N) -> + case lists:sort( + filter_queues(Stats, + rabbit_ct_broker_helpers:rabbitmqctl_list( + Config, 0, ["list_queues", "name", "messages", "messages_ready", + "messages_unacknowledged"]))) of + Stats0 when Stats0 == Stats -> + ok; + _ -> + timer:sleep(500), + wait_for_messages(Config, Stats, N - 1) + end. + +filter_queues(Expected, Got) -> + Keys = [K || [K, _, _, _] <- Expected], + lists:filter(fun([K, _, _, _]) -> + lists:member(K, Keys) + end, Got). + +publish(Ch, Queue) -> + ok = amqp_channel:call(Ch, + #'basic.publish'{routing_key = Queue}, + #amqp_msg{props = #'P_basic'{delivery_mode = 2}, + payload = <<"msg">>}). + +consume(Ch, Queue, NoAck) -> + {GetOk, _} = Reply = amqp_channel:call(Ch, #'basic.get'{queue = Queue, + no_ack = NoAck}), + ?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = <<"msg">>}}, Reply), + GetOk#'basic.get_ok'.delivery_tag. + +consume_empty(Ch, Queue, NoAck) -> + ?assertMatch(#'basic.get_empty'{}, + amqp_channel:call(Ch, #'basic.get'{queue = Queue, + no_ack = NoAck})). + +subscribe(Ch, Queue, NoAck) -> + amqp_channel:subscribe(Ch, #'basic.consume'{queue = Queue, + no_ack = NoAck, + consumer_tag = <<"ctag">>}, + self()), + receive + #'basic.consume_ok'{consumer_tag = <<"ctag">>} -> + ok + end. + +qos(Ch, Prefetch, Global) -> + ?assertMatch(#'basic.qos_ok'{}, + amqp_channel:call(Ch, #'basic.qos'{global = Global, + prefetch_count = Prefetch})). + +receive_basic_deliver(Redelivered) -> + receive + {#'basic.deliver'{redelivered = R}, _} when R == Redelivered -> + ok + end. + +wait_for_cleanup(Server, Channel, Number) -> + wait_for_cleanup(Server, Channel, Number, 60). + +wait_for_cleanup(Server, Channel, Number, 0) -> + ?assertEqual(Number, length(rpc:call(Server, rabbit_channel, list_queue_states, [Channel]))); +wait_for_cleanup(Server, Channel, Number, N) -> + case length(rpc:call(Server, rabbit_channel, list_queue_states, [Channel])) of + Length when Number == Length -> + ok; + _ -> + timer:sleep(500), + wait_for_cleanup(Server, Channel, Number, N - 1) + end. + + +wait_for_messages_ready(Servers, QName, Ready) -> + wait_for_messages(Servers, QName, Ready, + fun rabbit_fifo:query_messages_ready/1, 60). + +wait_for_messages_pending_ack(Servers, QName, Ready) -> + wait_for_messages(Servers, QName, Ready, + fun rabbit_fifo:query_messages_checked_out/1, 60). + +wait_for_messages(Servers, QName, Number, Fun, 0) -> + Msgs = dirty_query(Servers, QName, Fun), + Totals = lists:map(fun(M) when is_map(M) -> + maps:size(M); + (_) -> + -1 + end, Msgs), + ?assertEqual(Totals, [Number || _ <- lists:seq(1, length(Servers))]); +wait_for_messages(Servers, QName, Number, Fun, N) -> + Msgs = dirty_query(Servers, QName, Fun), + case lists:all(fun(M) when is_map(M) -> + maps:size(M) == Number; + (_) -> + false + end, Msgs) of + true -> + ok; + _ -> + timer:sleep(500), + wait_for_messages(Servers, QName, Number, Fun, N - 1) + end. + +dirty_query(Servers, QName, Fun) -> + lists:map( + fun(N) -> + case rpc:call(N, ra, local_query, [{QName, N}, Fun]) of + {ok, {_, Msgs}, _} -> + Msgs; + _ -> + undefined + end + end, Servers). + +wait_until(Condition) -> + wait_until(Condition, 60). + +wait_until(Condition, 0) -> + ?assertEqual(true, Condition()); +wait_until(Condition, N) -> + case Condition() of + true -> + ok; + _ -> + timer:sleep(500), + wait_until(Condition, N - 1) + end. + +force_leader_change(Leader, Servers, Q) -> + RaName = ra_name(Q), + [F1, _] = Servers -- [Leader], + ok = rpc:call(F1, ra, trigger_election, [{RaName, F1}]), + case ra:members({RaName, Leader}) of + {ok, _, {_, Leader}} -> + %% Leader has been re-elected + force_leader_change(Leader, Servers, Q); + {ok, _, _} -> + %% Leader has changed + ok + end. + +delete_queues() -> + [rabbit_amqqueue:delete(Q, false, false, <<"dummy">>) + || Q <- rabbit_amqqueue:list()]. + +stop_node(Config, Server) -> + rabbit_ct_broker_helpers:rabbitmqctl(Config, Server, ["stop"]). diff --git a/test/rabbit_fifo_SUITE.erl b/test/rabbit_fifo_SUITE.erl new file mode 100644 index 0000000000..a2e22afc2e --- /dev/null +++ b/test/rabbit_fifo_SUITE.erl @@ -0,0 +1,624 @@ +-module(rabbit_fifo_SUITE). + +-compile(export_all). + +-include_lib("common_test/include/ct.hrl"). +-include_lib("eunit/include/eunit.hrl"). + +all() -> + [ + {group, tests} + ]. + +all_tests() -> + [ + basics, + return, + rabbit_fifo_returns_correlation, + resends_lost_command, + returns_after_down, + resends_after_lost_applied, + handles_reject_notification, + two_quick_enqueues, + detects_lost_delivery, + dequeue, + discard, + cancel_checkout, + credit, + untracked_enqueue, + flow, + test_queries, + duplicate_delivery, + usage + ]. + +groups() -> + [ + {tests, [], all_tests()} + ]. + +init_per_group(_, Config) -> + PrivDir = ?config(priv_dir, Config), + _ = application:load(ra), + ok = application:set_env(ra, data_dir, PrivDir), + application:ensure_all_started(ra), + application:ensure_all_started(lg), + Config. + +end_per_group(_, Config) -> + _ = application:stop(ra), + Config. + +init_per_testcase(TestCase, Config) -> + ra_server_sup:remove_all(), + ServerName2 = list_to_atom(atom_to_list(TestCase) ++ "2"), + ServerName3 = list_to_atom(atom_to_list(TestCase) ++ "3"), + [ + {cluster_name, TestCase}, + {uid, atom_to_binary(TestCase, utf8)}, + {node_id, {TestCase, node()}}, + {uid2, atom_to_binary(ServerName2, utf8)}, + {node_id2, {ServerName2, node()}}, + {uid3, atom_to_binary(ServerName3, utf8)}, + {node_id3, {ServerName3, node()}} + | Config]. + +basics(Config) -> + ClusterName = ?config(cluster_name, Config), + ServerId = ?config(node_id, Config), + UId = ?config(uid, Config), + CustomerTag = UId, + ok = start_cluster(ClusterName, [ServerId]), + FState0 = rabbit_fifo_client:init(ClusterName, [ServerId]), + {ok, FState1} = rabbit_fifo_client:checkout(CustomerTag, 1, FState0), + + ra_log_wal:force_roll_over(ra_log_wal), + % create segment the segment will trigger a snapshot + timer:sleep(1000), + + {ok, FState2} = rabbit_fifo_client:enqueue(one, FState1), + % process ra events + FState3 = process_ra_event(FState2, 250), + + FState5 = receive + {ra_event, From, Evt} -> + case rabbit_fifo_client:handle_ra_event(From, Evt, FState3) of + {internal, _AcceptedSeqs, _Actions, _FState4} -> + exit(unexpected_internal_event); + {{delivery, C, [{MsgId, _Msg}]}, FState4} -> + {ok, S} = rabbit_fifo_client:settle(C, [MsgId], + FState4), + S + end + after 5000 -> + exit(await_msg_timeout) + end, + + % process settle applied notificaiton + FState5b = process_ra_event(FState5, 250), + _ = ra:stop_server(ServerId), + _ = ra:restart_server(ServerId), + + % give time to become leader + timer:sleep(500), + {ok, FState6} = rabbit_fifo_client:enqueue(two, FState5b), + % process applied event + FState6b = process_ra_event(FState6, 250), + + receive + {ra_event, Frm, E} -> + case rabbit_fifo_client:handle_ra_event(Frm, E, FState6b) of + {internal, _, _, _FState7} -> + ct:pal("unexpected event ~p~n", [E]), + exit({unexpected_internal_event, E}); + {{delivery, Ctag, [{Mid, {_, two}}]}, FState7} -> + {ok, _S} = rabbit_fifo_client:return(Ctag, [Mid], FState7), + ok + end + after 2000 -> + exit(await_msg_timeout) + end, + ra:stop_server(ServerId), + ok. + +return(Config) -> + ClusterName = ?config(cluster_name, Config), + ServerId = ?config(node_id, Config), + ServerId2 = ?config(node_id2, Config), + ok = start_cluster(ClusterName, [ServerId, ServerId2]), + + F00 = rabbit_fifo_client:init(ClusterName, [ServerId, ServerId2]), + {ok, F0} = rabbit_fifo_client:enqueue(1, msg1, F00), + {ok, F1} = rabbit_fifo_client:enqueue(2, msg2, F0), + {_, _, F2} = process_ra_events(F1, 100), + {ok, {MsgId, _}, F} = rabbit_fifo_client:dequeue(<<"tag">>, unsettled, F2), + {ok, _F2} = rabbit_fifo_client:return(<<"tag">>, [MsgId], F), + + ra:stop_server(ServerId), + ok. + +rabbit_fifo_returns_correlation(Config) -> + ClusterName = ?config(cluster_name, Config), + ServerId = ?config(node_id, Config), + ok = start_cluster(ClusterName, [ServerId]), + F0 = rabbit_fifo_client:init(ClusterName, [ServerId]), + {ok, F1} = rabbit_fifo_client:enqueue(corr1, msg1, F0), + receive + {ra_event, Frm, E} -> + case rabbit_fifo_client:handle_ra_event(Frm, E, F1) of + {internal, [corr1], [], _F2} -> + ok; + {Del, _} -> + exit({unexpected, Del}) + end + after 2000 -> + exit(await_msg_timeout) + end, + ra:stop_server(ServerId), + ok. + +duplicate_delivery(Config) -> + ClusterName = ?config(cluster_name, Config), + ServerId = ?config(node_id, Config), + ok = start_cluster(ClusterName, [ServerId]), + F0 = rabbit_fifo_client:init(ClusterName, [ServerId]), + {ok, F1} = rabbit_fifo_client:checkout(<<"tag">>, 10, F0), + {ok, F2} = rabbit_fifo_client:enqueue(corr1, msg1, F1), + Fun = fun Loop(S0) -> + receive + {ra_event, Frm, E} = Evt -> + case rabbit_fifo_client:handle_ra_event(Frm, E, S0) of + {internal, [corr1], [], S1} -> + Loop(S1); + {_Del, S1} -> + %% repeat event delivery + self() ! Evt, + %% check that then next received delivery doesn't + %% repeat or crash + receive + {ra_event, F, E1} -> + case rabbit_fifo_client:handle_ra_event(F, E1, S1) of + {internal, [], [], S2} -> + S2 + end + end + end + after 2000 -> + exit(await_msg_timeout) + end + end, + Fun(F2), + ra:stop_server(ServerId), + ok. + +usage(Config) -> + ClusterName = ?config(cluster_name, Config), + ServerId = ?config(node_id, Config), + ok = start_cluster(ClusterName, [ServerId]), + F0 = rabbit_fifo_client:init(ClusterName, [ServerId]), + {ok, F1} = rabbit_fifo_client:checkout(<<"tag">>, 10, F0), + {ok, F2} = rabbit_fifo_client:enqueue(corr1, msg1, F1), + {ok, F3} = rabbit_fifo_client:enqueue(corr2, msg2, F2), + {_, _, _} = process_ra_events(F3, 50), + % force tick and usage stats emission + ServerId ! tick_timeout, + timer:sleep(50), + % ct:pal("ets ~w ~w ~w", [ets:tab2list(rabbit_fifo_usage), ServerId, UId]), + Use = rabbit_fifo:usage(element(1, ServerId)), + ct:pal("Use ~w~n", [Use]), + ra:stop_server(ServerId), + ?assert(Use > 0.0), + ok. + +resends_lost_command(Config) -> + ClusterName = ?config(cluster_name, Config), + ServerId = ?config(node_id, Config), + ok = start_cluster(ClusterName, [ServerId]), + + ok = meck:new(ra, [passthrough]), + + F0 = rabbit_fifo_client:init(ClusterName, [ServerId]), + {ok, F1} = rabbit_fifo_client:enqueue(msg1, F0), + % lose the enqueue + meck:expect(ra, pipeline_command, fun (_, _, _) -> ok end), + {ok, F2} = rabbit_fifo_client:enqueue(msg2, F1), + meck:unload(ra), + {ok, F3} = rabbit_fifo_client:enqueue(msg3, F2), + {_, _, F4} = process_ra_events(F3, 500), + {ok, {_, {_, msg1}}, F5} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F4), + {ok, {_, {_, msg2}}, F6} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F5), + {ok, {_, {_, msg3}}, _F7} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F6), + ra:stop_server(ServerId), + ok. + +two_quick_enqueues(Config) -> + ClusterName = ?config(cluster_name, Config), + ServerId = ?config(node_id, Config), + ok = start_cluster(ClusterName, [ServerId]), + + F0 = rabbit_fifo_client:init(ClusterName, [ServerId]), + F1 = element(2, rabbit_fifo_client:enqueue(msg1, F0)), + {ok, F2} = rabbit_fifo_client:enqueue(msg2, F1), + _ = process_ra_events(F2, 500), + ra:stop_server(ServerId), + ok. + +detects_lost_delivery(Config) -> + ClusterName = ?config(cluster_name, Config), + ServerId = ?config(node_id, Config), + ok = start_cluster(ClusterName, [ServerId]), + + F000 = rabbit_fifo_client:init(ClusterName, [ServerId]), + {ok, F00} = rabbit_fifo_client:enqueue(msg1, F000), + {_, _, F0} = process_ra_events(F00, 100), + {ok, F1} = rabbit_fifo_client:checkout(<<"tag">>, 10, F0), + {ok, F2} = rabbit_fifo_client:enqueue(msg2, F1), + {ok, F3} = rabbit_fifo_client:enqueue(msg3, F2), + % lose first delivery + receive + {ra_event, _, {machine, {delivery, _, [{_, {_, msg1}}]}}} -> + ok + after 500 -> + exit(await_delivery_timeout) + end, + + % assert three deliveries were received + {[_, _, _], _, _} = process_ra_events(F3, 500), + ra:stop_server(ServerId), + ok. + +returns_after_down(Config) -> + ClusterName = ?config(cluster_name, Config), + ServerId = ?config(node_id, Config), + ok = start_cluster(ClusterName, [ServerId]), + + F0 = rabbit_fifo_client:init(ClusterName, [ServerId]), + {ok, F1} = rabbit_fifo_client:enqueue(msg1, F0), + {_, _, F2} = process_ra_events(F1, 500), + % start a customer in a separate processes + % that exits after checkout + Self = self(), + _Pid = spawn(fun () -> + F = rabbit_fifo_client:init(ClusterName, [ServerId]), + {ok, _} = rabbit_fifo_client:checkout(<<"tag">>, 10, F), + Self ! checkout_done + end), + receive checkout_done -> ok after 1000 -> exit(checkout_done_timeout) end, + % message should be available for dequeue + {ok, {_, {_, msg1}}, _} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F2), + ra:stop_server(ServerId), + ok. + +resends_after_lost_applied(Config) -> + ClusterName = ?config(cluster_name, Config), + ServerId = ?config(node_id, Config), + ok = start_cluster(ClusterName, [ServerId]), + + F0 = rabbit_fifo_client:init(ClusterName, [ServerId]), + {_, _, F1} = process_ra_events(element(2, rabbit_fifo_client:enqueue(msg1, F0)), + 500), + {ok, F2} = rabbit_fifo_client:enqueue(msg2, F1), + % lose an applied event + receive + {ra_event, _, {applied, _}} -> + ok + after 500 -> + exit(await_ra_event_timeout) + end, + % send another message + {ok, F3} = rabbit_fifo_client:enqueue(msg3, F2), + {_, _, F4} = process_ra_events(F3, 500), + {ok, {_, {_, msg1}}, F5} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F4), + {ok, {_, {_, msg2}}, F6} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F5), + {ok, {_, {_, msg3}}, _F7} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F6), + ra:stop_server(ServerId), + ok. + +handles_reject_notification(Config) -> + ClusterName = ?config(cluster_name, Config), + ServerId1 = ?config(node_id, Config), + ServerId2 = ?config(node_id2, Config), + UId1 = ?config(uid, Config), + CId = {UId1, self()}, + + ok = start_cluster(ClusterName, [ServerId1, ServerId2]), + _ = ra:process_command(ServerId1, {checkout, + {auto, 10, simple_prefetch}, CId}), + % reverse order - should try the first node in the list first + F0 = rabbit_fifo_client:init(ClusterName, [ServerId2, ServerId1]), + {ok, F1} = rabbit_fifo_client:enqueue(one, F0), + + timer:sleep(500), + + % the applied notification + _F2 = process_ra_event(F1, 250), + ra:stop_server(ServerId1), + ra:stop_server(ServerId2), + ok. + +discard(Config) -> + PrivDir = ?config(priv_dir, Config), + ServerId = ?config(node_id, Config), + UId = ?config(uid, Config), + ClusterName = ?config(cluster_name, Config), + Conf = #{cluster_name => ClusterName, + id => ServerId, + uid => UId, + log_init_args => #{data_dir => PrivDir, uid => UId}, + initial_member => [], + machine => {module, rabbit_fifo, + #{dead_letter_handler => + {?MODULE, dead_letter_handler, [self()]}}}}, + _ = ra:start_server(Conf), + ok = ra:trigger_election(ServerId), + _ = ra:members(ServerId), + + F0 = rabbit_fifo_client:init(ClusterName, [ServerId]), + {ok, F1} = rabbit_fifo_client:checkout(<<"tag">>, 10, F0), + {ok, F2} = rabbit_fifo_client:enqueue(msg1, F1), + F3 = discard_next_delivery(F2, 500), + {ok, empty, _F4} = rabbit_fifo_client:dequeue(<<"tag1">>, settled, F3), + receive + {dead_letter, Letters} -> + ct:pal("dead letters ~p~n", [Letters]), + [{_, msg1}] = Letters, + ok + after 500 -> + exit(dead_letter_timeout) + end, + ra:stop_server(ServerId), + ok. + +cancel_checkout(Config) -> + ClusterName = ?config(cluster_name, Config), + ServerId = ?config(node_id, Config), + ok = start_cluster(ClusterName, [ServerId]), + F0 = rabbit_fifo_client:init(ClusterName, [ServerId], 4), + {ok, F1} = rabbit_fifo_client:enqueue(m1, F0), + {ok, F2} = rabbit_fifo_client:checkout(<<"tag">>, 10, F1), + {_, _, F3} = process_ra_events0(F2, [], [], 250, fun (_, S) -> S end), + {ok, F4} = rabbit_fifo_client:cancel_checkout(<<"tag">>, F3), + {ok, {_, {_, m1}}, _} = rabbit_fifo_client:dequeue(<<"d1">>, settled, F4), + ok. + +credit(Config) -> + ClusterName = ?config(cluster_name, Config), + ServerId = ?config(node_id, Config), + ok = start_cluster(ClusterName, [ServerId]), + F0 = rabbit_fifo_client:init(ClusterName, [ServerId], 4), + {ok, F1} = rabbit_fifo_client:enqueue(m1, F0), + {ok, F2} = rabbit_fifo_client:enqueue(m2, F1), + {_, _, F3} = process_ra_events(F2, [], 250), + %% checkout with 0 prefetch + {ok, F4} = rabbit_fifo_client:checkout(<<"tag">>, 0, credited, F3), + %% assert no deliveries + {_, _, F5} = process_ra_events0(F4, [], [], 250, + fun + (D, _) -> error({unexpected_delivery, D}) + end), + %% provide some credit + {ok, F6} = rabbit_fifo_client:credit(<<"tag">>, 1, false, F5), + {[{_, {_, m1}}], [{send_credit_reply, _}], F7} = + process_ra_events(F6, [], 250), + + %% credit and drain + {ok, F8} = rabbit_fifo_client:credit(<<"tag">>, 4, true, F7), + {[{_, {_, m2}}], [{send_credit_reply, _}, {send_drained, _}], F9} = + process_ra_events(F8, [], 250), + flush(), + + %% enqueue another message - at this point the consumer credit should be + %% all used up due to the drain + {ok, F10} = rabbit_fifo_client:enqueue(m3, F9), + %% assert no deliveries + {_, _, F11} = process_ra_events0(F10, [], [], 250, + fun + (D, _) -> error({unexpected_delivery, D}) + end), + %% credit again and receive the last message + {ok, F12} = rabbit_fifo_client:credit(<<"tag">>, 10, false, F11), + {[{_, {_, m3}}], _, _} = process_ra_events(F12, [], 250), + ok. + +untracked_enqueue(Config) -> + ClusterName = ?config(cluster_name, Config), + ServerId = ?config(node_id, Config), + ok = start_cluster(ClusterName, [ServerId]), + + ok = rabbit_fifo_client:untracked_enqueue([ServerId], msg1), + timer:sleep(100), + F0 = rabbit_fifo_client:init(ClusterName, [ServerId]), + {ok, {_, {_, msg1}}, _} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F0), + ra:stop_server(ServerId), + ok. + + +flow(Config) -> + ClusterName = ?config(cluster_name, Config), + ServerId = ?config(node_id, Config), + ok = start_cluster(ClusterName, [ServerId]), + F0 = rabbit_fifo_client:init(ClusterName, [ServerId], 3), + {ok, F1} = rabbit_fifo_client:enqueue(m1, F0), + {ok, F2} = rabbit_fifo_client:enqueue(m2, F1), + {ok, F3} = rabbit_fifo_client:enqueue(m3, F2), + {slow, F4} = rabbit_fifo_client:enqueue(m4, F3), + {_, _, F5} = process_ra_events(F4, 500), + {ok, _} = rabbit_fifo_client:enqueue(m5, F5), + ra:stop_server(ServerId), + ok. + +test_queries(Config) -> + ClusterName = ?config(cluster_name, Config), + ServerId = ?config(node_id, Config), + ok = start_cluster(ClusterName, [ServerId]), + P = spawn(fun () -> + F0 = rabbit_fifo_client:init(ClusterName, [ServerId], 4), + {ok, F1} = rabbit_fifo_client:enqueue(m1, F0), + {ok, F2} = rabbit_fifo_client:enqueue(m2, F1), + process_ra_events(F2, 100), + receive stop -> ok end + end), + F0 = rabbit_fifo_client:init(ClusterName, [ServerId], 4), + {ok, _} = rabbit_fifo_client:checkout(<<"tag">>, 1, F0), + {ok, {_, Ready}, _} = ra:local_query(ServerId, + fun rabbit_fifo:query_messages_ready/1), + ?assertEqual(1, maps:size(Ready)), + ct:pal("Ready ~w~n", [Ready]), + {ok, {_, Checked}, _} = ra:local_query(ServerId, + fun rabbit_fifo:query_messages_checked_out/1), + ?assertEqual(1, maps:size(Checked)), + ct:pal("Checked ~w~n", [Checked]), + {ok, {_, Processes}, _} = ra:local_query(ServerId, + fun rabbit_fifo:query_processes/1), + ct:pal("Processes ~w~n", [Processes]), + ?assertEqual(2, length(Processes)), + P ! stop, + ra:stop_server(ServerId), + ok. + +dead_letter_handler(Pid, Msgs) -> + Pid ! {dead_letter, Msgs}. + +dequeue(Config) -> + ClusterName = ?config(priv_dir, Config), + ServerId = ?config(node_id, Config), + UId = ?config(uid, Config), + Tag = UId, + ok = start_cluster(ClusterName, [ServerId]), + F1 = rabbit_fifo_client:init(ClusterName, [ServerId]), + {ok, empty, F1b} = rabbit_fifo_client:dequeue(Tag, settled, F1), + {ok, F2_} = rabbit_fifo_client:enqueue(msg1, F1b), + {_, _, F2} = process_ra_events(F2_, 100), + + {ok, {0, {_, msg1}}, F3} = rabbit_fifo_client:dequeue(Tag, settled, F2), + {ok, F4_} = rabbit_fifo_client:enqueue(msg2, F3), + {_, _, F4} = process_ra_events(F4_, 100), + {ok, {MsgId, {_, msg2}}, F5} = rabbit_fifo_client:dequeue(Tag, unsettled, F4), + {ok, _F6} = rabbit_fifo_client:settle(Tag, [MsgId], F5), + ra:stop_server(ServerId), + ok. + +enq_deq_n(N, F0) -> + enq_deq_n(N, F0, []). + +enq_deq_n(0, F0, Acc) -> + {_, _, F} = process_ra_events(F0, 100), + {F, Acc}; +enq_deq_n(N, F, Acc) -> + {ok, F1} = rabbit_fifo_client:enqueue(N, F), + {_, _, F2} = process_ra_events(F1, 10), + {ok, {_, {_, Deq}}, F3} = rabbit_fifo_client:dequeue(term_to_binary(N), settled, F2), + + {_, _, F4} = process_ra_events(F3, 5), + enq_deq_n(N-1, F4, [Deq | Acc]). + +conf(ClusterName, UId, ServerId, _, Peers) -> + #{cluster_name => ClusterName, + id => ServerId, + uid => UId, + log_init_args => #{uid => UId}, + initial_members => Peers, + machine => {module, rabbit_fifo, #{}}}. + +process_ra_event(State, Wait) -> + receive + {ra_event, From, Evt} -> + % ct:pal("processed ra event ~p~n", [Evt]), + {internal, _, _, S} = rabbit_fifo_client:handle_ra_event(From, Evt, State), + S + after Wait -> + exit(ra_event_timeout) + end. + +process_ra_events(State0, Wait) -> + process_ra_events(State0, [], Wait). + +process_ra_events(State, Acc, Wait) -> + DeliveryFun = fun ({delivery, Tag, Msgs}, S) -> + MsgIds = [element(1, M) || M <- Msgs], + {ok, S2} = rabbit_fifo_client:settle(Tag, MsgIds, S), + S2 + end, + process_ra_events0(State, Acc, [], Wait, DeliveryFun). + +process_ra_events0(State0, Acc, Actions0, Wait, DeliveryFun) -> + receive + {ra_event, From, Evt} -> + % ct:pal("ra event ~w~n", [Evt]), + case rabbit_fifo_client:handle_ra_event(From, Evt, State0) of + {internal, _, Actions, State} -> + process_ra_events0(State, Acc, Actions0 ++ Actions, + Wait, DeliveryFun); + {{delivery, _Tag, Msgs} = Del, State1} -> + State = DeliveryFun(Del, State1), + process_ra_events0(State, Acc ++ Msgs, Actions0, Wait, DeliveryFun); + eol -> + eol + end + after Wait -> + {Acc, Actions0, State0} + end. + +discard_next_delivery(State0, Wait) -> + receive + {ra_event, From, Evt} -> + case rabbit_fifo_client:handle_ra_event(From, Evt, State0) of + {internal, _, _Actions, State} -> + discard_next_delivery(State, Wait); + {{delivery, Tag, Msgs}, State1} -> + MsgIds = [element(1, M) || M <- Msgs], + ct:pal("discarding ~p", [Msgs]), + {ok, State} = rabbit_fifo_client:discard(Tag, MsgIds, + State1), + State + end + after Wait -> + State0 + end. + +return_next_delivery(State0, Wait) -> + receive + {ra_event, From, Evt} -> + case rabbit_fifo_client:handle_ra_event(From, Evt, State0) of + {internal, _, _, State} -> + return_next_delivery(State, Wait); + {{delivery, Tag, Msgs}, State1} -> + MsgIds = [element(1, M) || M <- Msgs], + ct:pal("returning ~p", [Msgs]), + {ok, State} = rabbit_fifo_client:return(Tag, MsgIds, + State1), + State + end + after Wait -> + State0 + end. + +validate_process_down(Name, 0) -> + exit({process_not_down, Name}); +validate_process_down(Name, Num) -> + case whereis(Name) of + undefined -> + ok; + _ -> + timer:sleep(100), + validate_process_down(Name, Num-1) + end. + +start_cluster(ClusterName, ServerIds, RaFifoConfig) -> + {ok, Started, _} = ra:start_cluster(ClusterName, + {module, rabbit_fifo, RaFifoConfig}, + ServerIds), + ?assertEqual(length(Started), length(ServerIds)), + ok. + +start_cluster(ClusterName, ServerIds) -> + start_cluster(ClusterName, ServerIds, #{}). + +flush() -> + receive + Msg -> + ct:pal("flushed: ~w~n", [Msg]), + flush() + after 10 -> + ok + end. diff --git a/test/rabbitmqctl_integration_SUITE.erl b/test/rabbitmqctl_integration_SUITE.erl index 71b74ea104..a5d3a0fd03 100644 --- a/test/rabbitmqctl_integration_SUITE.erl +++ b/test/rabbitmqctl_integration_SUITE.erl @@ -151,7 +151,7 @@ list_queues_stopped(Config) -> ListedQueues = [ {Name, State} || [Name, State] <- rabbit_ct_broker_helpers:rabbitmqctl_list( - Config, 0, ["list_queues", "name", "state"]) ], + Config, 0, ["list_queues", "name", "state", "--no-table-headers"]) ], [ <<"running">> = proplists:get_value(Q, ListedQueues) || Q <- Node1Queues ], %% Node is running. Vhost is down @@ -176,4 +176,4 @@ assert_ctl_queues(Config, Node, Args, Expected0) -> end. run_list_queues(Config, Node, Args) -> - rabbit_ct_broker_helpers:rabbitmqctl_list(Config, Node, ["list_queues"] ++ Args ++ ["name"]).
\ No newline at end of file + rabbit_ct_broker_helpers:rabbitmqctl_list(Config, Node, ["list_queues"] ++ Args ++ ["name", "--no-table-headers"]). |
