summaryrefslogtreecommitdiff
path: root/test
diff options
context:
space:
mode:
authorMatteo Cafasso <noxdafox@gmail.com>2018-11-24 17:20:23 +0200
committerMatteo Cafasso <noxdafox@gmail.com>2018-11-24 17:20:23 +0200
commit11002abb8cbf71cea43519fde9efed7cbb87c4ef (patch)
tree6429a72c9e26bcf84a3dc4f493a3251794ee6059 /test
parenteb1b636645bf5a02361ade0821c7c96e77ecf966 (diff)
parent0c7c7d960c8ca54b5b29c06d295acef0bc9f3c7a (diff)
downloadrabbitmq-server-git-11002abb8cbf71cea43519fde9efed7cbb87c4ef.tar.gz
Merge branch 'master' of github.com:rabbitmq/rabbitmq-server
Diffstat (limited to 'test')
-rw-r--r--test/backing_queue_SUITE.erl7
-rw-r--r--test/cluster_SUITE.erl44
-rw-r--r--test/config_schema_SUITE_data/rabbit.snippets39
-rw-r--r--test/list_consumers_sanity_check_SUITE.erl10
-rw-r--r--test/list_queues_online_and_offline_SUITE.erl6
-rw-r--r--test/policy_SUITE.erl3
-rw-r--r--test/proxy_protocol_SUITE.erl2
-rw-r--r--test/quorum_queue_SUITE.erl1812
-rw-r--r--test/rabbit_fifo_SUITE.erl624
-rw-r--r--test/rabbitmqctl_integration_SUITE.erl4
10 files changed, 2494 insertions, 57 deletions
diff --git a/test/backing_queue_SUITE.erl b/test/backing_queue_SUITE.erl
index 60f86e0542..94cbd48e8c 100644
--- a/test/backing_queue_SUITE.erl
+++ b/test/backing_queue_SUITE.erl
@@ -701,7 +701,9 @@ bq_variable_queue_delete_msg_store_files_callback1(Config) ->
CountMinusOne = Count - 1,
{ok, CountMinusOne, {QName, QPid, _AckTag, false, _Msg}} =
- rabbit_amqqueue:basic_get(Q, self(), true, Limiter),
+ rabbit_amqqueue:basic_get(Q, self(), true, Limiter,
+ <<"bq_variable_queue_delete_msg_store_files_callback1">>,
+ #{}),
{ok, CountMinusOne} = rabbit_amqqueue:purge(Q),
%% give the queue a second to receive the close_fds callback msg
@@ -737,7 +739,8 @@ bq_queue_recover1(Config) ->
fun (Q1 = #amqqueue { pid = QPid1 }) ->
CountMinusOne = Count - 1,
{ok, CountMinusOne, {QName, QPid1, _AckTag, true, _Msg}} =
- rabbit_amqqueue:basic_get(Q1, self(), false, Limiter),
+ rabbit_amqqueue:basic_get(Q1, self(), false, Limiter,
+ <<"bq_queue_recover1">>, #{}),
exit(QPid1, shutdown),
VQ1 = variable_queue_init(Q, true),
{{_Msg1, true, _AckTag1}, VQ2} =
diff --git a/test/cluster_SUITE.erl b/test/cluster_SUITE.erl
index 4864989b6a..62928aae9f 100644
--- a/test/cluster_SUITE.erl
+++ b/test/cluster_SUITE.erl
@@ -29,8 +29,7 @@
delegates_async,
delegates_sync,
queue_cleanup,
- declare_on_dead_queue,
- refresh_events
+ declare_on_dead_queue
]).
all() ->
@@ -240,34 +239,6 @@ declare_on_dead_queue1(_Config, SecondaryNode) ->
after ?TIMEOUT -> throw(failed_to_create_and_kill_queue)
end.
-refresh_events(Config) ->
- {I, J} = ?config(test_direction, Config),
- From = rabbit_ct_broker_helpers:get_node_config(Config, I, nodename),
- To = rabbit_ct_broker_helpers:get_node_config(Config, J, nodename),
- rabbit_ct_broker_helpers:add_code_path_to_node(To, ?MODULE),
- passed = rabbit_ct_broker_helpers:rpc(Config,
- From, ?MODULE, refresh_events1, [Config, To]).
-
-refresh_events1(Config, SecondaryNode) ->
- dummy_event_receiver:start(self(), [node(), SecondaryNode],
- [channel_created, queue_created]),
-
- {_Writer, Ch} = test_spawn(),
- expect_events(pid, Ch, channel_created),
- rabbit_channel:shutdown(Ch),
-
- {_Writer2, Ch2} = test_spawn(SecondaryNode),
- expect_events(pid, Ch2, channel_created),
- rabbit_channel:shutdown(Ch2),
-
- {new, #amqqueue{name = QName} = Q} =
- rabbit_amqqueue:declare(queue_name(Config, <<"refresh_events-q">>),
- false, false, [], none, <<"acting-user">>),
- expect_events(name, QName, queue_created),
- rabbit_amqqueue:delete(Q, false, false, <<"acting-user">>),
-
- dummy_event_receiver:stop(),
- passed.
make_responder(FMsg) -> make_responder(FMsg, timeout).
make_responder(FMsg, Throw) ->
@@ -307,19 +278,6 @@ dead_queue_loop(QueueName, OldPid) ->
Q
end.
-expect_events(Tag, Key, Type) ->
- expect_event(Tag, Key, Type),
- rabbit:force_event_refresh(make_ref()),
- expect_event(Tag, Key, Type).
-
-expect_event(Tag, Key, Type) ->
- receive #event{type = Type, props = Props} ->
- case rabbit_misc:pget(Tag, Props) of
- Key -> ok;
- _ -> expect_event(Tag, Key, Type)
- end
- after ?TIMEOUT -> throw({failed_to_receive_event, Type})
- end.
test_spawn() ->
{Writer, _Limiter, Ch} = rabbit_ct_broker_helpers:test_channel(),
diff --git a/test/config_schema_SUITE_data/rabbit.snippets b/test/config_schema_SUITE_data/rabbit.snippets
index 685b05b3dc..625fcd93a9 100644
--- a/test/config_schema_SUITE_data/rabbit.snippets
+++ b/test/config_schema_SUITE_data/rabbit.snippets
@@ -302,6 +302,45 @@ tcp_listen_options.exit_on_close = false",
{keyfile,"test/config_schema_SUITE_data/certs/key.pem"},
{versions,['tlsv1.2','tlsv1.1']}]}]}],
[]},
+
+ {ssl_options_ciphers,
+ "listeners.ssl.1 = 5671
+ ssl_options.cacertfile = test/config_schema_SUITE_data/certs/cacert.pem
+ ssl_options.certfile = test/config_schema_SUITE_data/certs/cert.pem
+ ssl_options.keyfile = test/config_schema_SUITE_data/certs/key.pem
+ ssl_options.versions.1 = tlsv1.2
+ ssl_options.versions.2 = tlsv1.1
+ ssl_options.ciphers.1 = ECDHE-ECDSA-AES256-GCM-SHA384
+ ssl_options.ciphers.2 = ECDHE-RSA-AES256-GCM-SHA384
+ ssl_options.ciphers.3 = ECDHE-ECDSA-AES256-SHA384
+ ssl_options.ciphers.4 = ECDHE-RSA-AES256-SHA384
+ ssl_options.ciphers.5 = ECDH-ECDSA-AES256-GCM-SHA384
+ ssl_options.ciphers.6 = ECDH-RSA-AES256-GCM-SHA384
+ ssl_options.ciphers.7 = ECDH-ECDSA-AES256-SHA384
+ ssl_options.ciphers.8 = ECDH-RSA-AES256-SHA384
+ ssl_options.ciphers.9 = DHE-RSA-AES256-GCM-SHA384",
+ [{ssl,[{versions,['tlsv1.2','tlsv1.1']}]}],
+ [{ssl,[{versions,['tlsv1.2','tlsv1.1']}]},
+ {rabbit,
+ [{ssl_listeners,[5671]},
+ {ssl_options,
+ [{cacertfile,"test/config_schema_SUITE_data/certs/cacert.pem"},
+ {ciphers, [
+ "DHE-RSA-AES256-GCM-SHA384",
+ "ECDH-ECDSA-AES256-GCM-SHA384",
+ "ECDH-ECDSA-AES256-SHA384",
+ "ECDH-RSA-AES256-GCM-SHA384",
+ "ECDH-RSA-AES256-SHA384",
+ "ECDHE-ECDSA-AES256-GCM-SHA384",
+ "ECDHE-ECDSA-AES256-SHA384",
+ "ECDHE-RSA-AES256-GCM-SHA384",
+ "ECDHE-RSA-AES256-SHA384"
+ ]},
+ {certfile,"test/config_schema_SUITE_data/certs/cert.pem"},
+ {keyfile,"test/config_schema_SUITE_data/certs/key.pem"},
+ {versions,['tlsv1.2','tlsv1.1']}]}]}],
+ []},
+
{ssl_options_allow_poodle,
"listeners.ssl.1 = 5671
ssl_allow_poodle_attack = true
diff --git a/test/list_consumers_sanity_check_SUITE.erl b/test/list_consumers_sanity_check_SUITE.erl
index 8f2fb8e57f..3fa02b1de6 100644
--- a/test/list_consumers_sanity_check_SUITE.erl
+++ b/test/list_consumers_sanity_check_SUITE.erl
@@ -91,7 +91,7 @@ list_consumers_sanity_check(Config) ->
%% `rabbitmqctl report` shares some code with `list_consumers`, so
%% check that it also reports both channels
{ok, ReportStdOut} = rabbit_ct_broker_helpers:rabbitmqctl(Config, A,
- ["list_consumers"]),
+ ["list_consumers", "--no-table-headers"]),
ReportLines = re:split(ReportStdOut, <<"\n">>, [trim]),
ReportCTags = [lists:nth(3, re:split(Row, <<"\t">>)) || <<"list_consumers_q", _/binary>> = Row <- ReportLines],
true = (lists:sort([CTag1, CTag2]) =:=
@@ -99,7 +99,7 @@ list_consumers_sanity_check(Config) ->
rabbitmqctl_list_consumers(Config, Node) ->
{ok, StdOut} = rabbit_ct_broker_helpers:rabbitmqctl(Config, Node,
- ["list_consumers"]),
+ ["list_consumers", "--no-table-headers"]),
[<<"Listing consumers", _/binary>> | ConsumerRows] = re:split(StdOut, <<"\n">>, [trim]),
CTags = [ lists:nth(3, re:split(Row, <<"\t">>)) || Row <- ConsumerRows ],
CTags.
@@ -117,17 +117,17 @@ list_queues_online_and_offline(Config) ->
rabbit_ct_broker_helpers:rabbitmqctl(Config, B, ["stop"]),
GotUp = lists:sort(rabbit_ct_broker_helpers:rabbitmqctl_list(Config, A,
- ["list_queues", "--online", "name"])),
+ ["list_queues", "--online", "name", "--no-table-headers"])),
ExpectUp = [[<<"q_a_1">>], [<<"q_a_2">>]],
ExpectUp = GotUp,
GotDown = lists:sort(rabbit_ct_broker_helpers:rabbitmqctl_list(Config, A,
- ["list_queues", "--offline", "name"])),
+ ["list_queues", "--offline", "name", "--no-table-headers"])),
ExpectDown = [[<<"q_b_1">>], [<<"q_b_2">>]],
ExpectDown = GotDown,
GotAll = lists:sort(rabbit_ct_broker_helpers:rabbitmqctl_list(Config, A,
- ["list_queues", "name"])),
+ ["list_queues", "name", "--no-table-headers"])),
ExpectAll = ExpectUp ++ ExpectDown,
ExpectAll = GotAll,
diff --git a/test/list_queues_online_and_offline_SUITE.erl b/test/list_queues_online_and_offline_SUITE.erl
index 96dc988006..4b56012a26 100644
--- a/test/list_queues_online_and_offline_SUITE.erl
+++ b/test/list_queues_online_and_offline_SUITE.erl
@@ -86,17 +86,17 @@ list_queues_online_and_offline(Config) ->
rabbit_ct_broker_helpers:rabbitmqctl(Config, B, ["stop"]),
GotUp = lists:sort(rabbit_ct_broker_helpers:rabbitmqctl_list(Config, A,
- ["list_queues", "--online", "name"])),
+ ["list_queues", "--online", "name", "--no-table-headers"])),
ExpectUp = [[<<"q_a_1">>], [<<"q_a_2">>]],
ExpectUp = GotUp,
GotDown = lists:sort(rabbit_ct_broker_helpers:rabbitmqctl_list(Config, A,
- ["list_queues", "--offline", "name"])),
+ ["list_queues", "--offline", "name", "--no-table-headers"])),
ExpectDown = [[<<"q_b_1">>], [<<"q_b_2">>]],
ExpectDown = GotDown,
GotAll = lists:sort(rabbit_ct_broker_helpers:rabbitmqctl_list(Config, A,
- ["list_queues", "name"])),
+ ["list_queues", "name", "--no-table-headers"])),
ExpectAll = ExpectUp ++ ExpectDown,
ExpectAll = GotAll,
diff --git a/test/policy_SUITE.erl b/test/policy_SUITE.erl
index 2c41433a30..7cf3427db0 100644
--- a/test/policy_SUITE.erl
+++ b/test/policy_SUITE.erl
@@ -147,7 +147,8 @@ operator_retroactive_policy_publish_ttl(Config) ->
%% the queue
publish(Ch, Q, lists:seq(1, 25)),
timer:sleep(50),
- [[<<"policy_ttl-queue">>, <<"75">>]] = rabbit_ct_broker_helpers:rabbitmqctl_list(Config, 0, ["list_queues"]),
+ [[<<"policy_ttl-queue">>, <<"75">>]] =
+ rabbit_ct_broker_helpers:rabbitmqctl_list(Config, 0, ["list_queues", "--no-table-headers"]),
get_messages(50, Ch, Q),
delete(Ch, Q),
diff --git a/test/proxy_protocol_SUITE.erl b/test/proxy_protocol_SUITE.erl
index 136d2bb980..84b94d72a8 100644
--- a/test/proxy_protocol_SUITE.erl
+++ b/test/proxy_protocol_SUITE.erl
@@ -97,4 +97,4 @@ connection_name() ->
Pid = lists:nth(1, Pids),
{dictionary, Dict} = process_info(Pid, dictionary),
{process_name, {rabbit_reader, ConnectionName}} = lists:keyfind(process_name, 1, Dict),
- ConnectionName. \ No newline at end of file
+ ConnectionName.
diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl
new file mode 100644
index 0000000000..19f3a66a1d
--- /dev/null
+++ b/test/quorum_queue_SUITE.erl
@@ -0,0 +1,1812 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2018 Pivotal Software, Inc. All rights reserved.
+%%
+
+-module(quorum_queue_SUITE).
+
+-include_lib("common_test/include/ct.hrl").
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("amqp_client/include/amqp_client.hrl").
+
+-compile(export_all).
+
+all() ->
+ [
+ {group, single_node},
+ {group, unclustered},
+ {group, clustered}
+ ].
+
+groups() ->
+ [
+ {single_node, [], all_tests()},
+ {unclustered, [], [
+ {cluster_size_2, [], [add_member]}
+ ]},
+ {clustered, [], [
+ {cluster_size_2, [], [cleanup_data_dir]},
+ {cluster_size_2, [], [add_member_not_running,
+ add_member_classic,
+ add_member_already_a_member,
+ add_member_not_found,
+ delete_member_not_running,
+ delete_member_classic,
+ delete_member_not_found,
+ delete_member]
+ ++ all_tests()},
+ {cluster_size_3, [], [
+ declare_during_node_down,
+ recover_from_single_failure,
+ recover_from_multiple_failures,
+ leadership_takeover,
+ delete_declare,
+ metrics_cleanup_on_leadership_takeover,
+ metrics_cleanup_on_leader_crash,
+ consume_in_minority
+ ]},
+ {cluster_size_5, [], [start_queue,
+ start_queue_concurrent,
+ quorum_cluster_size_3,
+ quorum_cluster_size_7
+ ]}
+ ]}
+ ].
+
+all_tests() ->
+ [
+ declare_args,
+ declare_invalid_args,
+ declare_invalid_properties,
+ start_queue,
+ stop_queue,
+ restart_queue,
+ restart_all_types,
+ stop_start_rabbit_app,
+ publish,
+ publish_and_restart,
+ consume,
+ consume_first_empty,
+ consume_from_empty_queue,
+ consume_and_autoack,
+ subscribe,
+ subscribe_with_autoack,
+ consume_and_ack,
+ consume_and_multiple_ack,
+ subscribe_and_ack,
+ subscribe_and_multiple_ack,
+ consume_and_requeue_nack,
+ consume_and_requeue_multiple_nack,
+ subscribe_and_requeue_nack,
+ subscribe_and_requeue_multiple_nack,
+ consume_and_nack,
+ consume_and_multiple_nack,
+ subscribe_and_nack,
+ subscribe_and_multiple_nack,
+ subscribe_should_fail_when_global_qos_true,
+ publisher_confirms,
+ publisher_confirms_with_deleted_queue,
+ dead_letter_to_classic_queue,
+ dead_letter_to_quorum_queue,
+ dead_letter_from_classic_to_quorum_queue,
+ cleanup_queue_state_on_channel_after_publish,
+ cleanup_queue_state_on_channel_after_subscribe,
+ basic_cancel,
+ purge,
+ sync_queue,
+ cancel_sync_queue,
+ basic_recover,
+ idempotent_recover,
+ vhost_with_quorum_queue_is_deleted
+ ].
+
+%% -------------------------------------------------------------------
+%% Testsuite setup/teardown.
+%% -------------------------------------------------------------------
+
+init_per_suite(Config) ->
+ rabbit_ct_helpers:log_environment(),
+ rabbit_ct_helpers:run_setup_steps(Config).
+
+end_per_suite(Config) ->
+ rabbit_ct_helpers:run_teardown_steps(Config).
+
+init_per_group(clustered, Config) ->
+ rabbit_ct_helpers:set_config(Config, [{rmq_nodes_clustered, true}]);
+init_per_group(unclustered, Config) ->
+ rabbit_ct_helpers:set_config(Config, [{rmq_nodes_clustered, false}]);
+init_per_group(Group, Config) ->
+ ClusterSize = case Group of
+ single_node -> 1;
+ cluster_size_2 -> 2;
+ cluster_size_3 -> 3;
+ cluster_size_5 -> 5
+ end,
+ Config1 = rabbit_ct_helpers:set_config(Config,
+ [{rmq_nodes_count, ClusterSize},
+ {rmq_nodename_suffix, Group},
+ {tcp_ports_base}]),
+ Config2 = rabbit_ct_helpers:run_steps(Config1,
+ [fun merge_app_env/1 ] ++
+ rabbit_ct_broker_helpers:setup_steps()),
+ ok = rabbit_ct_broker_helpers:rpc(
+ Config2, 0, application, set_env,
+ [rabbit, channel_queue_cleanup_interval, 100]),
+ %% HACK: the larger cluster sizes benefit for a bit more time
+ %% after clustering before running the tests.
+ case Group of
+ cluster_size_5 ->
+ timer:sleep(5000),
+ Config2;
+ _ ->
+ Config2
+ end.
+
+end_per_group(clustered, Config) ->
+ Config;
+end_per_group(unclustered, Config) ->
+ Config;
+end_per_group(_, Config) ->
+ rabbit_ct_helpers:run_steps(Config,
+ rabbit_ct_broker_helpers:teardown_steps()).
+
+init_per_testcase(Testcase, Config) ->
+ Config1 = rabbit_ct_helpers:testcase_started(Config, Testcase),
+ rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_queues, []),
+ Q = rabbit_data_coercion:to_binary(Testcase),
+ Config2 = rabbit_ct_helpers:set_config(Config1,
+ [{queue_name, Q}
+ ]),
+ rabbit_ct_helpers:run_steps(Config2,
+ rabbit_ct_client_helpers:setup_steps()).
+
+merge_app_env(Config) ->
+ rabbit_ct_helpers:merge_app_env(Config, {rabbit, [{core_metrics_gc_interval, 100}]}).
+
+end_per_testcase(Testcase, Config) ->
+ catch delete_queues(),
+ Config1 = rabbit_ct_helpers:run_steps(
+ Config,
+ rabbit_ct_client_helpers:teardown_steps()),
+ rabbit_ct_helpers:testcase_finished(Config1, Testcase).
+
+%% -------------------------------------------------------------------
+%% Testcases.
+%% -------------------------------------------------------------------
+
+declare_args(Config) ->
+ Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ LQ = ?config(queue_name, Config),
+ declare(Ch, LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}]),
+ assert_queue_type(Server, LQ, quorum),
+
+ DQ = <<"classic-declare-args-q">>,
+ declare(Ch, DQ, [{<<"x-queue-type">>, longstr, <<"classic">>}]),
+ assert_queue_type(Server, DQ, classic),
+
+ DQ2 = <<"classic-q2">>,
+ declare(Ch, DQ2),
+ assert_queue_type(Server, DQ2, classic).
+
+declare_invalid_properties(Config) ->
+ Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
+ LQ = ?config(queue_name, Config),
+
+ ?assertExit(
+ {{shutdown, {server_initiated_close, 406, _}}, _},
+ amqp_channel:call(
+ rabbit_ct_client_helpers:open_channel(Config, Server),
+ #'queue.declare'{queue = LQ,
+ auto_delete = true,
+ durable = true,
+ arguments = [{<<"x-queue-type">>, longstr, <<"quorum">>}]})),
+ ?assertExit(
+ {{shutdown, {server_initiated_close, 406, _}}, _},
+ amqp_channel:call(
+ rabbit_ct_client_helpers:open_channel(Config, Server),
+ #'queue.declare'{queue = LQ,
+ exclusive = true,
+ durable = true,
+ arguments = [{<<"x-queue-type">>, longstr, <<"quorum">>}]})),
+ ?assertExit(
+ {{shutdown, {server_initiated_close, 406, _}}, _},
+ amqp_channel:call(
+ rabbit_ct_client_helpers:open_channel(Config, Server),
+ #'queue.declare'{queue = LQ,
+ durable = false,
+ arguments = [{<<"x-queue-type">>, longstr, <<"quorum">>}]})).
+
+declare_invalid_args(Config) ->
+ Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
+ LQ = ?config(queue_name, Config),
+
+ ?assertExit(
+ {{shutdown, {server_initiated_close, 406, _}}, _},
+ declare(rabbit_ct_client_helpers:open_channel(Config, Server),
+ LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>},
+ {<<"x-expires">>, long, 2000}])),
+ ?assertExit(
+ {{shutdown, {server_initiated_close, 406, _}}, _},
+ declare(rabbit_ct_client_helpers:open_channel(Config, Server),
+ LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>},
+ {<<"x-message-ttl">>, long, 2000}])),
+ ?assertExit(
+ {{shutdown, {server_initiated_close, 406, _}}, _},
+ declare(rabbit_ct_client_helpers:open_channel(Config, Server),
+ LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>},
+ {<<"x-max-length">>, long, 2000}])),
+ ?assertExit(
+ {{shutdown, {server_initiated_close, 406, _}}, _},
+ declare(rabbit_ct_client_helpers:open_channel(Config, Server),
+ LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>},
+ {<<"x-max-length-bytes">>, long, 2000}])),
+
+ ?assertExit(
+ {{shutdown, {server_initiated_close, 406, _}}, _},
+ declare(rabbit_ct_client_helpers:open_channel(Config, Server),
+ LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>},
+ {<<"x-max-priority">>, long, 2000}])),
+
+ ?assertExit(
+ {{shutdown, {server_initiated_close, 406, _}}, _},
+ declare(rabbit_ct_client_helpers:open_channel(Config, Server),
+ LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>},
+ {<<"x-overflow">>, longstr, <<"drop-head">>}])),
+
+ ?assertExit(
+ {{shutdown, {server_initiated_close, 406, _}}, _},
+ declare(rabbit_ct_client_helpers:open_channel(Config, Server),
+ LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>},
+ {<<"x-queue-mode">>, longstr, <<"lazy">>}])),
+
+ ?assertExit(
+ {{shutdown, {server_initiated_close, 406, _}}, _},
+ declare(rabbit_ct_client_helpers:open_channel(Config, Server),
+ LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>},
+ {<<"x-quorum-initial-group-size">>, longstr, <<"hop">>}])),
+
+ ?assertExit(
+ {{shutdown, {server_initiated_close, 406, _}}, _},
+ declare(rabbit_ct_client_helpers:open_channel(Config, Server),
+ LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>},
+ {<<"x-quorum-initial-group-size">>, long, 0}])).
+
+start_queue(Config) ->
+ Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ LQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', LQ, 0, 0},
+ declare(Ch, LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+
+ %% Check that the application and one ra node are up
+ ?assertMatch({ra, _, _}, lists:keyfind(ra, 1,
+ rpc:call(Server, application, which_applications, []))),
+ ?assertMatch([_], rpc:call(Server, supervisor, which_children, [ra_server_sup])),
+
+ %% Test declare an existing queue
+ ?assertEqual({'queue.declare_ok', LQ, 0, 0},
+ declare(Ch, LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+
+ %% Test declare with same arguments
+ ?assertEqual({'queue.declare_ok', LQ, 0, 0},
+ declare(Ch, LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+
+ %% Test declare an existing queue with different arguments
+ ?assertExit(_, declare(Ch, LQ, [])),
+
+ %% Check that the application and process are still up
+ ?assertMatch({ra, _, _}, lists:keyfind(ra, 1,
+ rpc:call(Server, application, which_applications, []))),
+ ?assertMatch([_], rpc:call(Server, supervisor, which_children, [ra_server_sup])).
+
+start_queue_concurrent(Config) ->
+ Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ LQ = ?config(queue_name, Config),
+ Self = self(),
+ [begin
+ _ = spawn_link(fun () ->
+ {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, Server),
+ %% Test declare an existing queue
+ ?assertEqual({'queue.declare_ok', LQ, 0, 0},
+ declare(Ch, LQ,
+ [{<<"x-queue-type">>,
+ longstr,
+ <<"quorum">>}])),
+ Self ! {done, Server}
+ end)
+ end || Server <- Servers],
+
+ [begin
+ receive {done, Server} -> ok
+ after 5000 -> exit({await_done_timeout, Server})
+ end
+ end || Server <- Servers],
+
+
+ ok.
+
+quorum_cluster_size_3(Config) ->
+ quorum_cluster_size_x(Config, 3, 3).
+
+quorum_cluster_size_7(Config) ->
+ quorum_cluster_size_x(Config, 7, 5).
+
+quorum_cluster_size_x(Config, Max, Expected) ->
+ Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ RaName = ra_name(QQ),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>},
+ {<<"x-quorum-initial-group-size">>, long, Max}])),
+ {ok, Members, _} = ra:members({RaName, Server}),
+ ?assertEqual(Expected, length(Members)),
+ Info = rpc:call(Server, rabbit_quorum_queue, infos,
+ [rabbit_misc:r(<<"/">>, queue, QQ)]),
+ MembersQ = proplists:get_value(members, Info),
+ ?assertEqual(Expected, length(MembersQ)).
+
+stop_queue(Config) ->
+ Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ LQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', LQ, 0, 0},
+ declare(Ch, LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+
+ %% Check that the application and one ra node are up
+ ?assertMatch({ra, _, _}, lists:keyfind(ra, 1,
+ rpc:call(Server, application, which_applications, []))),
+ ?assertMatch([_], rpc:call(Server, supervisor, which_children, [ra_server_sup])),
+
+ %% Delete the quorum queue
+ ?assertMatch(#'queue.delete_ok'{}, amqp_channel:call(Ch, #'queue.delete'{queue = LQ})),
+ %% Check that the application and process are down
+ wait_until(fun() ->
+ [] == rpc:call(Server, supervisor, which_children, [ra_server_sup])
+ end),
+ ?assertMatch({ra, _, _}, lists:keyfind(ra, 1,
+ rpc:call(Server, application, which_applications, []))).
+
+restart_queue(Config) ->
+ Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ LQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', LQ, 0, 0},
+ declare(Ch, LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+
+ ok = rabbit_ct_broker_helpers:stop_node(Config, Server),
+ ok = rabbit_ct_broker_helpers:start_node(Config, Server),
+
+ %% Check that the application and one ra node are up
+ ?assertMatch({ra, _, _}, lists:keyfind(ra, 1,
+ rpc:call(Server, application, which_applications, []))),
+ ?assertMatch([_], rpc:call(Server, supervisor, which_children, [ra_server_sup])).
+
+idempotent_recover(Config) ->
+ Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ LQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', LQ, 0, 0},
+ declare(Ch, LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+
+ %% kill default vhost to trigger recovery
+ [{_, SupWrapperPid, _, _} | _] = rpc:call(Server, supervisor,
+ which_children,
+ [rabbit_vhost_sup_sup]),
+ [{_, Pid, _, _} | _] = rpc:call(Server, supervisor,
+ which_children,
+ [SupWrapperPid]),
+ %% kill the vhost process to trigger recover
+ rpc:call(Server, erlang, exit, [Pid, kill]),
+
+ timer:sleep(1000),
+ %% validate quorum queue is still functional
+ RaName = ra_name(LQ),
+ {ok, _, _} = ra:members({RaName, Server}),
+ %% validate vhosts are running - or rather validate that at least one
+ %% vhost per cluster is running
+ [begin
+ #{cluster_state := ServerStatuses} = maps:from_list(I),
+ ?assertMatch(#{Server := running}, maps:from_list(ServerStatuses))
+ end || I <- rpc:call(Server, rabbit_vhost,info_all, [])],
+ ok.
+
+vhost_with_quorum_queue_is_deleted(Config) ->
+ Node = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
+ VHost = <<"vhost2">>,
+ QName = atom_to_binary(?FUNCTION_NAME, utf8),
+ RaName = binary_to_atom(<<VHost/binary, "_", QName/binary>>, utf8),
+ User = ?config(rmq_username, Config),
+ ok = rabbit_ct_broker_helpers:add_vhost(Config, Node, VHost, User),
+ ok = rabbit_ct_broker_helpers:set_full_permissions(Config, User, VHost),
+ Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config, Node,
+ VHost),
+ {ok, Ch} = amqp_connection:open_channel(Conn),
+ ?assertEqual({'queue.declare_ok', QName, 0, 0},
+ declare(Ch, QName, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+
+ UId = rpc:call(Node, ra_directory, where_is, [RaName]),
+ ?assert(UId =/= undefined),
+ ok = rabbit_ct_broker_helpers:delete_vhost(Config, VHost),
+ %% validate quorum queues got deleted
+ undefined = rpc:call(Node, ra_directory, where_is, [RaName]),
+ ok.
+
+restart_all_types(Config) ->
+ %% Test the node restart with both types of queues (quorum and classic) to
+ %% ensure there are no regressions
+ Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ1 = <<"restart_all_types-qq1">>,
+ ?assertEqual({'queue.declare_ok', QQ1, 0, 0},
+ declare(Ch, QQ1, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+ QQ2 = <<"restart_all_types-qq2">>,
+ ?assertEqual({'queue.declare_ok', QQ2, 0, 0},
+ declare(Ch, QQ2, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+
+ CQ1 = <<"restart_all_types-classic1">>,
+ ?assertEqual({'queue.declare_ok', CQ1, 0, 0}, declare(Ch, CQ1, [])),
+ rabbit_ct_client_helpers:publish(Ch, CQ1, 1),
+ CQ2 = <<"restart_all_types-classic2">>,
+ ?assertEqual({'queue.declare_ok', CQ2, 0, 0}, declare(Ch, CQ2, [])),
+ rabbit_ct_client_helpers:publish(Ch, CQ2, 1),
+
+ ok = rabbit_ct_broker_helpers:stop_node(Config, Server),
+ ok = rabbit_ct_broker_helpers:start_node(Config, Server),
+
+ %% Check that the application and two ra nodes are up
+ ?assertMatch({ra, _, _}, lists:keyfind(ra, 1,
+ rpc:call(Server, application, which_applications, []))),
+ ?assertMatch([_,_], rpc:call(Server, supervisor, which_children, [ra_server_sup])),
+ %% Check the classic queues restarted correctly
+ Ch2 = rabbit_ct_client_helpers:open_channel(Config, Server),
+ {#'basic.get_ok'{}, #amqp_msg{}} =
+ amqp_channel:call(Ch2, #'basic.get'{queue = CQ1, no_ack = false}),
+ {#'basic.get_ok'{}, #amqp_msg{}} =
+ amqp_channel:call(Ch2, #'basic.get'{queue = CQ2, no_ack = false}),
+ delete_queues(Ch2, [QQ1, QQ2, CQ1, CQ2]).
+
+delete_queues(Ch, Queues) ->
+ [amqp_channel:call(Ch, #'queue.delete'{queue = Q}) || Q <- Queues].
+
+stop_start_rabbit_app(Config) ->
+ %% Test start/stop of rabbit app with both types of queues (quorum and
+ %% classic) to ensure there are no regressions
+ Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ1 = <<"stop_start_rabbit_app-qq">>,
+ ?assertEqual({'queue.declare_ok', QQ1, 0, 0},
+ declare(Ch, QQ1, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+ QQ2 = <<"quorum-q2">>,
+ ?assertEqual({'queue.declare_ok', QQ2, 0, 0},
+ declare(Ch, QQ2, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+
+ CQ1 = <<"stop_start_rabbit_app-classic">>,
+ ?assertEqual({'queue.declare_ok', CQ1, 0, 0}, declare(Ch, CQ1, [])),
+ rabbit_ct_client_helpers:publish(Ch, CQ1, 1),
+ CQ2 = <<"stop_start_rabbit_app-classic2">>,
+ ?assertEqual({'queue.declare_ok', CQ2, 0, 0}, declare(Ch, CQ2, [])),
+ rabbit_ct_client_helpers:publish(Ch, CQ2, 1),
+
+ rabbit_control_helper:command(stop_app, Server),
+ %% Check the ra application has stopped (thus its supervisor and queues)
+ ?assertMatch(false, lists:keyfind(ra, 1,
+ rpc:call(Server, application, which_applications, []))),
+
+ rabbit_control_helper:command(start_app, Server),
+
+ %% Check that the application and two ra nodes are up
+ ?assertMatch({ra, _, _}, lists:keyfind(ra, 1,
+ rpc:call(Server, application, which_applications, []))),
+ ?assertMatch([_,_], rpc:call(Server, supervisor, which_children, [ra_server_sup])),
+ %% Check the classic queues restarted correctly
+ Ch2 = rabbit_ct_client_helpers:open_channel(Config, Server),
+ {#'basic.get_ok'{}, #amqp_msg{}} =
+ amqp_channel:call(Ch2, #'basic.get'{queue = CQ1, no_ack = false}),
+ {#'basic.get_ok'{}, #amqp_msg{}} =
+ amqp_channel:call(Ch2, #'basic.get'{queue = CQ2, no_ack = false}),
+ delete_queues(Ch2, [QQ1, QQ2, CQ1, CQ2]).
+
+publish(Config) ->
+ [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+ publish(Ch, QQ),
+ Name = ra_name(QQ),
+ wait_for_messages_ready(Servers, Name, 1),
+ wait_for_messages_pending_ack(Servers, Name, 0).
+
+ra_name(Q) ->
+ binary_to_atom(<<"%2F_", Q/binary>>, utf8).
+
+publish_and_restart(Config) ->
+ %% Test the node restart with both types of queues (quorum and classic) to
+ %% ensure there are no regressions
+ [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ RaName = ra_name(QQ),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+ publish(Ch, QQ),
+ wait_for_messages_ready(Servers, RaName, 1),
+ wait_for_messages_pending_ack(Servers, RaName, 0),
+
+ ok = rabbit_ct_broker_helpers:stop_node(Config, Server),
+ ok = rabbit_ct_broker_helpers:start_node(Config, Server),
+
+ wait_for_messages_ready(Servers, RaName, 1),
+ wait_for_messages_pending_ack(Servers, RaName, 0),
+ publish(rabbit_ct_client_helpers:open_channel(Config, Server), QQ),
+ wait_for_messages_ready(Servers, RaName, 2),
+ wait_for_messages_pending_ack(Servers, RaName, 0).
+
+consume(Config) ->
+ [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+
+ RaName = ra_name(QQ),
+ publish(Ch, QQ),
+ wait_for_messages_ready(Servers, RaName, 1),
+ wait_for_messages_pending_ack(Servers, RaName, 0),
+ consume(Ch, QQ, false),
+ wait_for_messages_ready(Servers, RaName, 0),
+ wait_for_messages_pending_ack(Servers, RaName, 1),
+ rabbit_ct_client_helpers:close_channel(Ch),
+ wait_for_messages_ready(Servers, RaName, 1),
+ wait_for_messages_pending_ack(Servers, RaName, 0).
+
+consume_first_empty(Config) ->
+ [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+
+ RaName = ra_name(QQ),
+ consume_empty(Ch, QQ, false),
+ publish(Ch, QQ),
+ wait_for_messages_ready(Servers, RaName, 1),
+ wait_for_messages_pending_ack(Servers, RaName, 0),
+ consume(Ch, QQ, false),
+ rabbit_ct_client_helpers:close_channel(Ch).
+
+consume_in_minority(Config) ->
+ [Server0, Server1, Server2] =
+ rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server0),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+
+ ok = rabbit_ct_broker_helpers:stop_node(Config, Server1),
+ ok = rabbit_ct_broker_helpers:stop_node(Config, Server2),
+
+ ?assertExit({{shutdown, {connection_closing, {server_initiated_close, 541, _}}}, _},
+ amqp_channel:call(Ch, #'basic.get'{queue = QQ,
+ no_ack = false})).
+
+consume_and_autoack(Config) ->
+ [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+ RaName = ra_name(QQ),
+ publish(Ch, QQ),
+ wait_for_messages_ready(Servers, RaName, 1),
+ wait_for_messages_pending_ack(Servers, RaName, 0),
+ consume(Ch, QQ, true),
+ wait_for_messages_ready(Servers, RaName, 0),
+ wait_for_messages_pending_ack(Servers, RaName, 0),
+ rabbit_ct_client_helpers:close_channel(Ch),
+ wait_for_messages_ready(Servers, RaName, 0),
+ wait_for_messages_pending_ack(Servers, RaName, 0).
+
+consume_from_empty_queue(Config) ->
+ Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+
+ consume_empty(Ch, QQ, false).
+
+subscribe(Config) ->
+ [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+
+ RaName = ra_name(QQ),
+ publish(Ch, QQ),
+ wait_for_messages_ready(Servers, RaName, 1),
+ wait_for_messages_pending_ack(Servers, RaName, 0),
+ subscribe(Ch, QQ, false),
+ receive_basic_deliver(false),
+ wait_for_messages_ready(Servers, RaName, 0),
+ wait_for_messages_pending_ack(Servers, RaName, 1),
+ rabbit_ct_client_helpers:close_channel(Ch),
+ wait_for_messages_ready(Servers, RaName, 1),
+ wait_for_messages_pending_ack(Servers, RaName, 0).
+
+subscribe_should_fail_when_global_qos_true(Config) ->
+ [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+
+ RaName = ra_name(QQ),
+ qos(Ch, 10, true),
+ publish(Ch, QQ),
+ wait_for_messages_ready(Servers, RaName, 1),
+ wait_for_messages_pending_ack(Servers, RaName, 0),
+ try subscribe(Ch, QQ, false) of
+ _ -> exit(subscribe_should_not_pass)
+ catch
+ _:_ = Err ->
+ ct:pal("Err ~p", [Err])
+ end,
+ ok.
+
+subscribe_with_autoack(Config) ->
+ [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+
+ RaName = ra_name(QQ),
+ publish(Ch, QQ),
+ publish(Ch, QQ),
+ wait_for_messages_ready(Servers, RaName, 2),
+ wait_for_messages_pending_ack(Servers, RaName, 0),
+ subscribe(Ch, QQ, true),
+ receive_basic_deliver(false),
+ receive_basic_deliver(false),
+ wait_for_messages_ready(Servers, RaName, 0),
+ wait_for_messages_pending_ack(Servers, RaName, 0),
+ rabbit_ct_client_helpers:close_channel(Ch),
+ wait_for_messages_ready(Servers, RaName, 0),
+ wait_for_messages_pending_ack(Servers, RaName, 0).
+
+consume_and_ack(Config) ->
+ [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+
+ RaName = ra_name(QQ),
+ publish(Ch, QQ),
+ wait_for_messages_ready(Servers, RaName, 1),
+ wait_for_messages_pending_ack(Servers, RaName, 0),
+ DeliveryTag = consume(Ch, QQ, false),
+ wait_for_messages_ready(Servers, RaName, 0),
+ wait_for_messages_pending_ack(Servers, RaName, 1),
+ amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag}),
+ wait_for_messages_ready(Servers, RaName, 0),
+ wait_for_messages_pending_ack(Servers, RaName, 0).
+
+consume_and_multiple_ack(Config) ->
+ [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+
+ RaName = ra_name(QQ),
+ publish(Ch, QQ),
+ publish(Ch, QQ),
+ publish(Ch, QQ),
+ wait_for_messages_ready(Servers, RaName, 3),
+ wait_for_messages_pending_ack(Servers, RaName, 0),
+ _ = consume(Ch, QQ, false),
+ _ = consume(Ch, QQ, false),
+ DeliveryTag = consume(Ch, QQ, false),
+ wait_for_messages_ready(Servers, RaName, 0),
+ wait_for_messages_pending_ack(Servers, RaName, 3),
+ amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag,
+ multiple = true}),
+ wait_for_messages_ready(Servers, RaName, 0),
+ wait_for_messages_pending_ack(Servers, RaName, 0).
+
+subscribe_and_ack(Config) ->
+ [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+
+ RaName = ra_name(QQ),
+ publish(Ch, QQ),
+ wait_for_messages_ready(Servers, RaName, 1),
+ wait_for_messages_pending_ack(Servers, RaName, 0),
+ subscribe(Ch, QQ, false),
+ receive
+ {#'basic.deliver'{delivery_tag = DeliveryTag}, _} ->
+ wait_for_messages_ready(Servers, RaName, 0),
+ wait_for_messages_pending_ack(Servers, RaName, 1),
+ amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag}),
+ wait_for_messages_ready(Servers, RaName, 0),
+ wait_for_messages_pending_ack(Servers, RaName, 0)
+ end.
+
+subscribe_and_multiple_ack(Config) ->
+ [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+
+ RaName = ra_name(QQ),
+ publish(Ch, QQ),
+ publish(Ch, QQ),
+ publish(Ch, QQ),
+ wait_for_messages_ready(Servers, RaName, 3),
+ wait_for_messages_pending_ack(Servers, RaName, 0),
+ subscribe(Ch, QQ, false),
+ receive_basic_deliver(false),
+ receive_basic_deliver(false),
+ receive
+ {#'basic.deliver'{delivery_tag = DeliveryTag}, _} ->
+ wait_for_messages_ready(Servers, RaName, 0),
+ wait_for_messages_pending_ack(Servers, RaName, 3),
+ amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag,
+ multiple = true}),
+ wait_for_messages_ready(Servers, RaName, 0),
+ wait_for_messages_pending_ack(Servers, RaName, 0)
+ end.
+
+consume_and_requeue_nack(Config) ->
+ [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+
+ RaName = ra_name(QQ),
+ publish(Ch, QQ),
+ publish(Ch, QQ),
+ wait_for_messages_ready(Servers, RaName, 2),
+ wait_for_messages_pending_ack(Servers, RaName, 0),
+ DeliveryTag = consume(Ch, QQ, false),
+ wait_for_messages_ready(Servers, RaName, 1),
+ wait_for_messages_pending_ack(Servers, RaName, 1),
+ amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag,
+ multiple = false,
+ requeue = true}),
+ wait_for_messages_ready(Servers, RaName, 2),
+ wait_for_messages_pending_ack(Servers, RaName, 0).
+
+consume_and_requeue_multiple_nack(Config) ->
+ [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+
+ RaName = ra_name(QQ),
+ publish(Ch, QQ),
+ publish(Ch, QQ),
+ publish(Ch, QQ),
+ wait_for_messages_ready(Servers, RaName, 3),
+ wait_for_messages_pending_ack(Servers, RaName, 0),
+ _ = consume(Ch, QQ, false),
+ _ = consume(Ch, QQ, false),
+ DeliveryTag = consume(Ch, QQ, false),
+ wait_for_messages_ready(Servers, RaName, 0),
+ wait_for_messages_pending_ack(Servers, RaName, 3),
+ amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag,
+ multiple = true,
+ requeue = true}),
+ wait_for_messages_ready(Servers, RaName, 3),
+ wait_for_messages_pending_ack(Servers, RaName, 0).
+
+consume_and_nack(Config) ->
+ [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+
+ RaName = ra_name(QQ),
+ publish(Ch, QQ),
+ wait_for_messages_ready(Servers, RaName, 1),
+ wait_for_messages_pending_ack(Servers, RaName, 0),
+ DeliveryTag = consume(Ch, QQ, false),
+ wait_for_messages_ready(Servers, RaName, 0),
+ wait_for_messages_pending_ack(Servers, RaName, 1),
+ amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag,
+ multiple = false,
+ requeue = false}),
+ wait_for_messages_ready(Servers, RaName, 0),
+ wait_for_messages_pending_ack(Servers, RaName, 0).
+
+consume_and_multiple_nack(Config) ->
+ [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+
+ RaName = ra_name(QQ),
+ publish(Ch, QQ),
+ publish(Ch, QQ),
+ publish(Ch, QQ),
+ wait_for_messages_ready(Servers, RaName, 3),
+ wait_for_messages_pending_ack(Servers, RaName, 0),
+ _ = consume(Ch, QQ, false),
+ _ = consume(Ch, QQ, false),
+ DeliveryTag = consume(Ch, QQ, false),
+ wait_for_messages_ready(Servers, RaName, 0),
+ wait_for_messages_pending_ack(Servers, RaName, 3),
+ amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag,
+ multiple = true,
+ requeue = false}),
+ wait_for_messages_ready(Servers, RaName, 0),
+ wait_for_messages_pending_ack(Servers, RaName, 0).
+
+subscribe_and_requeue_multiple_nack(Config) ->
+ [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+
+ RaName = ra_name(QQ),
+ publish(Ch, QQ),
+ publish(Ch, QQ),
+ publish(Ch, QQ),
+ wait_for_messages_ready(Servers, RaName, 3),
+ wait_for_messages_pending_ack(Servers, RaName, 0),
+ subscribe(Ch, QQ, false),
+ receive_basic_deliver(false),
+ receive_basic_deliver(false),
+ receive
+ {#'basic.deliver'{delivery_tag = DeliveryTag,
+ redelivered = false}, _} ->
+ wait_for_messages_ready(Servers, RaName, 0),
+ wait_for_messages_pending_ack(Servers, RaName, 3),
+ amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag,
+ multiple = true,
+ requeue = true}),
+ receive_basic_deliver(true),
+ receive_basic_deliver(true),
+ receive
+ {#'basic.deliver'{delivery_tag = DeliveryTag1,
+ redelivered = true}, _} ->
+ wait_for_messages_ready(Servers, RaName, 0),
+ wait_for_messages_pending_ack(Servers, RaName, 3),
+ amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag1,
+ multiple = true}),
+ wait_for_messages_ready(Servers, RaName, 0),
+ wait_for_messages_pending_ack(Servers, RaName, 0)
+ end
+ end.
+
+subscribe_and_requeue_nack(Config) ->
+ [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+
+ RaName = ra_name(QQ),
+ publish(Ch, QQ),
+ wait_for_messages_ready(Servers, RaName, 1),
+ wait_for_messages_pending_ack(Servers, RaName, 0),
+ subscribe(Ch, QQ, false),
+ receive
+ {#'basic.deliver'{delivery_tag = DeliveryTag,
+ redelivered = false}, _} ->
+ wait_for_messages_ready(Servers, RaName, 0),
+ wait_for_messages_pending_ack(Servers, RaName, 1),
+ amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag,
+ multiple = false,
+ requeue = true}),
+ receive
+ {#'basic.deliver'{delivery_tag = DeliveryTag1,
+ redelivered = true}, _} ->
+ wait_for_messages_ready(Servers, RaName, 0),
+ wait_for_messages_pending_ack(Servers, RaName, 1),
+ amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag1}),
+ wait_for_messages_ready(Servers, RaName, 0),
+ wait_for_messages_pending_ack(Servers, RaName, 0)
+ end
+ end.
+
+subscribe_and_nack(Config) ->
+ [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+
+ RaName = ra_name(QQ),
+ publish(Ch, QQ),
+ wait_for_messages_ready(Servers, RaName, 1),
+ wait_for_messages_pending_ack(Servers, RaName, 0),
+ subscribe(Ch, QQ, false),
+ receive
+ {#'basic.deliver'{delivery_tag = DeliveryTag,
+ redelivered = false}, _} ->
+ wait_for_messages_ready(Servers, RaName, 0),
+ wait_for_messages_pending_ack(Servers, RaName, 1),
+ amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag,
+ multiple = false,
+ requeue = false}),
+ wait_for_messages_ready(Servers, RaName, 0),
+ wait_for_messages_pending_ack(Servers, RaName, 0)
+ end.
+
+subscribe_and_multiple_nack(Config) ->
+ [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+
+ RaName = ra_name(QQ),
+ publish(Ch, QQ),
+ publish(Ch, QQ),
+ publish(Ch, QQ),
+ wait_for_messages_ready(Servers, RaName, 3),
+ wait_for_messages_pending_ack(Servers, RaName, 0),
+ subscribe(Ch, QQ, false),
+ receive_basic_deliver(false),
+ receive_basic_deliver(false),
+ receive
+ {#'basic.deliver'{delivery_tag = DeliveryTag,
+ redelivered = false}, _} ->
+ wait_for_messages_ready(Servers, RaName, 0),
+ wait_for_messages_pending_ack(Servers, RaName, 3),
+ amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag,
+ multiple = true,
+ requeue = false}),
+ wait_for_messages_ready(Servers, RaName, 0),
+ wait_for_messages_pending_ack(Servers, RaName, 0)
+ end.
+
+publisher_confirms(Config) ->
+ [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+
+ RaName = ra_name(QQ),
+ amqp_channel:call(Ch, #'confirm.select'{}),
+ amqp_channel:register_confirm_handler(Ch, self()),
+ publish(Ch, QQ),
+ wait_for_messages_ready(Servers, RaName, 1),
+ wait_for_messages_pending_ack(Servers, RaName, 0),
+ ct:pal("WAIT FOR CONFIRMS ~n", []),
+ amqp_channel:wait_for_confirms(Ch, 5000),
+ amqp_channel:unregister_confirm_handler(Ch),
+ ok.
+
+publisher_confirms_with_deleted_queue(Config) ->
+ [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+ amqp_channel:call(Ch, #'confirm.select'{}),
+ amqp_channel:register_confirm_handler(Ch, self()),
+ % subscribe(Ch, QQ, false),
+ publish(Ch, QQ),
+ delete_queues(Ch, [QQ]),
+ ct:pal("WAIT FOR CONFIRMS ~n", []),
+ amqp_channel:wait_for_confirms_or_die(Ch, 5000),
+ amqp_channel:unregister_confirm_handler(Ch),
+ ok.
+
+dead_letter_to_classic_queue(Config) ->
+ [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ CQ = <<"classic-dead_letter_to_classic_queue">>,
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>},
+ {<<"x-dead-letter-exchange">>, longstr, <<>>},
+ {<<"x-dead-letter-routing-key">>, longstr, CQ}
+ ])),
+ ?assertEqual({'queue.declare_ok', CQ, 0, 0}, declare(Ch, CQ, [])),
+ RaName = ra_name(QQ),
+ publish(Ch, QQ),
+ wait_for_messages_ready(Servers, RaName, 1),
+ wait_for_messages_pending_ack(Servers, RaName, 0),
+ wait_for_messages(Config, [[CQ, <<"0">>, <<"0">>, <<"0">>]]),
+ DeliveryTag = consume(Ch, QQ, false),
+ wait_for_messages_ready(Servers, RaName, 0),
+ wait_for_messages_pending_ack(Servers, RaName, 1),
+ wait_for_messages(Config, [[CQ, <<"0">>, <<"0">>, <<"0">>]]),
+ amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag,
+ multiple = false,
+ requeue = false}),
+ wait_for_messages_ready(Servers, RaName, 0),
+ wait_for_messages_pending_ack(Servers, RaName, 0),
+ wait_for_messages(Config, [[CQ, <<"1">>, <<"1">>, <<"0">>]]),
+ _ = consume(Ch, CQ, false).
+
+dead_letter_to_quorum_queue(Config) ->
+ [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ QQ2 = <<"dead_letter_to_quorum_queue-q2">>,
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>},
+ {<<"x-dead-letter-exchange">>, longstr, <<>>},
+ {<<"x-dead-letter-routing-key">>, longstr, QQ2}
+ ])),
+ ?assertEqual({'queue.declare_ok', QQ2, 0, 0},
+ declare(Ch, QQ2, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+ RaName = ra_name(QQ),
+ RaName2 = ra_name(QQ2),
+ publish(Ch, QQ),
+ wait_for_messages_ready(Servers, RaName, 1),
+ wait_for_messages_pending_ack(Servers, RaName, 0),
+ wait_for_messages_ready(Servers, RaName2, 0),
+ wait_for_messages_pending_ack(Servers, RaName2, 0),
+ DeliveryTag = consume(Ch, QQ, false),
+ wait_for_messages_ready(Servers, RaName, 0),
+ wait_for_messages_pending_ack(Servers, RaName, 1),
+ wait_for_messages_ready(Servers, RaName2, 0),
+ wait_for_messages_pending_ack(Servers, RaName2, 0),
+ amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag,
+ multiple = false,
+ requeue = false}),
+ wait_for_messages_ready(Servers, RaName, 0),
+ wait_for_messages_pending_ack(Servers, RaName, 0),
+ wait_for_messages_ready(Servers, RaName2, 1),
+ wait_for_messages_pending_ack(Servers, RaName2, 0),
+ _ = consume(Ch, QQ2, false).
+
+dead_letter_from_classic_to_quorum_queue(Config) ->
+ [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ CQ = <<"classic-q-dead_letter_from_classic_to_quorum_queue">>,
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', CQ, 0, 0},
+ declare(Ch, CQ, [{<<"x-dead-letter-exchange">>, longstr, <<>>},
+ {<<"x-dead-letter-routing-key">>, longstr, QQ}
+ ])),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+ RaName = ra_name(QQ),
+ publish(Ch, CQ),
+ wait_for_messages_ready(Servers, RaName, 0),
+ wait_for_messages_pending_ack(Servers, RaName, 0),
+ wait_for_messages(Config, [[CQ, <<"1">>, <<"1">>, <<"0">>]]),
+ DeliveryTag = consume(Ch, CQ, false),
+ wait_for_messages_ready(Servers, RaName, 0),
+ wait_for_messages_pending_ack(Servers, RaName, 0),
+ wait_for_messages(Config, [[CQ, <<"1">>, <<"0">>, <<"1">>]]),
+ amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag,
+ multiple = false,
+ requeue = false}),
+ wait_for_messages_ready(Servers, RaName, 1),
+ wait_for_messages_pending_ack(Servers, RaName, 0),
+ wait_for_messages(Config, [[CQ, <<"0">>, <<"0">>, <<"0">>]]),
+ _ = consume(Ch, QQ, false),
+ rabbit_ct_client_helpers:close_channel(Ch).
+
+cleanup_queue_state_on_channel_after_publish(Config) ->
+ %% Declare/delete the queue in one channel and publish on a different one,
+ %% to verify that the cleanup is propagated through channels
+ [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server),
+ Ch2 = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch1, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+ RaName = ra_name(QQ),
+ publish(Ch2, QQ),
+ Res = dirty_query(Servers, RaName, fun rabbit_fifo:query_consumer_count/1),
+ ct:pal ("Res ~p", [Res]),
+ wait_for_messages_pending_ack(Servers, RaName, 0),
+ wait_for_messages_ready(Servers, RaName, 1),
+ [NCh1, NCh2] = rpc:call(Server, rabbit_channel, list, []),
+ %% Check the channel state contains the state for the quorum queue on
+ %% channel 1 and 2
+ wait_for_cleanup(Server, NCh1, 0),
+ wait_for_cleanup(Server, NCh2, 1),
+ %% then delete the queue and wait for the process to terminate
+ ?assertMatch(#'queue.delete_ok'{},
+ amqp_channel:call(Ch1, #'queue.delete'{queue = QQ})),
+ wait_until(fun() ->
+ [] == rpc:call(Server, supervisor, which_children,
+ [ra_server_sup])
+ end),
+ %% Check that all queue states have been cleaned
+ wait_for_cleanup(Server, NCh1, 0),
+ wait_for_cleanup(Server, NCh2, 0).
+
+cleanup_queue_state_on_channel_after_subscribe(Config) ->
+ %% Declare/delete the queue and publish in one channel, while consuming on a
+ %% different one to verify that the cleanup is propagated through channels
+ [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server),
+ Ch2 = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch1, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+ RaName = ra_name(QQ),
+ publish(Ch1, QQ),
+ wait_for_messages_ready(Servers, RaName, 1),
+ wait_for_messages_pending_ack(Servers, RaName, 0),
+ subscribe(Ch2, QQ, false),
+ receive
+ {#'basic.deliver'{delivery_tag = DeliveryTag,
+ redelivered = false}, _} ->
+ wait_for_messages_ready(Servers, RaName, 0),
+ wait_for_messages_pending_ack(Servers, RaName, 1),
+ amqp_channel:cast(Ch2, #'basic.ack'{delivery_tag = DeliveryTag,
+ multiple = true}),
+ wait_for_messages_ready(Servers, RaName, 0),
+ wait_for_messages_pending_ack(Servers, RaName, 0)
+ end,
+ [NCh1, NCh2] = rpc:call(Server, rabbit_channel, list, []),
+ %% Check the channel state contains the state for the quorum queue on channel 1 and 2
+ wait_for_cleanup(Server, NCh1, 1),
+ wait_for_cleanup(Server, NCh2, 1),
+ ?assertMatch(#'queue.delete_ok'{}, amqp_channel:call(Ch1, #'queue.delete'{queue = QQ})),
+ wait_until(fun() ->
+ [] == rpc:call(Server, supervisor, which_children, [ra_server_sup])
+ end),
+ %% Check that all queue states have been cleaned
+ wait_for_cleanup(Server, NCh1, 0),
+ wait_for_cleanup(Server, NCh2, 0).
+
+recover_from_single_failure(Config) ->
+ [Server, Server1, Server2] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+
+ ok = rabbit_ct_broker_helpers:stop_node(Config, Server2),
+ RaName = ra_name(QQ),
+
+ publish(Ch, QQ),
+ publish(Ch, QQ),
+ publish(Ch, QQ),
+ wait_for_messages_ready([Server, Server1], RaName, 3),
+ wait_for_messages_pending_ack([Server, Server1], RaName, 0),
+
+ ok = rabbit_ct_broker_helpers:start_node(Config, Server2),
+ wait_for_messages_ready(Servers, RaName, 3),
+ wait_for_messages_pending_ack(Servers, RaName, 0).
+
+recover_from_multiple_failures(Config) ->
+ [Server, Server1, Server2] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+
+ ok = rabbit_ct_broker_helpers:stop_node(Config, Server1),
+ RaName = ra_name(QQ),
+
+ publish(Ch, QQ),
+ publish(Ch, QQ),
+ publish(Ch, QQ),
+
+ ok = rabbit_ct_broker_helpers:stop_node(Config, Server2),
+
+ publish(Ch, QQ),
+ publish(Ch, QQ),
+ publish(Ch, QQ),
+
+ wait_for_messages_ready([Server], RaName, 3),
+ wait_for_messages_pending_ack([Server], RaName, 0),
+
+ ok = rabbit_ct_broker_helpers:start_node(Config, Server1),
+ ok = rabbit_ct_broker_helpers:start_node(Config, Server2),
+
+ %% there is an assumption here that the messages were not lost and were
+ %% recovered when a quorum was restored. Not the best test perhaps.
+ wait_for_messages_ready(Servers, RaName, 6),
+ wait_for_messages_pending_ack(Servers, RaName, 0).
+
+leadership_takeover(Config) ->
+ %% Kill nodes in succession forcing the takeover of leadership, and all messages that
+ %% are in the queue.
+ [Server, Server1, Server2] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+
+ ok = rabbit_ct_broker_helpers:stop_node(Config, Server1),
+ RaName = ra_name(QQ),
+
+ publish(Ch, QQ),
+ publish(Ch, QQ),
+ publish(Ch, QQ),
+
+ wait_for_messages_ready([Server], RaName, 3),
+ wait_for_messages_pending_ack([Server], RaName, 0),
+
+ ok = rabbit_ct_broker_helpers:stop_node(Config, Server2),
+
+ ok = rabbit_ct_broker_helpers:start_node(Config, Server1),
+ ok = rabbit_ct_broker_helpers:stop_node(Config, Server),
+ ok = rabbit_ct_broker_helpers:start_node(Config, Server2),
+ ok = rabbit_ct_broker_helpers:stop_node(Config, Server1),
+ ok = rabbit_ct_broker_helpers:start_node(Config, Server),
+
+ wait_for_messages_ready([Server2, Server], RaName, 3),
+ wait_for_messages_pending_ack([Server2, Server], RaName, 0),
+
+ ok = rabbit_ct_broker_helpers:start_node(Config, Server1),
+ wait_for_messages_ready(Servers, RaName, 3),
+ wait_for_messages_pending_ack(Servers, RaName, 0).
+
+metrics_cleanup_on_leadership_takeover(Config) ->
+ %% Queue core metrics should be deleted from a node once the leadership is transferred
+ %% to another follower
+ [Server, _, _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+
+ RaName = ra_name(QQ),
+ publish(Ch, QQ),
+ publish(Ch, QQ),
+ publish(Ch, QQ),
+
+ wait_for_messages_ready([Server], RaName, 3),
+ wait_for_messages_pending_ack([Server], RaName, 0),
+ {ok, _, {_, Leader}} = ra:members({RaName, Server}),
+ QRes = rabbit_misc:r(<<"/">>, queue, QQ),
+ wait_until(
+ fun() ->
+ case rpc:call(Leader, ets, lookup, [queue_coarse_metrics, QRes]) of
+ [{QRes, 3, 0, 3, _}] -> true;
+ _ -> false
+ end
+ end),
+ force_leader_change(Leader, Servers, QQ),
+ wait_until(fun () ->
+ [] =:= rpc:call(Leader, ets, lookup, [queue_coarse_metrics, QRes]) andalso
+ [] =:= rpc:call(Leader, ets, lookup, [queue_metrics, QRes])
+ end),
+ ok.
+
+metrics_cleanup_on_leader_crash(Config) ->
+ %% Queue core metrics should be deleted from a node once the leadership is transferred
+ %% to another follower
+ [Server | _] = Servers =
+ rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+
+ RaName = ra_name(QQ),
+ publish(Ch, QQ),
+ publish(Ch, QQ),
+ publish(Ch, QQ),
+
+ wait_for_messages_ready([Server], RaName, 3),
+ wait_for_messages_pending_ack([Server], RaName, 0),
+ {ok, _, {Name, Leader}} = ra:members({RaName, Server}),
+ QRes = rabbit_misc:r(<<"/">>, queue, QQ),
+ wait_until(
+ fun() ->
+ case rpc:call(Leader, ets, lookup, [queue_coarse_metrics, QRes]) of
+ [{QRes, 3, 0, 3, _}] -> true;
+ _ -> false
+ end
+ end),
+ Pid = rpc:call(Leader, erlang, whereis, [Name]),
+ rpc:call(Leader, erlang, exit, [Pid, kill]),
+ [Other | _] = lists:delete(Leader, Servers),
+ catch ra:trigger_election(Other),
+ %% kill it again just in case it came straight back up again
+ catch rpc:call(Leader, erlang, exit, [Pid, kill]),
+
+ %% this isn't a reliable test as the leader can be restarted so quickly
+ %% after a crash it is elected leader of the next term as well.
+ wait_until(
+ fun() ->
+ [] == rpc:call(Leader, ets, lookup, [queue_coarse_metrics, QRes])
+ end),
+ ok.
+
+delete_declare(Config) ->
+ %% Delete cluster in ra is asynchronous, we have to ensure that we handle that in rmq
+ [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config,
+ nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+
+ RaName = ra_name(QQ),
+ publish(Ch, QQ),
+ publish(Ch, QQ),
+ publish(Ch, QQ),
+ wait_for_messages_ready(Servers, RaName, 3),
+
+ ?assertMatch(#'queue.delete_ok'{},
+ amqp_channel:call(Ch, #'queue.delete'{queue = QQ})),
+ %% the actual data deletions happen after the call has returned as a quorum
+ %% queue leader waits for all nodes to confirm they replicated the poison
+ %% pill before terminating itself.
+ timer:sleep(1000),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+
+ %% Ensure that is a new queue and it's empty
+ wait_for_messages_ready(Servers, RaName, 0),
+ wait_for_messages_pending_ack(Servers, RaName, 0).
+
+basic_cancel(Config) ->
+ [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+
+ RaName = ra_name(QQ),
+ publish(Ch, QQ),
+ wait_for_messages_ready(Servers, RaName, 1),
+ wait_for_messages_pending_ack(Servers, RaName, 0),
+ subscribe(Ch, QQ, false),
+ receive
+ {#'basic.deliver'{}, _} ->
+ wait_for_messages_ready(Servers, RaName, 0),
+ wait_for_messages_pending_ack(Servers, RaName, 1),
+ amqp_channel:call(Ch, #'basic.cancel'{consumer_tag = <<"ctag">>}),
+ wait_for_messages_ready(Servers, RaName, 1),
+ wait_for_messages_pending_ack(Servers, RaName, 0)
+ end.
+
+purge(Config) ->
+ [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+
+ RaName = ra_name(QQ),
+ publish(Ch, QQ),
+ publish(Ch, QQ),
+ wait_for_messages_ready(Servers, RaName, 2),
+ wait_for_messages_pending_ack(Servers, RaName, 0),
+ _DeliveryTag = consume(Ch, QQ, false),
+ wait_for_messages_ready(Servers, RaName, 1),
+ wait_for_messages_pending_ack(Servers, RaName, 1),
+ {'queue.purge_ok', 2} = amqp_channel:call(Ch, #'queue.purge'{queue = QQ}),
+ wait_for_messages_pending_ack(Servers, RaName, 0),
+ wait_for_messages_ready(Servers, RaName, 0).
+
+sync_queue(Config) ->
+ [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+ {error, _, _} =
+ rabbit_ct_broker_helpers:rabbitmqctl(Config, 0, [<<"sync_queue">>, QQ]),
+ ok.
+
+cancel_sync_queue(Config) ->
+ [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+ {error, _, _} =
+ rabbit_ct_broker_helpers:rabbitmqctl(Config, 0, [<<"cancel_sync_queue">>, QQ]),
+ ok.
+
+declare_during_node_down(Config) ->
+ [Server, DownServer, _] = Servers = rabbit_ct_broker_helpers:get_node_configs(
+ Config, nodename),
+
+ stop_node(Config, DownServer),
+ % rabbit_ct_broker_helpers:stop_node(Config, DownServer),
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+
+ RaName = ra_name(QQ),
+ timer:sleep(2000),
+ rabbit_ct_broker_helpers:start_node(Config, DownServer),
+ publish(Ch, QQ),
+ wait_for_messages_ready(Servers, RaName, 1),
+ ok.
+
+add_member_not_running(Config) ->
+ [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ ct:pal("add_member_not_running config ~p", [Config]),
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+ ?assertEqual({error, node_not_running},
+ rpc:call(Server, rabbit_quorum_queue, add_member,
+ [<<"/">>, QQ, 'rabbit@burrow'])).
+
+add_member_classic(Config) ->
+ [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ CQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', CQ, 0, 0}, declare(Ch, CQ, [])),
+ ?assertEqual({error, classic_queue_not_supported},
+ rpc:call(Server, rabbit_quorum_queue, add_member,
+ [<<"/">>, CQ, Server])).
+
+add_member_already_a_member(Config) ->
+ [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+ ?assertEqual({error, already_a_member},
+ rpc:call(Server, rabbit_quorum_queue, add_member,
+ [<<"/">>, QQ, Server])).
+
+add_member_not_found(Config) ->
+ [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({error, not_found},
+ rpc:call(Server, rabbit_quorum_queue, add_member,
+ [<<"/">>, QQ, Server])).
+
+add_member(Config) ->
+ [Server0, Server1] = Servers0 =
+ rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server0),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+ ?assertEqual({error, node_not_running},
+ rpc:call(Server0, rabbit_quorum_queue, add_member,
+ [<<"/">>, QQ, Server1])),
+ ok = rabbit_control_helper:command(stop_app, Server1),
+ ok = rabbit_control_helper:command(join_cluster, Server1, [atom_to_list(Server0)], []),
+ rabbit_control_helper:command(start_app, Server1),
+ ?assertEqual(ok, rpc:call(Server0, rabbit_quorum_queue, add_member,
+ [<<"/">>, QQ, Server1])),
+ Info = rpc:call(Server0, rabbit_quorum_queue, infos,
+ [rabbit_misc:r(<<"/">>, queue, QQ)]),
+ Servers = lists:sort(Servers0),
+ ?assertEqual(Servers, lists:sort(proplists:get_value(online, Info, []))).
+
+delete_member_not_running(Config) ->
+ [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+ ?assertEqual({error, node_not_running},
+ rpc:call(Server, rabbit_quorum_queue, delete_member,
+ [<<"/">>, QQ, 'rabbit@burrow'])).
+
+delete_member_classic(Config) ->
+ [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ CQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', CQ, 0, 0}, declare(Ch, CQ, [])),
+ ?assertEqual({error, classic_queue_not_supported},
+ rpc:call(Server, rabbit_quorum_queue, delete_member,
+ [<<"/">>, CQ, Server])).
+
+delete_member_not_found(Config) ->
+ [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({error, not_found},
+ rpc:call(Server, rabbit_quorum_queue, delete_member,
+ [<<"/">>, QQ, Server])).
+
+delete_member(Config) ->
+ [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+ timer:sleep(100),
+ ?assertEqual(ok,
+ rpc:call(Server, rabbit_quorum_queue, delete_member,
+ [<<"/">>, QQ, Server])),
+ ?assertEqual({error, not_a_member},
+ rpc:call(Server, rabbit_quorum_queue, delete_member,
+ [<<"/">>, QQ, Server])).
+
+cleanup_data_dir(Config) ->
+ %% This test is slow, but also checks that we handle properly errors when
+ %% trying to delete a queue in minority. A case clause there had gone
+ %% previously unnoticed.
+
+ [Server1, Server2] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server1),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+ timer:sleep(100),
+
+ [{_, UId}] = rpc:call(Server1, ra_directory, list_registered, []),
+ DataDir = rpc:call(Server1, ra_env, server_data_dir, [UId]),
+ ?assert(filelib:is_dir(DataDir)),
+
+ ok = rabbit_ct_broker_helpers:stop_node(Config, Server2),
+
+ ?assertExit({{shutdown,
+ {connection_closing, {server_initiated_close, 541, _}}}, _},
+ amqp_channel:call(Ch, #'queue.delete'{queue = QQ})),
+ ?assert(filelib:is_dir(DataDir)),
+
+ ?assertEqual(ok,
+ rpc:call(Server1, rabbit_quorum_queue, cleanup_data_dir,
+ [])),
+ ?assert(not filelib:is_dir(DataDir)).
+
+basic_recover(Config) ->
+ [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+
+ RaName = ra_name(QQ),
+ publish(Ch, QQ),
+ wait_for_messages_ready(Servers, RaName, 1),
+ wait_for_messages_pending_ack(Servers, RaName, 0),
+ _ = consume(Ch, QQ, false),
+ wait_for_messages_ready(Servers, RaName, 0),
+ wait_for_messages_pending_ack(Servers, RaName, 1),
+ amqp_channel:cast(Ch, #'basic.recover'{requeue = true}),
+ wait_for_messages_ready(Servers, RaName, 1),
+ wait_for_messages_pending_ack(Servers, RaName, 0).
+%%----------------------------------------------------------------------------
+
+declare(Ch, Q) ->
+ declare(Ch, Q, []).
+
+declare(Ch, Q, Args) ->
+ amqp_channel:call(Ch, #'queue.declare'{queue = Q,
+ durable = true,
+ auto_delete = false,
+ arguments = Args}).
+
+assert_queue_type(Server, Q, Expected) ->
+ Actual = get_queue_type(Server, Q),
+ Expected = Actual.
+
+get_queue_type(Server, Q) ->
+ QNameRes = rabbit_misc:r(<<"/">>, queue, Q),
+ {ok, AMQQueue} =
+ rpc:call(Server, rabbit_amqqueue, lookup, [QNameRes]),
+ AMQQueue#amqqueue.type.
+
+wait_for_messages(Config, Stats) ->
+ wait_for_messages(Config, lists:sort(Stats), 60).
+
+wait_for_messages(Config, Stats, 0) ->
+ ?assertEqual(Stats,
+ lists:sort(
+ filter_queues(Stats,
+ rabbit_ct_broker_helpers:rabbitmqctl_list(
+ Config, 0, ["list_queues", "name", "messages", "messages_ready",
+ "messages_unacknowledged"]))));
+wait_for_messages(Config, Stats, N) ->
+ case lists:sort(
+ filter_queues(Stats,
+ rabbit_ct_broker_helpers:rabbitmqctl_list(
+ Config, 0, ["list_queues", "name", "messages", "messages_ready",
+ "messages_unacknowledged"]))) of
+ Stats0 when Stats0 == Stats ->
+ ok;
+ _ ->
+ timer:sleep(500),
+ wait_for_messages(Config, Stats, N - 1)
+ end.
+
+filter_queues(Expected, Got) ->
+ Keys = [K || [K, _, _, _] <- Expected],
+ lists:filter(fun([K, _, _, _]) ->
+ lists:member(K, Keys)
+ end, Got).
+
+publish(Ch, Queue) ->
+ ok = amqp_channel:call(Ch,
+ #'basic.publish'{routing_key = Queue},
+ #amqp_msg{props = #'P_basic'{delivery_mode = 2},
+ payload = <<"msg">>}).
+
+consume(Ch, Queue, NoAck) ->
+ {GetOk, _} = Reply = amqp_channel:call(Ch, #'basic.get'{queue = Queue,
+ no_ack = NoAck}),
+ ?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = <<"msg">>}}, Reply),
+ GetOk#'basic.get_ok'.delivery_tag.
+
+consume_empty(Ch, Queue, NoAck) ->
+ ?assertMatch(#'basic.get_empty'{},
+ amqp_channel:call(Ch, #'basic.get'{queue = Queue,
+ no_ack = NoAck})).
+
+subscribe(Ch, Queue, NoAck) ->
+ amqp_channel:subscribe(Ch, #'basic.consume'{queue = Queue,
+ no_ack = NoAck,
+ consumer_tag = <<"ctag">>},
+ self()),
+ receive
+ #'basic.consume_ok'{consumer_tag = <<"ctag">>} ->
+ ok
+ end.
+
+qos(Ch, Prefetch, Global) ->
+ ?assertMatch(#'basic.qos_ok'{},
+ amqp_channel:call(Ch, #'basic.qos'{global = Global,
+ prefetch_count = Prefetch})).
+
+receive_basic_deliver(Redelivered) ->
+ receive
+ {#'basic.deliver'{redelivered = R}, _} when R == Redelivered ->
+ ok
+ end.
+
+wait_for_cleanup(Server, Channel, Number) ->
+ wait_for_cleanup(Server, Channel, Number, 60).
+
+wait_for_cleanup(Server, Channel, Number, 0) ->
+ ?assertEqual(Number, length(rpc:call(Server, rabbit_channel, list_queue_states, [Channel])));
+wait_for_cleanup(Server, Channel, Number, N) ->
+ case length(rpc:call(Server, rabbit_channel, list_queue_states, [Channel])) of
+ Length when Number == Length ->
+ ok;
+ _ ->
+ timer:sleep(500),
+ wait_for_cleanup(Server, Channel, Number, N - 1)
+ end.
+
+
+wait_for_messages_ready(Servers, QName, Ready) ->
+ wait_for_messages(Servers, QName, Ready,
+ fun rabbit_fifo:query_messages_ready/1, 60).
+
+wait_for_messages_pending_ack(Servers, QName, Ready) ->
+ wait_for_messages(Servers, QName, Ready,
+ fun rabbit_fifo:query_messages_checked_out/1, 60).
+
+wait_for_messages(Servers, QName, Number, Fun, 0) ->
+ Msgs = dirty_query(Servers, QName, Fun),
+ Totals = lists:map(fun(M) when is_map(M) ->
+ maps:size(M);
+ (_) ->
+ -1
+ end, Msgs),
+ ?assertEqual(Totals, [Number || _ <- lists:seq(1, length(Servers))]);
+wait_for_messages(Servers, QName, Number, Fun, N) ->
+ Msgs = dirty_query(Servers, QName, Fun),
+ case lists:all(fun(M) when is_map(M) ->
+ maps:size(M) == Number;
+ (_) ->
+ false
+ end, Msgs) of
+ true ->
+ ok;
+ _ ->
+ timer:sleep(500),
+ wait_for_messages(Servers, QName, Number, Fun, N - 1)
+ end.
+
+dirty_query(Servers, QName, Fun) ->
+ lists:map(
+ fun(N) ->
+ case rpc:call(N, ra, local_query, [{QName, N}, Fun]) of
+ {ok, {_, Msgs}, _} ->
+ Msgs;
+ _ ->
+ undefined
+ end
+ end, Servers).
+
+wait_until(Condition) ->
+ wait_until(Condition, 60).
+
+wait_until(Condition, 0) ->
+ ?assertEqual(true, Condition());
+wait_until(Condition, N) ->
+ case Condition() of
+ true ->
+ ok;
+ _ ->
+ timer:sleep(500),
+ wait_until(Condition, N - 1)
+ end.
+
+force_leader_change(Leader, Servers, Q) ->
+ RaName = ra_name(Q),
+ [F1, _] = Servers -- [Leader],
+ ok = rpc:call(F1, ra, trigger_election, [{RaName, F1}]),
+ case ra:members({RaName, Leader}) of
+ {ok, _, {_, Leader}} ->
+ %% Leader has been re-elected
+ force_leader_change(Leader, Servers, Q);
+ {ok, _, _} ->
+ %% Leader has changed
+ ok
+ end.
+
+delete_queues() ->
+ [rabbit_amqqueue:delete(Q, false, false, <<"dummy">>)
+ || Q <- rabbit_amqqueue:list()].
+
+stop_node(Config, Server) ->
+ rabbit_ct_broker_helpers:rabbitmqctl(Config, Server, ["stop"]).
diff --git a/test/rabbit_fifo_SUITE.erl b/test/rabbit_fifo_SUITE.erl
new file mode 100644
index 0000000000..a2e22afc2e
--- /dev/null
+++ b/test/rabbit_fifo_SUITE.erl
@@ -0,0 +1,624 @@
+-module(rabbit_fifo_SUITE).
+
+-compile(export_all).
+
+-include_lib("common_test/include/ct.hrl").
+-include_lib("eunit/include/eunit.hrl").
+
+all() ->
+ [
+ {group, tests}
+ ].
+
+all_tests() ->
+ [
+ basics,
+ return,
+ rabbit_fifo_returns_correlation,
+ resends_lost_command,
+ returns_after_down,
+ resends_after_lost_applied,
+ handles_reject_notification,
+ two_quick_enqueues,
+ detects_lost_delivery,
+ dequeue,
+ discard,
+ cancel_checkout,
+ credit,
+ untracked_enqueue,
+ flow,
+ test_queries,
+ duplicate_delivery,
+ usage
+ ].
+
+groups() ->
+ [
+ {tests, [], all_tests()}
+ ].
+
+init_per_group(_, Config) ->
+ PrivDir = ?config(priv_dir, Config),
+ _ = application:load(ra),
+ ok = application:set_env(ra, data_dir, PrivDir),
+ application:ensure_all_started(ra),
+ application:ensure_all_started(lg),
+ Config.
+
+end_per_group(_, Config) ->
+ _ = application:stop(ra),
+ Config.
+
+init_per_testcase(TestCase, Config) ->
+ ra_server_sup:remove_all(),
+ ServerName2 = list_to_atom(atom_to_list(TestCase) ++ "2"),
+ ServerName3 = list_to_atom(atom_to_list(TestCase) ++ "3"),
+ [
+ {cluster_name, TestCase},
+ {uid, atom_to_binary(TestCase, utf8)},
+ {node_id, {TestCase, node()}},
+ {uid2, atom_to_binary(ServerName2, utf8)},
+ {node_id2, {ServerName2, node()}},
+ {uid3, atom_to_binary(ServerName3, utf8)},
+ {node_id3, {ServerName3, node()}}
+ | Config].
+
+basics(Config) ->
+ ClusterName = ?config(cluster_name, Config),
+ ServerId = ?config(node_id, Config),
+ UId = ?config(uid, Config),
+ CustomerTag = UId,
+ ok = start_cluster(ClusterName, [ServerId]),
+ FState0 = rabbit_fifo_client:init(ClusterName, [ServerId]),
+ {ok, FState1} = rabbit_fifo_client:checkout(CustomerTag, 1, FState0),
+
+ ra_log_wal:force_roll_over(ra_log_wal),
+ % create segment the segment will trigger a snapshot
+ timer:sleep(1000),
+
+ {ok, FState2} = rabbit_fifo_client:enqueue(one, FState1),
+ % process ra events
+ FState3 = process_ra_event(FState2, 250),
+
+ FState5 = receive
+ {ra_event, From, Evt} ->
+ case rabbit_fifo_client:handle_ra_event(From, Evt, FState3) of
+ {internal, _AcceptedSeqs, _Actions, _FState4} ->
+ exit(unexpected_internal_event);
+ {{delivery, C, [{MsgId, _Msg}]}, FState4} ->
+ {ok, S} = rabbit_fifo_client:settle(C, [MsgId],
+ FState4),
+ S
+ end
+ after 5000 ->
+ exit(await_msg_timeout)
+ end,
+
+ % process settle applied notificaiton
+ FState5b = process_ra_event(FState5, 250),
+ _ = ra:stop_server(ServerId),
+ _ = ra:restart_server(ServerId),
+
+ % give time to become leader
+ timer:sleep(500),
+ {ok, FState6} = rabbit_fifo_client:enqueue(two, FState5b),
+ % process applied event
+ FState6b = process_ra_event(FState6, 250),
+
+ receive
+ {ra_event, Frm, E} ->
+ case rabbit_fifo_client:handle_ra_event(Frm, E, FState6b) of
+ {internal, _, _, _FState7} ->
+ ct:pal("unexpected event ~p~n", [E]),
+ exit({unexpected_internal_event, E});
+ {{delivery, Ctag, [{Mid, {_, two}}]}, FState7} ->
+ {ok, _S} = rabbit_fifo_client:return(Ctag, [Mid], FState7),
+ ok
+ end
+ after 2000 ->
+ exit(await_msg_timeout)
+ end,
+ ra:stop_server(ServerId),
+ ok.
+
+return(Config) ->
+ ClusterName = ?config(cluster_name, Config),
+ ServerId = ?config(node_id, Config),
+ ServerId2 = ?config(node_id2, Config),
+ ok = start_cluster(ClusterName, [ServerId, ServerId2]),
+
+ F00 = rabbit_fifo_client:init(ClusterName, [ServerId, ServerId2]),
+ {ok, F0} = rabbit_fifo_client:enqueue(1, msg1, F00),
+ {ok, F1} = rabbit_fifo_client:enqueue(2, msg2, F0),
+ {_, _, F2} = process_ra_events(F1, 100),
+ {ok, {MsgId, _}, F} = rabbit_fifo_client:dequeue(<<"tag">>, unsettled, F2),
+ {ok, _F2} = rabbit_fifo_client:return(<<"tag">>, [MsgId], F),
+
+ ra:stop_server(ServerId),
+ ok.
+
+rabbit_fifo_returns_correlation(Config) ->
+ ClusterName = ?config(cluster_name, Config),
+ ServerId = ?config(node_id, Config),
+ ok = start_cluster(ClusterName, [ServerId]),
+ F0 = rabbit_fifo_client:init(ClusterName, [ServerId]),
+ {ok, F1} = rabbit_fifo_client:enqueue(corr1, msg1, F0),
+ receive
+ {ra_event, Frm, E} ->
+ case rabbit_fifo_client:handle_ra_event(Frm, E, F1) of
+ {internal, [corr1], [], _F2} ->
+ ok;
+ {Del, _} ->
+ exit({unexpected, Del})
+ end
+ after 2000 ->
+ exit(await_msg_timeout)
+ end,
+ ra:stop_server(ServerId),
+ ok.
+
+duplicate_delivery(Config) ->
+ ClusterName = ?config(cluster_name, Config),
+ ServerId = ?config(node_id, Config),
+ ok = start_cluster(ClusterName, [ServerId]),
+ F0 = rabbit_fifo_client:init(ClusterName, [ServerId]),
+ {ok, F1} = rabbit_fifo_client:checkout(<<"tag">>, 10, F0),
+ {ok, F2} = rabbit_fifo_client:enqueue(corr1, msg1, F1),
+ Fun = fun Loop(S0) ->
+ receive
+ {ra_event, Frm, E} = Evt ->
+ case rabbit_fifo_client:handle_ra_event(Frm, E, S0) of
+ {internal, [corr1], [], S1} ->
+ Loop(S1);
+ {_Del, S1} ->
+ %% repeat event delivery
+ self() ! Evt,
+ %% check that then next received delivery doesn't
+ %% repeat or crash
+ receive
+ {ra_event, F, E1} ->
+ case rabbit_fifo_client:handle_ra_event(F, E1, S1) of
+ {internal, [], [], S2} ->
+ S2
+ end
+ end
+ end
+ after 2000 ->
+ exit(await_msg_timeout)
+ end
+ end,
+ Fun(F2),
+ ra:stop_server(ServerId),
+ ok.
+
+usage(Config) ->
+ ClusterName = ?config(cluster_name, Config),
+ ServerId = ?config(node_id, Config),
+ ok = start_cluster(ClusterName, [ServerId]),
+ F0 = rabbit_fifo_client:init(ClusterName, [ServerId]),
+ {ok, F1} = rabbit_fifo_client:checkout(<<"tag">>, 10, F0),
+ {ok, F2} = rabbit_fifo_client:enqueue(corr1, msg1, F1),
+ {ok, F3} = rabbit_fifo_client:enqueue(corr2, msg2, F2),
+ {_, _, _} = process_ra_events(F3, 50),
+ % force tick and usage stats emission
+ ServerId ! tick_timeout,
+ timer:sleep(50),
+ % ct:pal("ets ~w ~w ~w", [ets:tab2list(rabbit_fifo_usage), ServerId, UId]),
+ Use = rabbit_fifo:usage(element(1, ServerId)),
+ ct:pal("Use ~w~n", [Use]),
+ ra:stop_server(ServerId),
+ ?assert(Use > 0.0),
+ ok.
+
+resends_lost_command(Config) ->
+ ClusterName = ?config(cluster_name, Config),
+ ServerId = ?config(node_id, Config),
+ ok = start_cluster(ClusterName, [ServerId]),
+
+ ok = meck:new(ra, [passthrough]),
+
+ F0 = rabbit_fifo_client:init(ClusterName, [ServerId]),
+ {ok, F1} = rabbit_fifo_client:enqueue(msg1, F0),
+ % lose the enqueue
+ meck:expect(ra, pipeline_command, fun (_, _, _) -> ok end),
+ {ok, F2} = rabbit_fifo_client:enqueue(msg2, F1),
+ meck:unload(ra),
+ {ok, F3} = rabbit_fifo_client:enqueue(msg3, F2),
+ {_, _, F4} = process_ra_events(F3, 500),
+ {ok, {_, {_, msg1}}, F5} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F4),
+ {ok, {_, {_, msg2}}, F6} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F5),
+ {ok, {_, {_, msg3}}, _F7} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F6),
+ ra:stop_server(ServerId),
+ ok.
+
+two_quick_enqueues(Config) ->
+ ClusterName = ?config(cluster_name, Config),
+ ServerId = ?config(node_id, Config),
+ ok = start_cluster(ClusterName, [ServerId]),
+
+ F0 = rabbit_fifo_client:init(ClusterName, [ServerId]),
+ F1 = element(2, rabbit_fifo_client:enqueue(msg1, F0)),
+ {ok, F2} = rabbit_fifo_client:enqueue(msg2, F1),
+ _ = process_ra_events(F2, 500),
+ ra:stop_server(ServerId),
+ ok.
+
+detects_lost_delivery(Config) ->
+ ClusterName = ?config(cluster_name, Config),
+ ServerId = ?config(node_id, Config),
+ ok = start_cluster(ClusterName, [ServerId]),
+
+ F000 = rabbit_fifo_client:init(ClusterName, [ServerId]),
+ {ok, F00} = rabbit_fifo_client:enqueue(msg1, F000),
+ {_, _, F0} = process_ra_events(F00, 100),
+ {ok, F1} = rabbit_fifo_client:checkout(<<"tag">>, 10, F0),
+ {ok, F2} = rabbit_fifo_client:enqueue(msg2, F1),
+ {ok, F3} = rabbit_fifo_client:enqueue(msg3, F2),
+ % lose first delivery
+ receive
+ {ra_event, _, {machine, {delivery, _, [{_, {_, msg1}}]}}} ->
+ ok
+ after 500 ->
+ exit(await_delivery_timeout)
+ end,
+
+ % assert three deliveries were received
+ {[_, _, _], _, _} = process_ra_events(F3, 500),
+ ra:stop_server(ServerId),
+ ok.
+
+returns_after_down(Config) ->
+ ClusterName = ?config(cluster_name, Config),
+ ServerId = ?config(node_id, Config),
+ ok = start_cluster(ClusterName, [ServerId]),
+
+ F0 = rabbit_fifo_client:init(ClusterName, [ServerId]),
+ {ok, F1} = rabbit_fifo_client:enqueue(msg1, F0),
+ {_, _, F2} = process_ra_events(F1, 500),
+ % start a customer in a separate processes
+ % that exits after checkout
+ Self = self(),
+ _Pid = spawn(fun () ->
+ F = rabbit_fifo_client:init(ClusterName, [ServerId]),
+ {ok, _} = rabbit_fifo_client:checkout(<<"tag">>, 10, F),
+ Self ! checkout_done
+ end),
+ receive checkout_done -> ok after 1000 -> exit(checkout_done_timeout) end,
+ % message should be available for dequeue
+ {ok, {_, {_, msg1}}, _} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F2),
+ ra:stop_server(ServerId),
+ ok.
+
+resends_after_lost_applied(Config) ->
+ ClusterName = ?config(cluster_name, Config),
+ ServerId = ?config(node_id, Config),
+ ok = start_cluster(ClusterName, [ServerId]),
+
+ F0 = rabbit_fifo_client:init(ClusterName, [ServerId]),
+ {_, _, F1} = process_ra_events(element(2, rabbit_fifo_client:enqueue(msg1, F0)),
+ 500),
+ {ok, F2} = rabbit_fifo_client:enqueue(msg2, F1),
+ % lose an applied event
+ receive
+ {ra_event, _, {applied, _}} ->
+ ok
+ after 500 ->
+ exit(await_ra_event_timeout)
+ end,
+ % send another message
+ {ok, F3} = rabbit_fifo_client:enqueue(msg3, F2),
+ {_, _, F4} = process_ra_events(F3, 500),
+ {ok, {_, {_, msg1}}, F5} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F4),
+ {ok, {_, {_, msg2}}, F6} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F5),
+ {ok, {_, {_, msg3}}, _F7} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F6),
+ ra:stop_server(ServerId),
+ ok.
+
+handles_reject_notification(Config) ->
+ ClusterName = ?config(cluster_name, Config),
+ ServerId1 = ?config(node_id, Config),
+ ServerId2 = ?config(node_id2, Config),
+ UId1 = ?config(uid, Config),
+ CId = {UId1, self()},
+
+ ok = start_cluster(ClusterName, [ServerId1, ServerId2]),
+ _ = ra:process_command(ServerId1, {checkout,
+ {auto, 10, simple_prefetch}, CId}),
+ % reverse order - should try the first node in the list first
+ F0 = rabbit_fifo_client:init(ClusterName, [ServerId2, ServerId1]),
+ {ok, F1} = rabbit_fifo_client:enqueue(one, F0),
+
+ timer:sleep(500),
+
+ % the applied notification
+ _F2 = process_ra_event(F1, 250),
+ ra:stop_server(ServerId1),
+ ra:stop_server(ServerId2),
+ ok.
+
+discard(Config) ->
+ PrivDir = ?config(priv_dir, Config),
+ ServerId = ?config(node_id, Config),
+ UId = ?config(uid, Config),
+ ClusterName = ?config(cluster_name, Config),
+ Conf = #{cluster_name => ClusterName,
+ id => ServerId,
+ uid => UId,
+ log_init_args => #{data_dir => PrivDir, uid => UId},
+ initial_member => [],
+ machine => {module, rabbit_fifo,
+ #{dead_letter_handler =>
+ {?MODULE, dead_letter_handler, [self()]}}}},
+ _ = ra:start_server(Conf),
+ ok = ra:trigger_election(ServerId),
+ _ = ra:members(ServerId),
+
+ F0 = rabbit_fifo_client:init(ClusterName, [ServerId]),
+ {ok, F1} = rabbit_fifo_client:checkout(<<"tag">>, 10, F0),
+ {ok, F2} = rabbit_fifo_client:enqueue(msg1, F1),
+ F3 = discard_next_delivery(F2, 500),
+ {ok, empty, _F4} = rabbit_fifo_client:dequeue(<<"tag1">>, settled, F3),
+ receive
+ {dead_letter, Letters} ->
+ ct:pal("dead letters ~p~n", [Letters]),
+ [{_, msg1}] = Letters,
+ ok
+ after 500 ->
+ exit(dead_letter_timeout)
+ end,
+ ra:stop_server(ServerId),
+ ok.
+
+cancel_checkout(Config) ->
+ ClusterName = ?config(cluster_name, Config),
+ ServerId = ?config(node_id, Config),
+ ok = start_cluster(ClusterName, [ServerId]),
+ F0 = rabbit_fifo_client:init(ClusterName, [ServerId], 4),
+ {ok, F1} = rabbit_fifo_client:enqueue(m1, F0),
+ {ok, F2} = rabbit_fifo_client:checkout(<<"tag">>, 10, F1),
+ {_, _, F3} = process_ra_events0(F2, [], [], 250, fun (_, S) -> S end),
+ {ok, F4} = rabbit_fifo_client:cancel_checkout(<<"tag">>, F3),
+ {ok, {_, {_, m1}}, _} = rabbit_fifo_client:dequeue(<<"d1">>, settled, F4),
+ ok.
+
+credit(Config) ->
+ ClusterName = ?config(cluster_name, Config),
+ ServerId = ?config(node_id, Config),
+ ok = start_cluster(ClusterName, [ServerId]),
+ F0 = rabbit_fifo_client:init(ClusterName, [ServerId], 4),
+ {ok, F1} = rabbit_fifo_client:enqueue(m1, F0),
+ {ok, F2} = rabbit_fifo_client:enqueue(m2, F1),
+ {_, _, F3} = process_ra_events(F2, [], 250),
+ %% checkout with 0 prefetch
+ {ok, F4} = rabbit_fifo_client:checkout(<<"tag">>, 0, credited, F3),
+ %% assert no deliveries
+ {_, _, F5} = process_ra_events0(F4, [], [], 250,
+ fun
+ (D, _) -> error({unexpected_delivery, D})
+ end),
+ %% provide some credit
+ {ok, F6} = rabbit_fifo_client:credit(<<"tag">>, 1, false, F5),
+ {[{_, {_, m1}}], [{send_credit_reply, _}], F7} =
+ process_ra_events(F6, [], 250),
+
+ %% credit and drain
+ {ok, F8} = rabbit_fifo_client:credit(<<"tag">>, 4, true, F7),
+ {[{_, {_, m2}}], [{send_credit_reply, _}, {send_drained, _}], F9} =
+ process_ra_events(F8, [], 250),
+ flush(),
+
+ %% enqueue another message - at this point the consumer credit should be
+ %% all used up due to the drain
+ {ok, F10} = rabbit_fifo_client:enqueue(m3, F9),
+ %% assert no deliveries
+ {_, _, F11} = process_ra_events0(F10, [], [], 250,
+ fun
+ (D, _) -> error({unexpected_delivery, D})
+ end),
+ %% credit again and receive the last message
+ {ok, F12} = rabbit_fifo_client:credit(<<"tag">>, 10, false, F11),
+ {[{_, {_, m3}}], _, _} = process_ra_events(F12, [], 250),
+ ok.
+
+untracked_enqueue(Config) ->
+ ClusterName = ?config(cluster_name, Config),
+ ServerId = ?config(node_id, Config),
+ ok = start_cluster(ClusterName, [ServerId]),
+
+ ok = rabbit_fifo_client:untracked_enqueue([ServerId], msg1),
+ timer:sleep(100),
+ F0 = rabbit_fifo_client:init(ClusterName, [ServerId]),
+ {ok, {_, {_, msg1}}, _} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F0),
+ ra:stop_server(ServerId),
+ ok.
+
+
+flow(Config) ->
+ ClusterName = ?config(cluster_name, Config),
+ ServerId = ?config(node_id, Config),
+ ok = start_cluster(ClusterName, [ServerId]),
+ F0 = rabbit_fifo_client:init(ClusterName, [ServerId], 3),
+ {ok, F1} = rabbit_fifo_client:enqueue(m1, F0),
+ {ok, F2} = rabbit_fifo_client:enqueue(m2, F1),
+ {ok, F3} = rabbit_fifo_client:enqueue(m3, F2),
+ {slow, F4} = rabbit_fifo_client:enqueue(m4, F3),
+ {_, _, F5} = process_ra_events(F4, 500),
+ {ok, _} = rabbit_fifo_client:enqueue(m5, F5),
+ ra:stop_server(ServerId),
+ ok.
+
+test_queries(Config) ->
+ ClusterName = ?config(cluster_name, Config),
+ ServerId = ?config(node_id, Config),
+ ok = start_cluster(ClusterName, [ServerId]),
+ P = spawn(fun () ->
+ F0 = rabbit_fifo_client:init(ClusterName, [ServerId], 4),
+ {ok, F1} = rabbit_fifo_client:enqueue(m1, F0),
+ {ok, F2} = rabbit_fifo_client:enqueue(m2, F1),
+ process_ra_events(F2, 100),
+ receive stop -> ok end
+ end),
+ F0 = rabbit_fifo_client:init(ClusterName, [ServerId], 4),
+ {ok, _} = rabbit_fifo_client:checkout(<<"tag">>, 1, F0),
+ {ok, {_, Ready}, _} = ra:local_query(ServerId,
+ fun rabbit_fifo:query_messages_ready/1),
+ ?assertEqual(1, maps:size(Ready)),
+ ct:pal("Ready ~w~n", [Ready]),
+ {ok, {_, Checked}, _} = ra:local_query(ServerId,
+ fun rabbit_fifo:query_messages_checked_out/1),
+ ?assertEqual(1, maps:size(Checked)),
+ ct:pal("Checked ~w~n", [Checked]),
+ {ok, {_, Processes}, _} = ra:local_query(ServerId,
+ fun rabbit_fifo:query_processes/1),
+ ct:pal("Processes ~w~n", [Processes]),
+ ?assertEqual(2, length(Processes)),
+ P ! stop,
+ ra:stop_server(ServerId),
+ ok.
+
+dead_letter_handler(Pid, Msgs) ->
+ Pid ! {dead_letter, Msgs}.
+
+dequeue(Config) ->
+ ClusterName = ?config(priv_dir, Config),
+ ServerId = ?config(node_id, Config),
+ UId = ?config(uid, Config),
+ Tag = UId,
+ ok = start_cluster(ClusterName, [ServerId]),
+ F1 = rabbit_fifo_client:init(ClusterName, [ServerId]),
+ {ok, empty, F1b} = rabbit_fifo_client:dequeue(Tag, settled, F1),
+ {ok, F2_} = rabbit_fifo_client:enqueue(msg1, F1b),
+ {_, _, F2} = process_ra_events(F2_, 100),
+
+ {ok, {0, {_, msg1}}, F3} = rabbit_fifo_client:dequeue(Tag, settled, F2),
+ {ok, F4_} = rabbit_fifo_client:enqueue(msg2, F3),
+ {_, _, F4} = process_ra_events(F4_, 100),
+ {ok, {MsgId, {_, msg2}}, F5} = rabbit_fifo_client:dequeue(Tag, unsettled, F4),
+ {ok, _F6} = rabbit_fifo_client:settle(Tag, [MsgId], F5),
+ ra:stop_server(ServerId),
+ ok.
+
+enq_deq_n(N, F0) ->
+ enq_deq_n(N, F0, []).
+
+enq_deq_n(0, F0, Acc) ->
+ {_, _, F} = process_ra_events(F0, 100),
+ {F, Acc};
+enq_deq_n(N, F, Acc) ->
+ {ok, F1} = rabbit_fifo_client:enqueue(N, F),
+ {_, _, F2} = process_ra_events(F1, 10),
+ {ok, {_, {_, Deq}}, F3} = rabbit_fifo_client:dequeue(term_to_binary(N), settled, F2),
+
+ {_, _, F4} = process_ra_events(F3, 5),
+ enq_deq_n(N-1, F4, [Deq | Acc]).
+
+conf(ClusterName, UId, ServerId, _, Peers) ->
+ #{cluster_name => ClusterName,
+ id => ServerId,
+ uid => UId,
+ log_init_args => #{uid => UId},
+ initial_members => Peers,
+ machine => {module, rabbit_fifo, #{}}}.
+
+process_ra_event(State, Wait) ->
+ receive
+ {ra_event, From, Evt} ->
+ % ct:pal("processed ra event ~p~n", [Evt]),
+ {internal, _, _, S} = rabbit_fifo_client:handle_ra_event(From, Evt, State),
+ S
+ after Wait ->
+ exit(ra_event_timeout)
+ end.
+
+process_ra_events(State0, Wait) ->
+ process_ra_events(State0, [], Wait).
+
+process_ra_events(State, Acc, Wait) ->
+ DeliveryFun = fun ({delivery, Tag, Msgs}, S) ->
+ MsgIds = [element(1, M) || M <- Msgs],
+ {ok, S2} = rabbit_fifo_client:settle(Tag, MsgIds, S),
+ S2
+ end,
+ process_ra_events0(State, Acc, [], Wait, DeliveryFun).
+
+process_ra_events0(State0, Acc, Actions0, Wait, DeliveryFun) ->
+ receive
+ {ra_event, From, Evt} ->
+ % ct:pal("ra event ~w~n", [Evt]),
+ case rabbit_fifo_client:handle_ra_event(From, Evt, State0) of
+ {internal, _, Actions, State} ->
+ process_ra_events0(State, Acc, Actions0 ++ Actions,
+ Wait, DeliveryFun);
+ {{delivery, _Tag, Msgs} = Del, State1} ->
+ State = DeliveryFun(Del, State1),
+ process_ra_events0(State, Acc ++ Msgs, Actions0, Wait, DeliveryFun);
+ eol ->
+ eol
+ end
+ after Wait ->
+ {Acc, Actions0, State0}
+ end.
+
+discard_next_delivery(State0, Wait) ->
+ receive
+ {ra_event, From, Evt} ->
+ case rabbit_fifo_client:handle_ra_event(From, Evt, State0) of
+ {internal, _, _Actions, State} ->
+ discard_next_delivery(State, Wait);
+ {{delivery, Tag, Msgs}, State1} ->
+ MsgIds = [element(1, M) || M <- Msgs],
+ ct:pal("discarding ~p", [Msgs]),
+ {ok, State} = rabbit_fifo_client:discard(Tag, MsgIds,
+ State1),
+ State
+ end
+ after Wait ->
+ State0
+ end.
+
+return_next_delivery(State0, Wait) ->
+ receive
+ {ra_event, From, Evt} ->
+ case rabbit_fifo_client:handle_ra_event(From, Evt, State0) of
+ {internal, _, _, State} ->
+ return_next_delivery(State, Wait);
+ {{delivery, Tag, Msgs}, State1} ->
+ MsgIds = [element(1, M) || M <- Msgs],
+ ct:pal("returning ~p", [Msgs]),
+ {ok, State} = rabbit_fifo_client:return(Tag, MsgIds,
+ State1),
+ State
+ end
+ after Wait ->
+ State0
+ end.
+
+validate_process_down(Name, 0) ->
+ exit({process_not_down, Name});
+validate_process_down(Name, Num) ->
+ case whereis(Name) of
+ undefined ->
+ ok;
+ _ ->
+ timer:sleep(100),
+ validate_process_down(Name, Num-1)
+ end.
+
+start_cluster(ClusterName, ServerIds, RaFifoConfig) ->
+ {ok, Started, _} = ra:start_cluster(ClusterName,
+ {module, rabbit_fifo, RaFifoConfig},
+ ServerIds),
+ ?assertEqual(length(Started), length(ServerIds)),
+ ok.
+
+start_cluster(ClusterName, ServerIds) ->
+ start_cluster(ClusterName, ServerIds, #{}).
+
+flush() ->
+ receive
+ Msg ->
+ ct:pal("flushed: ~w~n", [Msg]),
+ flush()
+ after 10 ->
+ ok
+ end.
diff --git a/test/rabbitmqctl_integration_SUITE.erl b/test/rabbitmqctl_integration_SUITE.erl
index 71b74ea104..a5d3a0fd03 100644
--- a/test/rabbitmqctl_integration_SUITE.erl
+++ b/test/rabbitmqctl_integration_SUITE.erl
@@ -151,7 +151,7 @@ list_queues_stopped(Config) ->
ListedQueues =
[ {Name, State}
|| [Name, State] <- rabbit_ct_broker_helpers:rabbitmqctl_list(
- Config, 0, ["list_queues", "name", "state"]) ],
+ Config, 0, ["list_queues", "name", "state", "--no-table-headers"]) ],
[ <<"running">> = proplists:get_value(Q, ListedQueues) || Q <- Node1Queues ],
%% Node is running. Vhost is down
@@ -176,4 +176,4 @@ assert_ctl_queues(Config, Node, Args, Expected0) ->
end.
run_list_queues(Config, Node, Args) ->
- rabbit_ct_broker_helpers:rabbitmqctl_list(Config, Node, ["list_queues"] ++ Args ++ ["name"]). \ No newline at end of file
+ rabbit_ct_broker_helpers:rabbitmqctl_list(Config, Node, ["list_queues"] ++ Args ++ ["name", "--no-table-headers"]).