summaryrefslogtreecommitdiff
path: root/test
diff options
context:
space:
mode:
authorKarl Nilsson <kjnilsson@gmail.com>2018-10-29 09:47:29 +0000
committerGitHub <noreply@github.com>2018-10-29 09:47:29 +0000
commit3f3702514e37f965daee2605a6bf4428cad3dc28 (patch)
treeee905ee59714d45ca1ce6cc1791ec35e043718e8 /test
parent8ed6b4477f41085773934bf18f7acfb9929a5411 (diff)
downloadrabbitmq-server-git-3f3702514e37f965daee2605a6bf4428cad3dc28.tar.gz
Quorum queues (#1706)
* Test queue.declare method with quorum type [#154472130] * Cosmetics [#154472130] * Start quorum queue Includes ra as a rabbit dependency [#154472152] * Update info and list operations to use quorum queues Basic implementation. Might need an update when more functionality is added to the quorum queues. [#154472152] * Stop quorum queue [#154472158] * Restart quorum queue [#154472164] * Introduce UId in ra config to support newer version of ra Improved ra stop [#154472158] * Put data inside VHost specific subdirs [#154472164] * Include ra in rabbit deps to support stop_app/start_app command [#154472164] * Stop quorum queues in `rabbit_amqqueue:stop/1` [#154472158] * Revert creation of fifo ets table inside rabbit Now supported by ra [#154472158] * Filter quorum queues [#154472158] * Test restart node with quorum queues [#154472164] * Publish to quorum queues [#154472174] * Use `ra:restart_node/1` [#154472164] * Wait for stats to be published when querying quorum queues [#154472174] * Test publish and queue length after restart [#154472174] * Consume messages from quorum queues with basic.get [#154472211] * Autoack messages from quorum queues on basic.get [#154472211] * Fix no_ack meaning no_ack = true is equivalent to autoack [#154472211] * Use data_dir as provided in the config If we modify the data_dir, ra is not able to delete the data when a queue is deleted [#154472158] * Remove unused code/variables [#154472158] * Subscribe to a quorum queue Supports auto-ack [#154472215] * Ack messages consumed from quorum queues [#154472221] * Nack messages consumed from quorum queues [#154804608] * Use delivery tag as consumer tag for basic.get in quorum queues [#154472221] * Support for publisher confirms in quorum queues [#154472198] * Integrate with ra_fifo_client * Clear queue state on queue.delete [#154472158] * Fix quorum nack [#154804608] * Test redelivery after nack [#154804608] * Nack without requeueing [#154472225] * Test multiple acks [#154804208] * Test multiple nacks [#154804314] * Configure dead letter exchange with queue declare [#155076661] * Use a per-vhost process to handle dead-lettering Needs to hold state for quorum queues [#155401802] * Implement dead-lettering on nack'ed messages [#154804620] * Use queue name as a resource on message delivery Fixes a previously introduced bug [#154804608] * Handle ra events on dead letter process [#155401802] * Pass empty queue states to queue delete Queue deletion on vhost deletion calls directly to rabbit_amqqueue. Queue states are not available, but we can provide an empty map as in deletion the states are only needed for cleanup. * Generate quorum queue stats and events Consumer delete events are still pending, as depend on basic.cancel (not implemented yet), ra terminating or ra detecting channel down [#154472241] * Ensure quorum mapping entries are available before metric emission [#154472241] * Configure data_dir, uses new RABBITMQ_QUORUM_BASE env var [#154472152] * Use untracked enqueues when sending wihtout channel Updated several other calls missed during the quorum implementation * Revert "Configure data_dir, uses new RABBITMQ_QUORUM_BASE env var" This reverts commit f2261212410affecb238fcbd1fb451381aee4036. * Configure data_dir, uses new RABBITMQ_QUORUM_DIR based on mnesia dir [#154472152] * Fix get_quorum_state * Fix calculation of quorum pids * Move all quorum queues code to its own module [#154472241] * Return an error when declaring a quorum queue with an incompatible argument [#154521696] * Cleanup of quorum queue state after queue delete Also fixes some existing problems where the state wasn't properly stored [#155458625] * Revert Revert "Declare a quorum queue using the queue.declare method" * Remove duplicated state info [#154472241] * Start/stop multi-node quorum queue [#154472231] [#154472236] * Restart nodes in a multi-node quorum cluster [#154472238] * Test restart and leadership takeover on multiple nodes [#154472238] * Wait for leader down after deleting a quorum cluster It ensures an smooth delete-declare sequence without race conditions. The test included here detected the situation before the fix. [#154472236] * Populate quorum_mapping from mnesia when not available Ensures that leader nodes that don't have direct requests can get the mapping ra name -> queue name * Cosmetics * Do not emit core metrics if queue has just been deleted * Use rabbit_mnesia:is_process_alive Fixes bug introduced by cac9583e1bb2705be7f06c2ab7f416a75d11c875 [#154472231] * Only try to report stats if quorum process is alive * Implement cancel consumer callback Deletes metrics and sends consumer deleted event * Remove unnecessary trigger election call ra:restart_node has already been called during the recovery * Apply cancellation callback on node hosting the channel * Cosmetics * Read new fifo metrics which store directly total, ready and unack * Implement basic.cancel for quorum queues * Store leader in amqqueue record, report all in stats [#154472407] * Declare quorum queue in mnesia before starting the ra cluster Record needs to be stored first to update the leader on ra effects * Revert * Purge quorum queues [#154472182] * Improve use of untracked_enqueue Choose the persisted leader id instead of just using the id of the leader at point of creation. * Store quorum leader in the pid field of amqqueue record Same as mirrored queues, no real need for an additional field * Improve recovery When a ra node has never been started on a rabbit node ensure it doesn't fail but instead rebuilds the config and starts the node as a new node. Also fix issue when a quorum queue is declared when one of it's rabbit nodes are unavailable. [#157054606] * Cleanup core metrics after leader change [#157054473] * Return an error on sync_queue on quorum queues [#154472334] * Return an error on cancel_sync_queue on quorum queues [#154472337] * Fix basic_cancel and basic_consume return values Ensure the quorum queue state is always returned by these functions. * Restore arity of amqqeueu delete and purge functions. This avoids some breaking changes in the cli. * Fix bug returning consumers. * remove rogue debug log * Integrate ingress flow control with quorum queues [#157000583] * Configure commands soft limit [#157000583] * Support quorum pids on rabbit_mnesia:is_process_alive * Publish consumers metric for quorum queues * Whitelist quorum directory in is_virgin_node Allow the quorum directoy to exist without affecting the status of the Rabbit node. * Delete queue_metrics on leader change. Also run the become_leader handler in a separate process to avoid blocking. [#157424225] * Report cluster status in quorum queue infos. New per node status command. Related to [#157146500] * Remove quorum_mapping table As we can store the full queue name resource as the cluster id of the ra_fifo_client state we can avoid needed the quorum_mapping table. * Fix xref issue * Provide quorum members information in stats [#157146500] * fix unused variable * quorum queue multiple declare handling Extend rabbit_amqqueue:internal_declare/2 to indicate if the queue record was created or exisiting. From this we can then provide a code path that should handle concurrent queue declares of the same quorum queue. * Return an error when declaring exclusive/auto-delete quorum queue [#157472160] * Restore lost changes from 79c9bd201e1eac006a42bd162e7c86df96496629 * recover another part of commit * fixup cherry pick * Ra io/file metrics handler and stats publishing [#157193081] * Revert "Ra io/file metrics handler and stats publishing" This reverts commit 05d15c786540322583fc655709825db215b70952. * Do not issue confirms on node down for quorum queues. Only a ra_event should be used to issue positive confirms for a quorum queue. * Ra stats publishing [#157193081] * Pick consumer utilisation from ra data [#155402726] * Handle error when deleting a quorum queue and all nodes are already down This is in fact a successful deletion as all raft nodes are already 'stopped' [#158656366] * Return an error when declaring non-durable quorum queues [#158656454] * Rename dirty_query to committed_query * Delete stats on leader node [#158661152] * Give full list of nodes to fifo client * Handle timeout in quorum basic_get * Fix unused variable error * Handle timeout in basic get [#158656366] * Force GC after purge [#158789389] * Increase `ra:delete_cluster` timeout to 120s * Revert "Force GC after purge" This reverts commit 5c98bf22994eb39004760799d3a2c5041d16e9d4. * Add quorum member command [#157481599] * Delete quorum member command [#157481599] * Implement basic.recover for quorum queues [#157597411] * Change concumer utilisation to use the new ra_fifo table and api. * Set max quorum queue size limit Defaults to 7, can be configured per queue on queue.declare Nodes are selected randomly from the list of nodes, but the one that is executing the queue.declare command [#159338081] * remove potentially unrelated changes to rabbit_networking * Move ra_fifo to rabbit Copied ra_fifo to rabbit and renamed it rabbit_fifo. [#159338031] * rabbit_fifo tidy up * rabbit_fifo tidy up * rabbit_fifo: customer -> consumer rename * Move ra_fifo tests [#159338031] * Tweak quorum_queue defaults * quorum_queue test reliability * Optimise quorum_queue test suite. By only starting a rabbit cluster per group rather than test. [#160612638] * Renamings in line with ra API changes * rabbit_fifo fixes * Update with ra API changes Ra has consolidated and simplified it's api. These changes update to confirm to that. * Update rabbit_fifo with latest ra changes * Clean up out of date comment * Return map of states * Add test case for basic.get on an empty queue Before the previous patch, any subsequent basic.get would crash as the map of states had been replaced by a single state. * Clarify use of deliver tags on record_sent * Clean up queues after testcase * Remove erlang monitor of quorum queues in rabbit_channel The eol event can be used instead * Use macros to make clearer distinctions between quorum/classic queues Cosmetic only * Erase queue stats on 'eol' event * Update to follow Ra's cluster_id -> cluster_name rename. * Rename qourum-cluster-size To quorum-initial-group-size * Issue confirms on quorum queue eol Also avoid creating quorum queue session state on queue operation methods. * Only classic queues should be notified on channel down * Quorum queues do not support global qos Exit with protocol error of a basic.consume for a quorum queue is issued on a channel with global qos enabled. * unused variable name * Refactoring Strictly enfornce that channels do not monitor quorum queues. * Refactor foreach_per_queue in the channel. To make it call classic and quorum queues the same way. [#161314899] * rename function * Query classic and quorum queues separately during recovery as they should not be marked as stopped during failed vhost recovery. * Remove force_event_refresh function As the only user of this function, the management API no longer requires it. * fix errors * Remove created_at from amqqueue record [#161343680] * rabbit_fifo: support AMQP 1.0 consumer credit This change implements an alternative consumer credit mechanism similar to AMQP 1.0 link credit where the credit (prefetch) isn't automatically topped up as deliveries are settled and instead needs to be manually increased using a credit command. This is to be integrated with the AMQP 1.0 plugin. [#161256187] * Add basic.credit support for quorum queues. Added support for AMQP 1.0 transfer flow control. [#161256187] * Make quorum queue recover idempotent So that if a vhost crashes and runs the recover steps it doesn't fail because ra servers are still running. [#161343651] * Add tests for vhost deletion To ensure quorum queues are cleaned up on vhost removal. Also fix xref issue. [#161343673] * remove unused clause * always return latest value of queue * Add rabbitmq-queues scripts. Remove ra config from .bat scripts. * Return error if trying to get quorum status of a classic queue.
Diffstat (limited to 'test')
-rw-r--r--test/backing_queue_SUITE.erl7
-rw-r--r--test/cluster_SUITE.erl44
-rw-r--r--test/proxy_protocol_SUITE.erl2
-rw-r--r--test/quorum_queue_SUITE.erl1783
-rw-r--r--test/rabbit_fifo_SUITE.erl624
5 files changed, 2414 insertions, 46 deletions
diff --git a/test/backing_queue_SUITE.erl b/test/backing_queue_SUITE.erl
index 60f86e0542..94cbd48e8c 100644
--- a/test/backing_queue_SUITE.erl
+++ b/test/backing_queue_SUITE.erl
@@ -701,7 +701,9 @@ bq_variable_queue_delete_msg_store_files_callback1(Config) ->
CountMinusOne = Count - 1,
{ok, CountMinusOne, {QName, QPid, _AckTag, false, _Msg}} =
- rabbit_amqqueue:basic_get(Q, self(), true, Limiter),
+ rabbit_amqqueue:basic_get(Q, self(), true, Limiter,
+ <<"bq_variable_queue_delete_msg_store_files_callback1">>,
+ #{}),
{ok, CountMinusOne} = rabbit_amqqueue:purge(Q),
%% give the queue a second to receive the close_fds callback msg
@@ -737,7 +739,8 @@ bq_queue_recover1(Config) ->
fun (Q1 = #amqqueue { pid = QPid1 }) ->
CountMinusOne = Count - 1,
{ok, CountMinusOne, {QName, QPid1, _AckTag, true, _Msg}} =
- rabbit_amqqueue:basic_get(Q1, self(), false, Limiter),
+ rabbit_amqqueue:basic_get(Q1, self(), false, Limiter,
+ <<"bq_queue_recover1">>, #{}),
exit(QPid1, shutdown),
VQ1 = variable_queue_init(Q, true),
{{_Msg1, true, _AckTag1}, VQ2} =
diff --git a/test/cluster_SUITE.erl b/test/cluster_SUITE.erl
index 4864989b6a..62928aae9f 100644
--- a/test/cluster_SUITE.erl
+++ b/test/cluster_SUITE.erl
@@ -29,8 +29,7 @@
delegates_async,
delegates_sync,
queue_cleanup,
- declare_on_dead_queue,
- refresh_events
+ declare_on_dead_queue
]).
all() ->
@@ -240,34 +239,6 @@ declare_on_dead_queue1(_Config, SecondaryNode) ->
after ?TIMEOUT -> throw(failed_to_create_and_kill_queue)
end.
-refresh_events(Config) ->
- {I, J} = ?config(test_direction, Config),
- From = rabbit_ct_broker_helpers:get_node_config(Config, I, nodename),
- To = rabbit_ct_broker_helpers:get_node_config(Config, J, nodename),
- rabbit_ct_broker_helpers:add_code_path_to_node(To, ?MODULE),
- passed = rabbit_ct_broker_helpers:rpc(Config,
- From, ?MODULE, refresh_events1, [Config, To]).
-
-refresh_events1(Config, SecondaryNode) ->
- dummy_event_receiver:start(self(), [node(), SecondaryNode],
- [channel_created, queue_created]),
-
- {_Writer, Ch} = test_spawn(),
- expect_events(pid, Ch, channel_created),
- rabbit_channel:shutdown(Ch),
-
- {_Writer2, Ch2} = test_spawn(SecondaryNode),
- expect_events(pid, Ch2, channel_created),
- rabbit_channel:shutdown(Ch2),
-
- {new, #amqqueue{name = QName} = Q} =
- rabbit_amqqueue:declare(queue_name(Config, <<"refresh_events-q">>),
- false, false, [], none, <<"acting-user">>),
- expect_events(name, QName, queue_created),
- rabbit_amqqueue:delete(Q, false, false, <<"acting-user">>),
-
- dummy_event_receiver:stop(),
- passed.
make_responder(FMsg) -> make_responder(FMsg, timeout).
make_responder(FMsg, Throw) ->
@@ -307,19 +278,6 @@ dead_queue_loop(QueueName, OldPid) ->
Q
end.
-expect_events(Tag, Key, Type) ->
- expect_event(Tag, Key, Type),
- rabbit:force_event_refresh(make_ref()),
- expect_event(Tag, Key, Type).
-
-expect_event(Tag, Key, Type) ->
- receive #event{type = Type, props = Props} ->
- case rabbit_misc:pget(Tag, Props) of
- Key -> ok;
- _ -> expect_event(Tag, Key, Type)
- end
- after ?TIMEOUT -> throw({failed_to_receive_event, Type})
- end.
test_spawn() ->
{Writer, _Limiter, Ch} = rabbit_ct_broker_helpers:test_channel(),
diff --git a/test/proxy_protocol_SUITE.erl b/test/proxy_protocol_SUITE.erl
index 136d2bb980..84b94d72a8 100644
--- a/test/proxy_protocol_SUITE.erl
+++ b/test/proxy_protocol_SUITE.erl
@@ -97,4 +97,4 @@ connection_name() ->
Pid = lists:nth(1, Pids),
{dictionary, Dict} = process_info(Pid, dictionary),
{process_name, {rabbit_reader, ConnectionName}} = lists:keyfind(process_name, 1, Dict),
- ConnectionName. \ No newline at end of file
+ ConnectionName.
diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl
new file mode 100644
index 0000000000..d983a9d396
--- /dev/null
+++ b/test/quorum_queue_SUITE.erl
@@ -0,0 +1,1783 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2018 Pivotal Software, Inc. All rights reserved.
+%%
+
+-module(quorum_queue_SUITE).
+
+-include_lib("common_test/include/ct.hrl").
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("amqp_client/include/amqp_client.hrl").
+
+-compile(export_all).
+
+all() ->
+ [
+ {group, single_node},
+ {group, unclustered},
+ {group, clustered}
+ ].
+
+groups() ->
+ [
+ {single_node, [], all_tests()},
+ {unclustered, [], [
+ {cluster_size_2, [], [add_member]}
+ ]},
+ {clustered, [], [
+ {cluster_size_2, [], [add_member_not_running,
+ add_member_classic,
+ add_member_already_a_member,
+ add_member_not_found,
+ delete_member_not_running,
+ delete_member_classic,
+ delete_member_not_found,
+ delete_member]
+ ++ all_tests()},
+ {cluster_size_3, [], [
+ declare_during_node_down,
+ recover_from_single_failure,
+ recover_from_multiple_failures,
+ leadership_takeover,
+ delete_declare,
+ metrics_cleanup_on_leadership_takeover,
+ metrics_cleanup_on_leader_crash,
+ consume_in_minority
+ ]},
+ {cluster_size_5, [], [start_queue,
+ start_queue_concurrent,
+ quorum_cluster_size_3,
+ quorum_cluster_size_7
+ ]}
+ ]}
+ ].
+
+all_tests() ->
+ [
+ declare_args,
+ declare_invalid_args,
+ declare_invalid_properties,
+ start_queue,
+ stop_queue,
+ restart_queue,
+ restart_all_types,
+ stop_start_rabbit_app,
+ publish,
+ publish_and_restart,
+ consume,
+ consume_first_empty,
+ consume_from_empty_queue,
+ consume_and_autoack,
+ subscribe,
+ subscribe_with_autoack,
+ consume_and_ack,
+ consume_and_multiple_ack,
+ subscribe_and_ack,
+ subscribe_and_multiple_ack,
+ consume_and_requeue_nack,
+ consume_and_requeue_multiple_nack,
+ subscribe_and_requeue_nack,
+ subscribe_and_requeue_multiple_nack,
+ consume_and_nack,
+ consume_and_multiple_nack,
+ subscribe_and_nack,
+ subscribe_and_multiple_nack,
+ subscribe_should_fail_when_global_qos_true,
+ publisher_confirms,
+ publisher_confirms_with_deleted_queue,
+ dead_letter_to_classic_queue,
+ dead_letter_to_quorum_queue,
+ dead_letter_from_classic_to_quorum_queue,
+ cleanup_queue_state_on_channel_after_publish,
+ cleanup_queue_state_on_channel_after_subscribe,
+ basic_cancel,
+ purge,
+ sync_queue,
+ cancel_sync_queue,
+ basic_recover,
+ idempotent_recover,
+ vhost_with_quorum_queue_is_deleted
+ ].
+
+%% -------------------------------------------------------------------
+%% Testsuite setup/teardown.
+%% -------------------------------------------------------------------
+
+init_per_suite(Config) ->
+ rabbit_ct_helpers:log_environment(),
+ rabbit_ct_helpers:run_setup_steps(Config).
+
+end_per_suite(Config) ->
+ rabbit_ct_helpers:run_teardown_steps(Config).
+
+init_per_group(clustered, Config) ->
+ rabbit_ct_helpers:set_config(Config, [{rmq_nodes_clustered, true}]);
+init_per_group(unclustered, Config) ->
+ rabbit_ct_helpers:set_config(Config, [{rmq_nodes_clustered, false}]);
+init_per_group(Group, Config) ->
+ ClusterSize = case Group of
+ single_node -> 1;
+ cluster_size_2 -> 2;
+ cluster_size_3 -> 3;
+ cluster_size_5 -> 5
+ end,
+ Config1 = rabbit_ct_helpers:set_config(Config,
+ [{rmq_nodes_count, ClusterSize},
+ {rmq_nodename_suffix, Group},
+ {tcp_ports_base}]),
+ Config2 = rabbit_ct_helpers:run_steps(Config1,
+ [fun merge_app_env/1 ] ++
+ rabbit_ct_broker_helpers:setup_steps()),
+ ok = rabbit_ct_broker_helpers:rpc(
+ Config2, 0, application, set_env,
+ [rabbit, channel_queue_cleanup_interval, 100]),
+ %% HACK: the larger cluster sizes benefit for a bit more time
+ %% after clustering before running the tests.
+ case Group of
+ cluster_size_5 ->
+ timer:sleep(5000),
+ Config2;
+ _ ->
+ Config2
+ end.
+
+end_per_group(clustered, Config) ->
+ Config;
+end_per_group(unclustered, Config) ->
+ Config;
+end_per_group(_, Config) ->
+ rabbit_ct_helpers:run_steps(Config,
+ rabbit_ct_broker_helpers:teardown_steps()).
+
+init_per_testcase(Testcase, Config) ->
+ Config1 = rabbit_ct_helpers:testcase_started(Config, Testcase),
+ rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_queues, []),
+ Q = rabbit_data_coercion:to_binary(Testcase),
+ Config2 = rabbit_ct_helpers:set_config(Config1,
+ [{queue_name, Q}
+ ]),
+ rabbit_ct_helpers:run_steps(Config2,
+ rabbit_ct_client_helpers:setup_steps()).
+
+merge_app_env(Config) ->
+ rabbit_ct_helpers:merge_app_env(Config, {rabbit, [{core_metrics_gc_interval, 100}]}).
+
+end_per_testcase(Testcase, Config) ->
+ catch delete_queues(),
+ Config1 = rabbit_ct_helpers:run_steps(
+ Config,
+ rabbit_ct_client_helpers:teardown_steps()),
+ rabbit_ct_helpers:testcase_finished(Config1, Testcase).
+
+%% -------------------------------------------------------------------
+%% Testcases.
+%% -------------------------------------------------------------------
+
+declare_args(Config) ->
+ Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ LQ = ?config(queue_name, Config),
+ declare(Ch, LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}]),
+ assert_queue_type(Server, LQ, quorum),
+
+ DQ = <<"classic-declare-args-q">>,
+ declare(Ch, DQ, [{<<"x-queue-type">>, longstr, <<"classic">>}]),
+ assert_queue_type(Server, DQ, classic),
+
+ DQ2 = <<"classic-q2">>,
+ declare(Ch, DQ2),
+ assert_queue_type(Server, DQ2, classic).
+
+declare_invalid_properties(Config) ->
+ Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
+ LQ = ?config(queue_name, Config),
+
+ ?assertExit(
+ {{shutdown, {server_initiated_close, 406, _}}, _},
+ amqp_channel:call(
+ rabbit_ct_client_helpers:open_channel(Config, Server),
+ #'queue.declare'{queue = LQ,
+ auto_delete = true,
+ durable = true,
+ arguments = [{<<"x-queue-type">>, longstr, <<"quorum">>}]})),
+ ?assertExit(
+ {{shutdown, {server_initiated_close, 406, _}}, _},
+ amqp_channel:call(
+ rabbit_ct_client_helpers:open_channel(Config, Server),
+ #'queue.declare'{queue = LQ,
+ exclusive = true,
+ durable = true,
+ arguments = [{<<"x-queue-type">>, longstr, <<"quorum">>}]})),
+ ?assertExit(
+ {{shutdown, {server_initiated_close, 406, _}}, _},
+ amqp_channel:call(
+ rabbit_ct_client_helpers:open_channel(Config, Server),
+ #'queue.declare'{queue = LQ,
+ durable = false,
+ arguments = [{<<"x-queue-type">>, longstr, <<"quorum">>}]})).
+
+declare_invalid_args(Config) ->
+ Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
+ LQ = ?config(queue_name, Config),
+
+ ?assertExit(
+ {{shutdown, {server_initiated_close, 406, _}}, _},
+ declare(rabbit_ct_client_helpers:open_channel(Config, Server),
+ LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>},
+ {<<"x-expires">>, long, 2000}])),
+ ?assertExit(
+ {{shutdown, {server_initiated_close, 406, _}}, _},
+ declare(rabbit_ct_client_helpers:open_channel(Config, Server),
+ LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>},
+ {<<"x-message-ttl">>, long, 2000}])),
+ ?assertExit(
+ {{shutdown, {server_initiated_close, 406, _}}, _},
+ declare(rabbit_ct_client_helpers:open_channel(Config, Server),
+ LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>},
+ {<<"x-max-length">>, long, 2000}])),
+ ?assertExit(
+ {{shutdown, {server_initiated_close, 406, _}}, _},
+ declare(rabbit_ct_client_helpers:open_channel(Config, Server),
+ LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>},
+ {<<"x-max-length-bytes">>, long, 2000}])),
+
+ ?assertExit(
+ {{shutdown, {server_initiated_close, 406, _}}, _},
+ declare(rabbit_ct_client_helpers:open_channel(Config, Server),
+ LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>},
+ {<<"x-max-priority">>, long, 2000}])),
+
+ ?assertExit(
+ {{shutdown, {server_initiated_close, 406, _}}, _},
+ declare(rabbit_ct_client_helpers:open_channel(Config, Server),
+ LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>},
+ {<<"x-overflow">>, longstr, <<"drop-head">>}])),
+
+ ?assertExit(
+ {{shutdown, {server_initiated_close, 406, _}}, _},
+ declare(rabbit_ct_client_helpers:open_channel(Config, Server),
+ LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>},
+ {<<"x-queue-mode">>, longstr, <<"lazy">>}])),
+
+ ?assertExit(
+ {{shutdown, {server_initiated_close, 406, _}}, _},
+ declare(rabbit_ct_client_helpers:open_channel(Config, Server),
+ LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>},
+ {<<"x-quorum-initial-group-size">>, longstr, <<"hop">>}])),
+
+ ?assertExit(
+ {{shutdown, {server_initiated_close, 406, _}}, _},
+ declare(rabbit_ct_client_helpers:open_channel(Config, Server),
+ LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>},
+ {<<"x-quorum-initial-group-size">>, long, 0}])).
+
+start_queue(Config) ->
+ Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ LQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', LQ, 0, 0},
+ declare(Ch, LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+
+ %% Check that the application and one ra node are up
+ ?assertMatch({ra, _, _}, lists:keyfind(ra, 1,
+ rpc:call(Server, application, which_applications, []))),
+ ?assertMatch([_], rpc:call(Server, supervisor, which_children, [ra_server_sup])),
+
+ %% Test declare an existing queue
+ ?assertEqual({'queue.declare_ok', LQ, 0, 0},
+ declare(Ch, LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+
+ %% Test declare with same arguments
+ ?assertEqual({'queue.declare_ok', LQ, 0, 0},
+ declare(Ch, LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+
+ %% Test declare an existing queue with different arguments
+ ?assertExit(_, declare(Ch, LQ, [])),
+
+ %% Check that the application and process are still up
+ ?assertMatch({ra, _, _}, lists:keyfind(ra, 1,
+ rpc:call(Server, application, which_applications, []))),
+ ?assertMatch([_], rpc:call(Server, supervisor, which_children, [ra_server_sup])).
+
+start_queue_concurrent(Config) ->
+ Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ LQ = ?config(queue_name, Config),
+ Self = self(),
+ [begin
+ _ = spawn_link(fun () ->
+ {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, Server),
+ %% Test declare an existing queue
+ ?assertEqual({'queue.declare_ok', LQ, 0, 0},
+ declare(Ch, LQ,
+ [{<<"x-queue-type">>,
+ longstr,
+ <<"quorum">>}])),
+ Self ! {done, Server}
+ end)
+ end || Server <- Servers],
+
+ [begin
+ receive {done, Server} -> ok
+ after 5000 -> exit({await_done_timeout, Server})
+ end
+ end || Server <- Servers],
+
+
+ ok.
+
+quorum_cluster_size_3(Config) ->
+ quorum_cluster_size_x(Config, 3, 3).
+
+quorum_cluster_size_7(Config) ->
+ quorum_cluster_size_x(Config, 7, 5).
+
+quorum_cluster_size_x(Config, Max, Expected) ->
+ Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ RaName = ra_name(QQ),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>},
+ {<<"x-quorum-initial-group-size">>, long, Max}])),
+ {ok, Members, _} = ra:members({RaName, Server}),
+ ?assertEqual(Expected, length(Members)),
+ Info = rpc:call(Server, rabbit_quorum_queue, infos,
+ [rabbit_misc:r(<<"/">>, queue, QQ)]),
+ MembersQ = proplists:get_value(members, Info),
+ ?assertEqual(Expected, length(MembersQ)).
+
+stop_queue(Config) ->
+ Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ LQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', LQ, 0, 0},
+ declare(Ch, LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+
+ %% Check that the application and one ra node are up
+ ?assertMatch({ra, _, _}, lists:keyfind(ra, 1,
+ rpc:call(Server, application, which_applications, []))),
+ ?assertMatch([_], rpc:call(Server, supervisor, which_children, [ra_server_sup])),
+
+ %% Delete the quorum queue
+ ?assertMatch(#'queue.delete_ok'{}, amqp_channel:call(Ch, #'queue.delete'{queue = LQ})),
+ %% Check that the application and process are down
+ wait_until(fun() ->
+ [] == rpc:call(Server, supervisor, which_children, [ra_server_sup])
+ end),
+ ?assertMatch({ra, _, _}, lists:keyfind(ra, 1,
+ rpc:call(Server, application, which_applications, []))).
+
+restart_queue(Config) ->
+ Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ LQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', LQ, 0, 0},
+ declare(Ch, LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+
+ ok = rabbit_ct_broker_helpers:stop_node(Config, Server),
+ ok = rabbit_ct_broker_helpers:start_node(Config, Server),
+
+ %% Check that the application and one ra node are up
+ ?assertMatch({ra, _, _}, lists:keyfind(ra, 1,
+ rpc:call(Server, application, which_applications, []))),
+ ?assertMatch([_], rpc:call(Server, supervisor, which_children, [ra_server_sup])).
+
+idempotent_recover(Config) ->
+ Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ LQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', LQ, 0, 0},
+ declare(Ch, LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+
+ %% kill default vhost to trigger recovery
+ [{_, SupWrapperPid, _, _} | _] = rpc:call(Server, supervisor,
+ which_children,
+ [rabbit_vhost_sup_sup]),
+ [{_, Pid, _, _} | _] = rpc:call(Server, supervisor,
+ which_children,
+ [SupWrapperPid]),
+ %% kill the vhost process to trigger recover
+ rpc:call(Server, erlang, exit, [Pid, kill]),
+
+ timer:sleep(1000),
+ %% validate quorum queue is still functional
+ RaName = ra_name(LQ),
+ {ok, _, _} = ra:members({RaName, Server}),
+ %% validate vhosts are running - or rather validate that at least one
+ %% vhost per cluster is running
+ [begin
+ #{cluster_state := ServerStatuses} = maps:from_list(I),
+ ?assertMatch(#{Server := running}, maps:from_list(ServerStatuses))
+ end || I <- rpc:call(Server, rabbit_vhost,info_all, [])],
+ ok.
+
+vhost_with_quorum_queue_is_deleted(Config) ->
+ Node = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
+ VHost = <<"vhost2">>,
+ QName = atom_to_binary(?FUNCTION_NAME, utf8),
+ RaName = binary_to_atom(<<VHost/binary, "_", QName/binary>>, utf8),
+ User = ?config(rmq_username, Config),
+ ok = rabbit_ct_broker_helpers:add_vhost(Config, Node, VHost, User),
+ ok = rabbit_ct_broker_helpers:set_full_permissions(Config, User, VHost),
+ Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config, Node,
+ VHost),
+ {ok, Ch} = amqp_connection:open_channel(Conn),
+ ?assertEqual({'queue.declare_ok', QName, 0, 0},
+ declare(Ch, QName, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+
+ UId = rpc:call(Node, ra_directory, where_is, [RaName]),
+ ?assert(UId =/= undefined),
+ ok = rabbit_ct_broker_helpers:delete_vhost(Config, VHost),
+ %% validate quorum queues got deleted
+ undefined = rpc:call(Node, ra_directory, where_is, [RaName]),
+ ok.
+
+restart_all_types(Config) ->
+ %% Test the node restart with both types of queues (quorum and classic) to
+ %% ensure there are no regressions
+ Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ1 = <<"restart_all_types-qq1">>,
+ ?assertEqual({'queue.declare_ok', QQ1, 0, 0},
+ declare(Ch, QQ1, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+ QQ2 = <<"restart_all_types-qq2">>,
+ ?assertEqual({'queue.declare_ok', QQ2, 0, 0},
+ declare(Ch, QQ2, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+
+ CQ1 = <<"restart_all_types-classic1">>,
+ ?assertEqual({'queue.declare_ok', CQ1, 0, 0}, declare(Ch, CQ1, [])),
+ rabbit_ct_client_helpers:publish(Ch, CQ1, 1),
+ CQ2 = <<"restart_all_types-classic2">>,
+ ?assertEqual({'queue.declare_ok', CQ2, 0, 0}, declare(Ch, CQ2, [])),
+ rabbit_ct_client_helpers:publish(Ch, CQ2, 1),
+
+ ok = rabbit_ct_broker_helpers:stop_node(Config, Server),
+ ok = rabbit_ct_broker_helpers:start_node(Config, Server),
+
+ %% Check that the application and two ra nodes are up
+ ?assertMatch({ra, _, _}, lists:keyfind(ra, 1,
+ rpc:call(Server, application, which_applications, []))),
+ ?assertMatch([_,_], rpc:call(Server, supervisor, which_children, [ra_server_sup])),
+ %% Check the classic queues restarted correctly
+ Ch2 = rabbit_ct_client_helpers:open_channel(Config, Server),
+ {#'basic.get_ok'{}, #amqp_msg{}} =
+ amqp_channel:call(Ch2, #'basic.get'{queue = CQ1, no_ack = false}),
+ {#'basic.get_ok'{}, #amqp_msg{}} =
+ amqp_channel:call(Ch2, #'basic.get'{queue = CQ2, no_ack = false}),
+ delete_queues(Ch2, [QQ1, QQ2, CQ1, CQ2]).
+
+delete_queues(Ch, Queues) ->
+ [amqp_channel:call(Ch, #'queue.delete'{queue = Q}) || Q <- Queues].
+
+stop_start_rabbit_app(Config) ->
+ %% Test start/stop of rabbit app with both types of queues (quorum and
+ %% classic) to ensure there are no regressions
+ Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ1 = <<"stop_start_rabbit_app-qq">>,
+ ?assertEqual({'queue.declare_ok', QQ1, 0, 0},
+ declare(Ch, QQ1, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+ QQ2 = <<"quorum-q2">>,
+ ?assertEqual({'queue.declare_ok', QQ2, 0, 0},
+ declare(Ch, QQ2, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+
+ CQ1 = <<"stop_start_rabbit_app-classic">>,
+ ?assertEqual({'queue.declare_ok', CQ1, 0, 0}, declare(Ch, CQ1, [])),
+ rabbit_ct_client_helpers:publish(Ch, CQ1, 1),
+ CQ2 = <<"stop_start_rabbit_app-classic2">>,
+ ?assertEqual({'queue.declare_ok', CQ2, 0, 0}, declare(Ch, CQ2, [])),
+ rabbit_ct_client_helpers:publish(Ch, CQ2, 1),
+
+ rabbit_control_helper:command(stop_app, Server),
+ %% Check the ra application has stopped (thus its supervisor and queues)
+ ?assertMatch(false, lists:keyfind(ra, 1,
+ rpc:call(Server, application, which_applications, []))),
+
+ rabbit_control_helper:command(start_app, Server),
+
+ %% Check that the application and two ra nodes are up
+ ?assertMatch({ra, _, _}, lists:keyfind(ra, 1,
+ rpc:call(Server, application, which_applications, []))),
+ ?assertMatch([_,_], rpc:call(Server, supervisor, which_children, [ra_server_sup])),
+ %% Check the classic queues restarted correctly
+ Ch2 = rabbit_ct_client_helpers:open_channel(Config, Server),
+ {#'basic.get_ok'{}, #amqp_msg{}} =
+ amqp_channel:call(Ch2, #'basic.get'{queue = CQ1, no_ack = false}),
+ {#'basic.get_ok'{}, #amqp_msg{}} =
+ amqp_channel:call(Ch2, #'basic.get'{queue = CQ2, no_ack = false}),
+ delete_queues(Ch2, [QQ1, QQ2, CQ1, CQ2]).
+
+publish(Config) ->
+ [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+ publish(Ch, QQ),
+ Name = ra_name(QQ),
+ wait_for_messages_ready(Servers, Name, 1),
+ wait_for_messages_pending_ack(Servers, Name, 0).
+
+ra_name(Q) ->
+ binary_to_atom(<<"%2F_", Q/binary>>, utf8).
+
+publish_and_restart(Config) ->
+ %% Test the node restart with both types of queues (quorum and classic) to
+ %% ensure there are no regressions
+ [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ RaName = ra_name(QQ),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+ publish(Ch, QQ),
+ wait_for_messages_ready(Servers, RaName, 1),
+ wait_for_messages_pending_ack(Servers, RaName, 0),
+
+ ok = rabbit_ct_broker_helpers:stop_node(Config, Server),
+ ok = rabbit_ct_broker_helpers:start_node(Config, Server),
+
+ wait_for_messages_ready(Servers, RaName, 1),
+ wait_for_messages_pending_ack(Servers, RaName, 0),
+ publish(rabbit_ct_client_helpers:open_channel(Config, Server), QQ),
+ wait_for_messages_ready(Servers, RaName, 2),
+ wait_for_messages_pending_ack(Servers, RaName, 0).
+
+consume(Config) ->
+ [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+
+ RaName = ra_name(QQ),
+ publish(Ch, QQ),
+ wait_for_messages_ready(Servers, RaName, 1),
+ wait_for_messages_pending_ack(Servers, RaName, 0),
+ consume(Ch, QQ, false),
+ wait_for_messages_ready(Servers, RaName, 0),
+ wait_for_messages_pending_ack(Servers, RaName, 1),
+ rabbit_ct_client_helpers:close_channel(Ch),
+ wait_for_messages_ready(Servers, RaName, 1),
+ wait_for_messages_pending_ack(Servers, RaName, 0).
+
+consume_first_empty(Config) ->
+ [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+
+ RaName = ra_name(QQ),
+ consume_empty(Ch, QQ, false),
+ publish(Ch, QQ),
+ wait_for_messages_ready(Servers, RaName, 1),
+ wait_for_messages_pending_ack(Servers, RaName, 0),
+ consume(Ch, QQ, false),
+ rabbit_ct_client_helpers:close_channel(Ch).
+
+consume_in_minority(Config) ->
+ [Server0, Server1, Server2] =
+ rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server0),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+
+ ok = rabbit_ct_broker_helpers:stop_node(Config, Server1),
+ ok = rabbit_ct_broker_helpers:stop_node(Config, Server2),
+
+ ?assertExit({{shutdown, {connection_closing, {server_initiated_close, 541, _}}}, _},
+ amqp_channel:call(Ch, #'basic.get'{queue = QQ,
+ no_ack = false})).
+
+consume_and_autoack(Config) ->
+ [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+ RaName = ra_name(QQ),
+ publish(Ch, QQ),
+ wait_for_messages_ready(Servers, RaName, 1),
+ wait_for_messages_pending_ack(Servers, RaName, 0),
+ consume(Ch, QQ, true),
+ wait_for_messages_ready(Servers, RaName, 0),
+ wait_for_messages_pending_ack(Servers, RaName, 0),
+ rabbit_ct_client_helpers:close_channel(Ch),
+ wait_for_messages_ready(Servers, RaName, 0),
+ wait_for_messages_pending_ack(Servers, RaName, 0).
+
+consume_from_empty_queue(Config) ->
+ Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+
+ consume_empty(Ch, QQ, false).
+
+subscribe(Config) ->
+ [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+
+ RaName = ra_name(QQ),
+ publish(Ch, QQ),
+ wait_for_messages_ready(Servers, RaName, 1),
+ wait_for_messages_pending_ack(Servers, RaName, 0),
+ subscribe(Ch, QQ, false),
+ receive_basic_deliver(false),
+ wait_for_messages_ready(Servers, RaName, 0),
+ wait_for_messages_pending_ack(Servers, RaName, 1),
+ rabbit_ct_client_helpers:close_channel(Ch),
+ wait_for_messages_ready(Servers, RaName, 1),
+ wait_for_messages_pending_ack(Servers, RaName, 0).
+
+subscribe_should_fail_when_global_qos_true(Config) ->
+ [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+
+ RaName = ra_name(QQ),
+ qos(Ch, 10, true),
+ publish(Ch, QQ),
+ wait_for_messages_ready(Servers, RaName, 1),
+ wait_for_messages_pending_ack(Servers, RaName, 0),
+ try subscribe(Ch, QQ, false) of
+ _ -> exit(subscribe_should_not_pass)
+ catch
+ _:_ = Err ->
+ ct:pal("Err ~p", [Err])
+ end,
+ ok.
+
+subscribe_with_autoack(Config) ->
+ [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+
+ RaName = ra_name(QQ),
+ publish(Ch, QQ),
+ publish(Ch, QQ),
+ wait_for_messages_ready(Servers, RaName, 2),
+ wait_for_messages_pending_ack(Servers, RaName, 0),
+ subscribe(Ch, QQ, true),
+ receive_basic_deliver(false),
+ receive_basic_deliver(false),
+ wait_for_messages_ready(Servers, RaName, 0),
+ wait_for_messages_pending_ack(Servers, RaName, 0),
+ rabbit_ct_client_helpers:close_channel(Ch),
+ wait_for_messages_ready(Servers, RaName, 0),
+ wait_for_messages_pending_ack(Servers, RaName, 0).
+
+consume_and_ack(Config) ->
+ [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+
+ RaName = ra_name(QQ),
+ publish(Ch, QQ),
+ wait_for_messages_ready(Servers, RaName, 1),
+ wait_for_messages_pending_ack(Servers, RaName, 0),
+ DeliveryTag = consume(Ch, QQ, false),
+ wait_for_messages_ready(Servers, RaName, 0),
+ wait_for_messages_pending_ack(Servers, RaName, 1),
+ amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag}),
+ wait_for_messages_ready(Servers, RaName, 0),
+ wait_for_messages_pending_ack(Servers, RaName, 0).
+
+consume_and_multiple_ack(Config) ->
+ [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+
+ RaName = ra_name(QQ),
+ publish(Ch, QQ),
+ publish(Ch, QQ),
+ publish(Ch, QQ),
+ wait_for_messages_ready(Servers, RaName, 3),
+ wait_for_messages_pending_ack(Servers, RaName, 0),
+ _ = consume(Ch, QQ, false),
+ _ = consume(Ch, QQ, false),
+ DeliveryTag = consume(Ch, QQ, false),
+ wait_for_messages_ready(Servers, RaName, 0),
+ wait_for_messages_pending_ack(Servers, RaName, 3),
+ amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag,
+ multiple = true}),
+ wait_for_messages_ready(Servers, RaName, 0),
+ wait_for_messages_pending_ack(Servers, RaName, 0).
+
+subscribe_and_ack(Config) ->
+ [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+
+ RaName = ra_name(QQ),
+ publish(Ch, QQ),
+ wait_for_messages_ready(Servers, RaName, 1),
+ wait_for_messages_pending_ack(Servers, RaName, 0),
+ subscribe(Ch, QQ, false),
+ receive
+ {#'basic.deliver'{delivery_tag = DeliveryTag}, _} ->
+ wait_for_messages_ready(Servers, RaName, 0),
+ wait_for_messages_pending_ack(Servers, RaName, 1),
+ amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag}),
+ wait_for_messages_ready(Servers, RaName, 0),
+ wait_for_messages_pending_ack(Servers, RaName, 0)
+ end.
+
+subscribe_and_multiple_ack(Config) ->
+ [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+
+ RaName = ra_name(QQ),
+ publish(Ch, QQ),
+ publish(Ch, QQ),
+ publish(Ch, QQ),
+ wait_for_messages_ready(Servers, RaName, 3),
+ wait_for_messages_pending_ack(Servers, RaName, 0),
+ subscribe(Ch, QQ, false),
+ receive_basic_deliver(false),
+ receive_basic_deliver(false),
+ receive
+ {#'basic.deliver'{delivery_tag = DeliveryTag}, _} ->
+ wait_for_messages_ready(Servers, RaName, 0),
+ wait_for_messages_pending_ack(Servers, RaName, 3),
+ amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag,
+ multiple = true}),
+ wait_for_messages_ready(Servers, RaName, 0),
+ wait_for_messages_pending_ack(Servers, RaName, 0)
+ end.
+
+consume_and_requeue_nack(Config) ->
+ [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+
+ RaName = ra_name(QQ),
+ publish(Ch, QQ),
+ publish(Ch, QQ),
+ wait_for_messages_ready(Servers, RaName, 2),
+ wait_for_messages_pending_ack(Servers, RaName, 0),
+ DeliveryTag = consume(Ch, QQ, false),
+ wait_for_messages_ready(Servers, RaName, 1),
+ wait_for_messages_pending_ack(Servers, RaName, 1),
+ amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag,
+ multiple = false,
+ requeue = true}),
+ wait_for_messages_ready(Servers, RaName, 2),
+ wait_for_messages_pending_ack(Servers, RaName, 0).
+
+consume_and_requeue_multiple_nack(Config) ->
+ [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+
+ RaName = ra_name(QQ),
+ publish(Ch, QQ),
+ publish(Ch, QQ),
+ publish(Ch, QQ),
+ wait_for_messages_ready(Servers, RaName, 3),
+ wait_for_messages_pending_ack(Servers, RaName, 0),
+ _ = consume(Ch, QQ, false),
+ _ = consume(Ch, QQ, false),
+ DeliveryTag = consume(Ch, QQ, false),
+ wait_for_messages_ready(Servers, RaName, 0),
+ wait_for_messages_pending_ack(Servers, RaName, 3),
+ amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag,
+ multiple = true,
+ requeue = true}),
+ wait_for_messages_ready(Servers, RaName, 3),
+ wait_for_messages_pending_ack(Servers, RaName, 0).
+
+consume_and_nack(Config) ->
+ [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+
+ RaName = ra_name(QQ),
+ publish(Ch, QQ),
+ wait_for_messages_ready(Servers, RaName, 1),
+ wait_for_messages_pending_ack(Servers, RaName, 0),
+ DeliveryTag = consume(Ch, QQ, false),
+ wait_for_messages_ready(Servers, RaName, 0),
+ wait_for_messages_pending_ack(Servers, RaName, 1),
+ amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag,
+ multiple = false,
+ requeue = false}),
+ wait_for_messages_ready(Servers, RaName, 0),
+ wait_for_messages_pending_ack(Servers, RaName, 0).
+
+consume_and_multiple_nack(Config) ->
+ [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+
+ RaName = ra_name(QQ),
+ publish(Ch, QQ),
+ publish(Ch, QQ),
+ publish(Ch, QQ),
+ wait_for_messages_ready(Servers, RaName, 3),
+ wait_for_messages_pending_ack(Servers, RaName, 0),
+ _ = consume(Ch, QQ, false),
+ _ = consume(Ch, QQ, false),
+ DeliveryTag = consume(Ch, QQ, false),
+ wait_for_messages_ready(Servers, RaName, 0),
+ wait_for_messages_pending_ack(Servers, RaName, 3),
+ amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag,
+ multiple = true,
+ requeue = false}),
+ wait_for_messages_ready(Servers, RaName, 0),
+ wait_for_messages_pending_ack(Servers, RaName, 0).
+
+subscribe_and_requeue_multiple_nack(Config) ->
+ [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+
+ RaName = ra_name(QQ),
+ publish(Ch, QQ),
+ publish(Ch, QQ),
+ publish(Ch, QQ),
+ wait_for_messages_ready(Servers, RaName, 3),
+ wait_for_messages_pending_ack(Servers, RaName, 0),
+ subscribe(Ch, QQ, false),
+ receive_basic_deliver(false),
+ receive_basic_deliver(false),
+ receive
+ {#'basic.deliver'{delivery_tag = DeliveryTag,
+ redelivered = false}, _} ->
+ wait_for_messages_ready(Servers, RaName, 0),
+ wait_for_messages_pending_ack(Servers, RaName, 3),
+ amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag,
+ multiple = true,
+ requeue = true}),
+ receive_basic_deliver(true),
+ receive_basic_deliver(true),
+ receive
+ {#'basic.deliver'{delivery_tag = DeliveryTag1,
+ redelivered = true}, _} ->
+ wait_for_messages_ready(Servers, RaName, 0),
+ wait_for_messages_pending_ack(Servers, RaName, 3),
+ amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag1,
+ multiple = true}),
+ wait_for_messages_ready(Servers, RaName, 0),
+ wait_for_messages_pending_ack(Servers, RaName, 0)
+ end
+ end.
+
+subscribe_and_requeue_nack(Config) ->
+ [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+
+ RaName = ra_name(QQ),
+ publish(Ch, QQ),
+ wait_for_messages_ready(Servers, RaName, 1),
+ wait_for_messages_pending_ack(Servers, RaName, 0),
+ subscribe(Ch, QQ, false),
+ receive
+ {#'basic.deliver'{delivery_tag = DeliveryTag,
+ redelivered = false}, _} ->
+ wait_for_messages_ready(Servers, RaName, 0),
+ wait_for_messages_pending_ack(Servers, RaName, 1),
+ amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag,
+ multiple = false,
+ requeue = true}),
+ receive
+ {#'basic.deliver'{delivery_tag = DeliveryTag1,
+ redelivered = true}, _} ->
+ wait_for_messages_ready(Servers, RaName, 0),
+ wait_for_messages_pending_ack(Servers, RaName, 1),
+ amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag1}),
+ wait_for_messages_ready(Servers, RaName, 0),
+ wait_for_messages_pending_ack(Servers, RaName, 0)
+ end
+ end.
+
+subscribe_and_nack(Config) ->
+ [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+
+ RaName = ra_name(QQ),
+ publish(Ch, QQ),
+ wait_for_messages_ready(Servers, RaName, 1),
+ wait_for_messages_pending_ack(Servers, RaName, 0),
+ subscribe(Ch, QQ, false),
+ receive
+ {#'basic.deliver'{delivery_tag = DeliveryTag,
+ redelivered = false}, _} ->
+ wait_for_messages_ready(Servers, RaName, 0),
+ wait_for_messages_pending_ack(Servers, RaName, 1),
+ amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag,
+ multiple = false,
+ requeue = false}),
+ wait_for_messages_ready(Servers, RaName, 0),
+ wait_for_messages_pending_ack(Servers, RaName, 0)
+ end.
+
+subscribe_and_multiple_nack(Config) ->
+ [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+
+ RaName = ra_name(QQ),
+ publish(Ch, QQ),
+ publish(Ch, QQ),
+ publish(Ch, QQ),
+ wait_for_messages_ready(Servers, RaName, 3),
+ wait_for_messages_pending_ack(Servers, RaName, 0),
+ subscribe(Ch, QQ, false),
+ receive_basic_deliver(false),
+ receive_basic_deliver(false),
+ receive
+ {#'basic.deliver'{delivery_tag = DeliveryTag,
+ redelivered = false}, _} ->
+ wait_for_messages_ready(Servers, RaName, 0),
+ wait_for_messages_pending_ack(Servers, RaName, 3),
+ amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag,
+ multiple = true,
+ requeue = false}),
+ wait_for_messages_ready(Servers, RaName, 0),
+ wait_for_messages_pending_ack(Servers, RaName, 0)
+ end.
+
+publisher_confirms(Config) ->
+ [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+
+ RaName = ra_name(QQ),
+ amqp_channel:call(Ch, #'confirm.select'{}),
+ amqp_channel:register_confirm_handler(Ch, self()),
+ publish(Ch, QQ),
+ wait_for_messages_ready(Servers, RaName, 1),
+ wait_for_messages_pending_ack(Servers, RaName, 0),
+ ct:pal("WAIT FOR CONFIRMS ~n", []),
+ amqp_channel:wait_for_confirms(Ch, 5000),
+ amqp_channel:unregister_confirm_handler(Ch),
+ ok.
+
+publisher_confirms_with_deleted_queue(Config) ->
+ [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+ amqp_channel:call(Ch, #'confirm.select'{}),
+ amqp_channel:register_confirm_handler(Ch, self()),
+ % subscribe(Ch, QQ, false),
+ publish(Ch, QQ),
+ delete_queues(Ch, [QQ]),
+ ct:pal("WAIT FOR CONFIRMS ~n", []),
+ amqp_channel:wait_for_confirms_or_die(Ch, 5000),
+ amqp_channel:unregister_confirm_handler(Ch),
+ ok.
+
+dead_letter_to_classic_queue(Config) ->
+ [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ CQ = <<"classic-dead_letter_to_classic_queue">>,
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>},
+ {<<"x-dead-letter-exchange">>, longstr, <<>>},
+ {<<"x-dead-letter-routing-key">>, longstr, CQ}
+ ])),
+ ?assertEqual({'queue.declare_ok', CQ, 0, 0}, declare(Ch, CQ, [])),
+ RaName = ra_name(QQ),
+ publish(Ch, QQ),
+ wait_for_messages_ready(Servers, RaName, 1),
+ wait_for_messages_pending_ack(Servers, RaName, 0),
+ wait_for_messages(Config, [[CQ, <<"0">>, <<"0">>, <<"0">>]]),
+ DeliveryTag = consume(Ch, QQ, false),
+ wait_for_messages_ready(Servers, RaName, 0),
+ wait_for_messages_pending_ack(Servers, RaName, 1),
+ wait_for_messages(Config, [[CQ, <<"0">>, <<"0">>, <<"0">>]]),
+ amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag,
+ multiple = false,
+ requeue = false}),
+ wait_for_messages_ready(Servers, RaName, 0),
+ wait_for_messages_pending_ack(Servers, RaName, 0),
+ wait_for_messages(Config, [[CQ, <<"1">>, <<"1">>, <<"0">>]]),
+ _ = consume(Ch, CQ, false).
+
+dead_letter_to_quorum_queue(Config) ->
+ [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ QQ2 = <<"dead_letter_to_quorum_queue-q2">>,
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>},
+ {<<"x-dead-letter-exchange">>, longstr, <<>>},
+ {<<"x-dead-letter-routing-key">>, longstr, QQ2}
+ ])),
+ ?assertEqual({'queue.declare_ok', QQ2, 0, 0},
+ declare(Ch, QQ2, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+ RaName = ra_name(QQ),
+ RaName2 = ra_name(QQ2),
+ publish(Ch, QQ),
+ wait_for_messages_ready(Servers, RaName, 1),
+ wait_for_messages_pending_ack(Servers, RaName, 0),
+ wait_for_messages_ready(Servers, RaName2, 0),
+ wait_for_messages_pending_ack(Servers, RaName2, 0),
+ DeliveryTag = consume(Ch, QQ, false),
+ wait_for_messages_ready(Servers, RaName, 0),
+ wait_for_messages_pending_ack(Servers, RaName, 1),
+ wait_for_messages_ready(Servers, RaName2, 0),
+ wait_for_messages_pending_ack(Servers, RaName2, 0),
+ amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag,
+ multiple = false,
+ requeue = false}),
+ wait_for_messages_ready(Servers, RaName, 0),
+ wait_for_messages_pending_ack(Servers, RaName, 0),
+ wait_for_messages_ready(Servers, RaName2, 1),
+ wait_for_messages_pending_ack(Servers, RaName2, 0),
+ _ = consume(Ch, QQ2, false).
+
+dead_letter_from_classic_to_quorum_queue(Config) ->
+ [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ CQ = <<"classic-q-dead_letter_from_classic_to_quorum_queue">>,
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', CQ, 0, 0},
+ declare(Ch, CQ, [{<<"x-dead-letter-exchange">>, longstr, <<>>},
+ {<<"x-dead-letter-routing-key">>, longstr, QQ}
+ ])),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+ RaName = ra_name(QQ),
+ publish(Ch, CQ),
+ wait_for_messages_ready(Servers, RaName, 0),
+ wait_for_messages_pending_ack(Servers, RaName, 0),
+ wait_for_messages(Config, [[CQ, <<"1">>, <<"1">>, <<"0">>]]),
+ DeliveryTag = consume(Ch, CQ, false),
+ wait_for_messages_ready(Servers, RaName, 0),
+ wait_for_messages_pending_ack(Servers, RaName, 0),
+ wait_for_messages(Config, [[CQ, <<"1">>, <<"0">>, <<"1">>]]),
+ amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag,
+ multiple = false,
+ requeue = false}),
+ wait_for_messages_ready(Servers, RaName, 1),
+ wait_for_messages_pending_ack(Servers, RaName, 0),
+ wait_for_messages(Config, [[CQ, <<"0">>, <<"0">>, <<"0">>]]),
+ _ = consume(Ch, QQ, false),
+ rabbit_ct_client_helpers:close_channel(Ch).
+
+cleanup_queue_state_on_channel_after_publish(Config) ->
+ %% Declare/delete the queue in one channel and publish on a different one,
+ %% to verify that the cleanup is propagated through channels
+ [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server),
+ Ch2 = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch1, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+ RaName = ra_name(QQ),
+ publish(Ch2, QQ),
+ Res = dirty_query(Servers, RaName, fun rabbit_fifo:query_consumer_count/1),
+ ct:pal ("Res ~p", [Res]),
+ wait_for_messages_pending_ack(Servers, RaName, 0),
+ wait_for_messages_ready(Servers, RaName, 1),
+ [NCh1, NCh2] = rpc:call(Server, rabbit_channel, list, []),
+ %% Check the channel state contains the state for the quorum queue on
+ %% channel 1 and 2
+ wait_for_cleanup(Server, NCh1, 0),
+ wait_for_cleanup(Server, NCh2, 1),
+ %% then delete the queue and wait for the process to terminate
+ ?assertMatch(#'queue.delete_ok'{},
+ amqp_channel:call(Ch1, #'queue.delete'{queue = QQ})),
+ wait_until(fun() ->
+ [] == rpc:call(Server, supervisor, which_children,
+ [ra_server_sup])
+ end),
+ %% Check that all queue states have been cleaned
+ wait_for_cleanup(Server, NCh1, 0),
+ wait_for_cleanup(Server, NCh2, 0).
+
+cleanup_queue_state_on_channel_after_subscribe(Config) ->
+ %% Declare/delete the queue and publish in one channel, while consuming on a
+ %% different one to verify that the cleanup is propagated through channels
+ [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server),
+ Ch2 = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch1, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+ RaName = ra_name(QQ),
+ publish(Ch1, QQ),
+ wait_for_messages_ready(Servers, RaName, 1),
+ wait_for_messages_pending_ack(Servers, RaName, 0),
+ subscribe(Ch2, QQ, false),
+ receive
+ {#'basic.deliver'{delivery_tag = DeliveryTag,
+ redelivered = false}, _} ->
+ wait_for_messages_ready(Servers, RaName, 0),
+ wait_for_messages_pending_ack(Servers, RaName, 1),
+ amqp_channel:cast(Ch2, #'basic.ack'{delivery_tag = DeliveryTag,
+ multiple = true}),
+ wait_for_messages_ready(Servers, RaName, 0),
+ wait_for_messages_pending_ack(Servers, RaName, 0)
+ end,
+ [NCh1, NCh2] = rpc:call(Server, rabbit_channel, list, []),
+ %% Check the channel state contains the state for the quorum queue on channel 1 and 2
+ wait_for_cleanup(Server, NCh1, 1),
+ wait_for_cleanup(Server, NCh2, 1),
+ ?assertMatch(#'queue.delete_ok'{}, amqp_channel:call(Ch1, #'queue.delete'{queue = QQ})),
+ wait_until(fun() ->
+ [] == rpc:call(Server, supervisor, which_children, [ra_server_sup])
+ end),
+ %% Check that all queue states have been cleaned
+ wait_for_cleanup(Server, NCh1, 0),
+ wait_for_cleanup(Server, NCh2, 0).
+
+recover_from_single_failure(Config) ->
+ [Server, Server1, Server2] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+
+ ok = rabbit_ct_broker_helpers:stop_node(Config, Server2),
+ RaName = ra_name(QQ),
+
+ publish(Ch, QQ),
+ publish(Ch, QQ),
+ publish(Ch, QQ),
+ wait_for_messages_ready([Server, Server1], RaName, 3),
+ wait_for_messages_pending_ack([Server, Server1], RaName, 0),
+
+ ok = rabbit_ct_broker_helpers:start_node(Config, Server2),
+ wait_for_messages_ready(Servers, RaName, 3),
+ wait_for_messages_pending_ack(Servers, RaName, 0).
+
+recover_from_multiple_failures(Config) ->
+ [Server, Server1, Server2] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+
+ ok = rabbit_ct_broker_helpers:stop_node(Config, Server1),
+ RaName = ra_name(QQ),
+
+ publish(Ch, QQ),
+ publish(Ch, QQ),
+ publish(Ch, QQ),
+
+ ok = rabbit_ct_broker_helpers:stop_node(Config, Server2),
+
+ publish(Ch, QQ),
+ publish(Ch, QQ),
+ publish(Ch, QQ),
+
+ wait_for_messages_ready([Server], RaName, 3),
+ wait_for_messages_pending_ack([Server], RaName, 0),
+
+ ok = rabbit_ct_broker_helpers:start_node(Config, Server1),
+ ok = rabbit_ct_broker_helpers:start_node(Config, Server2),
+
+ %% there is an assumption here that the messages were not lost and were
+ %% recovered when a quorum was restored. Not the best test perhaps.
+ wait_for_messages_ready(Servers, RaName, 6),
+ wait_for_messages_pending_ack(Servers, RaName, 0).
+
+leadership_takeover(Config) ->
+ %% Kill nodes in succession forcing the takeover of leadership, and all messages that
+ %% are in the queue.
+ [Server, Server1, Server2] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+
+ ok = rabbit_ct_broker_helpers:stop_node(Config, Server1),
+ RaName = ra_name(QQ),
+
+ publish(Ch, QQ),
+ publish(Ch, QQ),
+ publish(Ch, QQ),
+
+ wait_for_messages_ready([Server], RaName, 3),
+ wait_for_messages_pending_ack([Server], RaName, 0),
+
+ ok = rabbit_ct_broker_helpers:stop_node(Config, Server2),
+
+ ok = rabbit_ct_broker_helpers:start_node(Config, Server1),
+ ok = rabbit_ct_broker_helpers:stop_node(Config, Server),
+ ok = rabbit_ct_broker_helpers:start_node(Config, Server2),
+ ok = rabbit_ct_broker_helpers:stop_node(Config, Server1),
+ ok = rabbit_ct_broker_helpers:start_node(Config, Server),
+
+ wait_for_messages_ready([Server2, Server], RaName, 3),
+ wait_for_messages_pending_ack([Server2, Server], RaName, 0),
+
+ ok = rabbit_ct_broker_helpers:start_node(Config, Server1),
+ wait_for_messages_ready(Servers, RaName, 3),
+ wait_for_messages_pending_ack(Servers, RaName, 0).
+
+metrics_cleanup_on_leadership_takeover(Config) ->
+ %% Queue core metrics should be deleted from a node once the leadership is transferred
+ %% to another follower
+ [Server, _, _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+
+ RaName = ra_name(QQ),
+ publish(Ch, QQ),
+ publish(Ch, QQ),
+ publish(Ch, QQ),
+
+ wait_for_messages_ready([Server], RaName, 3),
+ wait_for_messages_pending_ack([Server], RaName, 0),
+ {ok, _, {_, Leader}} = ra:members({RaName, Server}),
+ QRes = rabbit_misc:r(<<"/">>, queue, QQ),
+ wait_until(
+ fun() ->
+ case rpc:call(Leader, ets, lookup, [queue_coarse_metrics, QRes]) of
+ [{QRes, 3, 0, 3, _}] -> true;
+ _ -> false
+ end
+ end),
+ force_leader_change(Leader, Servers, QQ),
+ wait_until(fun () ->
+ [] =:= rpc:call(Leader, ets, lookup, [queue_coarse_metrics, QRes]) andalso
+ [] =:= rpc:call(Leader, ets, lookup, [queue_metrics, QRes])
+ end),
+ ok.
+
+metrics_cleanup_on_leader_crash(Config) ->
+ %% Queue core metrics should be deleted from a node once the leadership is transferred
+ %% to another follower
+ [Server | _] = Servers =
+ rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+
+ RaName = ra_name(QQ),
+ publish(Ch, QQ),
+ publish(Ch, QQ),
+ publish(Ch, QQ),
+
+ wait_for_messages_ready([Server], RaName, 3),
+ wait_for_messages_pending_ack([Server], RaName, 0),
+ {ok, _, {Name, Leader}} = ra:members({RaName, Server}),
+ QRes = rabbit_misc:r(<<"/">>, queue, QQ),
+ wait_until(
+ fun() ->
+ case rpc:call(Leader, ets, lookup, [queue_coarse_metrics, QRes]) of
+ [{QRes, 3, 0, 3, _}] -> true;
+ _ -> false
+ end
+ end),
+ Pid = rpc:call(Leader, erlang, whereis, [Name]),
+ rpc:call(Leader, erlang, exit, [Pid, kill]),
+ [Other | _] = lists:delete(Leader, Servers),
+ catch ra:trigger_election(Other),
+ %% kill it again just in case it came straight back up again
+ catch rpc:call(Leader, erlang, exit, [Pid, kill]),
+
+ %% this isn't a reliable test as the leader can be restarted so quickly
+ %% after a crash it is elected leader of the next term as well.
+ wait_until(
+ fun() ->
+ [] == rpc:call(Leader, ets, lookup, [queue_coarse_metrics, QRes])
+ end),
+ ok.
+
+delete_declare(Config) ->
+ %% Delete cluster in ra is asynchronous, we have to ensure that we handle that in rmq
+ [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config,
+ nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+
+ RaName = ra_name(QQ),
+ publish(Ch, QQ),
+ publish(Ch, QQ),
+ publish(Ch, QQ),
+ wait_for_messages_ready(Servers, RaName, 3),
+
+ ?assertMatch(#'queue.delete_ok'{},
+ amqp_channel:call(Ch, #'queue.delete'{queue = QQ})),
+ %% the actual data deletions happen after the call has returned as a quorum
+ %% queue leader waits for all nodes to confirm they replicated the poison
+ %% pill before terminating itself.
+ timer:sleep(1000),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+
+ %% Ensure that is a new queue and it's empty
+ wait_for_messages_ready(Servers, RaName, 0),
+ wait_for_messages_pending_ack(Servers, RaName, 0).
+
+basic_cancel(Config) ->
+ [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+
+ RaName = ra_name(QQ),
+ publish(Ch, QQ),
+ wait_for_messages_ready(Servers, RaName, 1),
+ wait_for_messages_pending_ack(Servers, RaName, 0),
+ subscribe(Ch, QQ, false),
+ receive
+ {#'basic.deliver'{}, _} ->
+ wait_for_messages_ready(Servers, RaName, 0),
+ wait_for_messages_pending_ack(Servers, RaName, 1),
+ amqp_channel:call(Ch, #'basic.cancel'{consumer_tag = <<"ctag">>}),
+ wait_for_messages_ready(Servers, RaName, 1),
+ wait_for_messages_pending_ack(Servers, RaName, 0)
+ end.
+
+purge(Config) ->
+ [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+
+ RaName = ra_name(QQ),
+ publish(Ch, QQ),
+ publish(Ch, QQ),
+ wait_for_messages_ready(Servers, RaName, 2),
+ wait_for_messages_pending_ack(Servers, RaName, 0),
+ _DeliveryTag = consume(Ch, QQ, false),
+ wait_for_messages_ready(Servers, RaName, 1),
+ wait_for_messages_pending_ack(Servers, RaName, 1),
+ {'queue.purge_ok', 2} = amqp_channel:call(Ch, #'queue.purge'{queue = QQ}),
+ wait_for_messages_pending_ack(Servers, RaName, 0),
+ wait_for_messages_ready(Servers, RaName, 0).
+
+sync_queue(Config) ->
+ [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+ {error, _, _} =
+ rabbit_ct_broker_helpers:rabbitmqctl(Config, 0, [<<"sync_queue">>, QQ]),
+ ok.
+
+cancel_sync_queue(Config) ->
+ [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+ {error, _, _} =
+ rabbit_ct_broker_helpers:rabbitmqctl(Config, 0, [<<"cancel_sync_queue">>, QQ]),
+ ok.
+
+declare_during_node_down(Config) ->
+ [Server, DownServer, _] = Servers = rabbit_ct_broker_helpers:get_node_configs(
+ Config, nodename),
+
+ stop_node(Config, DownServer),
+ % rabbit_ct_broker_helpers:stop_node(Config, DownServer),
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+
+ RaName = ra_name(QQ),
+ timer:sleep(2000),
+ rabbit_ct_broker_helpers:start_node(Config, DownServer),
+ publish(Ch, QQ),
+ wait_for_messages_ready(Servers, RaName, 1),
+ ok.
+
+add_member_not_running(Config) ->
+ [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ ct:pal("add_member_not_running config ~p", [Config]),
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+ ?assertEqual({error, node_not_running},
+ rpc:call(Server, rabbit_quorum_queue, add_member,
+ [<<"/">>, QQ, 'rabbit@burrow'])).
+
+add_member_classic(Config) ->
+ [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ CQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', CQ, 0, 0}, declare(Ch, CQ, [])),
+ ?assertEqual({error, classic_queue_not_supported},
+ rpc:call(Server, rabbit_quorum_queue, add_member,
+ [<<"/">>, CQ, Server])).
+
+add_member_already_a_member(Config) ->
+ [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+ ?assertEqual({error, already_a_member},
+ rpc:call(Server, rabbit_quorum_queue, add_member,
+ [<<"/">>, QQ, Server])).
+
+add_member_not_found(Config) ->
+ [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({error, not_found},
+ rpc:call(Server, rabbit_quorum_queue, add_member,
+ [<<"/">>, QQ, Server])).
+
+add_member(Config) ->
+ [Server0, Server1] = Servers0 =
+ rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server0),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+ ?assertEqual({error, node_not_running},
+ rpc:call(Server0, rabbit_quorum_queue, add_member,
+ [<<"/">>, QQ, Server1])),
+ ok = rabbit_control_helper:command(stop_app, Server1),
+ ok = rabbit_control_helper:command(join_cluster, Server1, [atom_to_list(Server0)], []),
+ rabbit_control_helper:command(start_app, Server1),
+ ?assertEqual(ok, rpc:call(Server0, rabbit_quorum_queue, add_member,
+ [<<"/">>, QQ, Server1])),
+ Info = rpc:call(Server0, rabbit_quorum_queue, infos,
+ [rabbit_misc:r(<<"/">>, queue, QQ)]),
+ Servers = lists:sort(Servers0),
+ ?assertEqual(Servers, lists:sort(proplists:get_value(online, Info, []))).
+
+delete_member_not_running(Config) ->
+ [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+ ?assertEqual({error, node_not_running},
+ rpc:call(Server, rabbit_quorum_queue, delete_member,
+ [<<"/">>, QQ, 'rabbit@burrow'])).
+
+delete_member_classic(Config) ->
+ [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ CQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', CQ, 0, 0}, declare(Ch, CQ, [])),
+ ?assertEqual({error, classic_queue_not_supported},
+ rpc:call(Server, rabbit_quorum_queue, delete_member,
+ [<<"/">>, CQ, Server])).
+
+delete_member_not_found(Config) ->
+ [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({error, not_found},
+ rpc:call(Server, rabbit_quorum_queue, delete_member,
+ [<<"/">>, QQ, Server])).
+
+delete_member(Config) ->
+ [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+ timer:sleep(100),
+ ?assertEqual(ok,
+ rpc:call(Server, rabbit_quorum_queue, delete_member,
+ [<<"/">>, QQ, Server])),
+ ?assertEqual({error, not_a_member},
+ rpc:call(Server, rabbit_quorum_queue, delete_member,
+ [<<"/">>, QQ, Server])).
+
+basic_recover(Config) ->
+ [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+
+ RaName = ra_name(QQ),
+ publish(Ch, QQ),
+ wait_for_messages_ready(Servers, RaName, 1),
+ wait_for_messages_pending_ack(Servers, RaName, 0),
+ _ = consume(Ch, QQ, false),
+ wait_for_messages_ready(Servers, RaName, 0),
+ wait_for_messages_pending_ack(Servers, RaName, 1),
+ amqp_channel:cast(Ch, #'basic.recover'{requeue = true}),
+ wait_for_messages_ready(Servers, RaName, 1),
+ wait_for_messages_pending_ack(Servers, RaName, 0).
+%%----------------------------------------------------------------------------
+
+declare(Ch, Q) ->
+ declare(Ch, Q, []).
+
+declare(Ch, Q, Args) ->
+ amqp_channel:call(Ch, #'queue.declare'{queue = Q,
+ durable = true,
+ auto_delete = false,
+ arguments = Args}).
+
+assert_queue_type(Server, Q, Expected) ->
+ Actual = get_queue_type(Server, Q),
+ Expected = Actual.
+
+get_queue_type(Server, Q) ->
+ QNameRes = rabbit_misc:r(<<"/">>, queue, Q),
+ {ok, AMQQueue} =
+ rpc:call(Server, rabbit_amqqueue, lookup, [QNameRes]),
+ AMQQueue#amqqueue.type.
+
+wait_for_messages(Config, Stats) ->
+ wait_for_messages(Config, lists:sort(Stats), 60).
+
+wait_for_messages(Config, Stats, 0) ->
+ ?assertEqual(Stats,
+ lists:sort(
+ filter_queues(Stats,
+ rabbit_ct_broker_helpers:rabbitmqctl_list(
+ Config, 0, ["list_queues", "name", "messages", "messages_ready",
+ "messages_unacknowledged"]))));
+wait_for_messages(Config, Stats, N) ->
+ case lists:sort(
+ filter_queues(Stats,
+ rabbit_ct_broker_helpers:rabbitmqctl_list(
+ Config, 0, ["list_queues", "name", "messages", "messages_ready",
+ "messages_unacknowledged"]))) of
+ Stats0 when Stats0 == Stats ->
+ ok;
+ _ ->
+ timer:sleep(500),
+ wait_for_messages(Config, Stats, N - 1)
+ end.
+
+filter_queues(Expected, Got) ->
+ Keys = [K || [K, _, _, _] <- Expected],
+ lists:filter(fun([K, _, _, _]) ->
+ lists:member(K, Keys)
+ end, Got).
+
+publish(Ch, Queue) ->
+ ok = amqp_channel:call(Ch,
+ #'basic.publish'{routing_key = Queue},
+ #amqp_msg{props = #'P_basic'{delivery_mode = 2},
+ payload = <<"msg">>}).
+
+consume(Ch, Queue, NoAck) ->
+ {GetOk, _} = Reply = amqp_channel:call(Ch, #'basic.get'{queue = Queue,
+ no_ack = NoAck}),
+ ?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = <<"msg">>}}, Reply),
+ GetOk#'basic.get_ok'.delivery_tag.
+
+consume_empty(Ch, Queue, NoAck) ->
+ ?assertMatch(#'basic.get_empty'{},
+ amqp_channel:call(Ch, #'basic.get'{queue = Queue,
+ no_ack = NoAck})).
+
+subscribe(Ch, Queue, NoAck) ->
+ amqp_channel:subscribe(Ch, #'basic.consume'{queue = Queue,
+ no_ack = NoAck,
+ consumer_tag = <<"ctag">>},
+ self()),
+ receive
+ #'basic.consume_ok'{consumer_tag = <<"ctag">>} ->
+ ok
+ end.
+
+qos(Ch, Prefetch, Global) ->
+ ?assertMatch(#'basic.qos_ok'{},
+ amqp_channel:call(Ch, #'basic.qos'{global = Global,
+ prefetch_count = Prefetch})).
+
+receive_basic_deliver(Redelivered) ->
+ receive
+ {#'basic.deliver'{redelivered = R}, _} when R == Redelivered ->
+ ok
+ end.
+
+wait_for_cleanup(Server, Channel, Number) ->
+ wait_for_cleanup(Server, Channel, Number, 60).
+
+wait_for_cleanup(Server, Channel, Number, 0) ->
+ ?assertEqual(Number, length(rpc:call(Server, rabbit_channel, list_queue_states, [Channel])));
+wait_for_cleanup(Server, Channel, Number, N) ->
+ case length(rpc:call(Server, rabbit_channel, list_queue_states, [Channel])) of
+ Length when Number == Length ->
+ ok;
+ _ ->
+ timer:sleep(500),
+ wait_for_cleanup(Server, Channel, Number, N - 1)
+ end.
+
+
+wait_for_messages_ready(Servers, QName, Ready) ->
+ wait_for_messages(Servers, QName, Ready,
+ fun rabbit_fifo:query_messages_ready/1, 60).
+
+wait_for_messages_pending_ack(Servers, QName, Ready) ->
+ wait_for_messages(Servers, QName, Ready,
+ fun rabbit_fifo:query_messages_checked_out/1, 60).
+
+wait_for_messages(Servers, QName, Number, Fun, 0) ->
+ Msgs = dirty_query(Servers, QName, Fun),
+ Totals = lists:map(fun(M) when is_map(M) ->
+ maps:size(M);
+ (_) ->
+ -1
+ end, Msgs),
+ ?assertEqual(Totals, [Number || _ <- lists:seq(1, length(Servers))]);
+wait_for_messages(Servers, QName, Number, Fun, N) ->
+ Msgs = dirty_query(Servers, QName, Fun),
+ case lists:all(fun(M) when is_map(M) ->
+ maps:size(M) == Number;
+ (_) ->
+ false
+ end, Msgs) of
+ true ->
+ ok;
+ _ ->
+ timer:sleep(500),
+ wait_for_messages(Servers, QName, Number, Fun, N - 1)
+ end.
+
+dirty_query(Servers, QName, Fun) ->
+ lists:map(
+ fun(N) ->
+ case rpc:call(N, ra, local_query, [{QName, N}, Fun]) of
+ {ok, {_, Msgs}, _} ->
+ Msgs;
+ _ ->
+ undefined
+ end
+ end, Servers).
+
+wait_until(Condition) ->
+ wait_until(Condition, 60).
+
+wait_until(Condition, 0) ->
+ ?assertEqual(true, Condition());
+wait_until(Condition, N) ->
+ case Condition() of
+ true ->
+ ok;
+ _ ->
+ timer:sleep(500),
+ wait_until(Condition, N - 1)
+ end.
+
+force_leader_change(Leader, Servers, Q) ->
+ RaName = ra_name(Q),
+ [F1, _] = Servers -- [Leader],
+ ok = rpc:call(F1, ra, trigger_election, [{RaName, F1}]),
+ case ra:members({RaName, Leader}) of
+ {ok, _, {_, Leader}} ->
+ %% Leader has been re-elected
+ force_leader_change(Leader, Servers, Q);
+ {ok, _, _} ->
+ %% Leader has changed
+ ok
+ end.
+
+delete_queues() ->
+ [rabbit_amqqueue:delete(Q, false, false, <<"dummy">>)
+ || Q <- rabbit_amqqueue:list()].
+
+stop_node(Config, Server) ->
+ rabbit_ct_broker_helpers:rabbitmqctl(Config, Server, ["stop"]).
diff --git a/test/rabbit_fifo_SUITE.erl b/test/rabbit_fifo_SUITE.erl
new file mode 100644
index 0000000000..a2e22afc2e
--- /dev/null
+++ b/test/rabbit_fifo_SUITE.erl
@@ -0,0 +1,624 @@
+-module(rabbit_fifo_SUITE).
+
+-compile(export_all).
+
+-include_lib("common_test/include/ct.hrl").
+-include_lib("eunit/include/eunit.hrl").
+
+all() ->
+ [
+ {group, tests}
+ ].
+
+all_tests() ->
+ [
+ basics,
+ return,
+ rabbit_fifo_returns_correlation,
+ resends_lost_command,
+ returns_after_down,
+ resends_after_lost_applied,
+ handles_reject_notification,
+ two_quick_enqueues,
+ detects_lost_delivery,
+ dequeue,
+ discard,
+ cancel_checkout,
+ credit,
+ untracked_enqueue,
+ flow,
+ test_queries,
+ duplicate_delivery,
+ usage
+ ].
+
+groups() ->
+ [
+ {tests, [], all_tests()}
+ ].
+
+init_per_group(_, Config) ->
+ PrivDir = ?config(priv_dir, Config),
+ _ = application:load(ra),
+ ok = application:set_env(ra, data_dir, PrivDir),
+ application:ensure_all_started(ra),
+ application:ensure_all_started(lg),
+ Config.
+
+end_per_group(_, Config) ->
+ _ = application:stop(ra),
+ Config.
+
+init_per_testcase(TestCase, Config) ->
+ ra_server_sup:remove_all(),
+ ServerName2 = list_to_atom(atom_to_list(TestCase) ++ "2"),
+ ServerName3 = list_to_atom(atom_to_list(TestCase) ++ "3"),
+ [
+ {cluster_name, TestCase},
+ {uid, atom_to_binary(TestCase, utf8)},
+ {node_id, {TestCase, node()}},
+ {uid2, atom_to_binary(ServerName2, utf8)},
+ {node_id2, {ServerName2, node()}},
+ {uid3, atom_to_binary(ServerName3, utf8)},
+ {node_id3, {ServerName3, node()}}
+ | Config].
+
+basics(Config) ->
+ ClusterName = ?config(cluster_name, Config),
+ ServerId = ?config(node_id, Config),
+ UId = ?config(uid, Config),
+ CustomerTag = UId,
+ ok = start_cluster(ClusterName, [ServerId]),
+ FState0 = rabbit_fifo_client:init(ClusterName, [ServerId]),
+ {ok, FState1} = rabbit_fifo_client:checkout(CustomerTag, 1, FState0),
+
+ ra_log_wal:force_roll_over(ra_log_wal),
+ % create segment the segment will trigger a snapshot
+ timer:sleep(1000),
+
+ {ok, FState2} = rabbit_fifo_client:enqueue(one, FState1),
+ % process ra events
+ FState3 = process_ra_event(FState2, 250),
+
+ FState5 = receive
+ {ra_event, From, Evt} ->
+ case rabbit_fifo_client:handle_ra_event(From, Evt, FState3) of
+ {internal, _AcceptedSeqs, _Actions, _FState4} ->
+ exit(unexpected_internal_event);
+ {{delivery, C, [{MsgId, _Msg}]}, FState4} ->
+ {ok, S} = rabbit_fifo_client:settle(C, [MsgId],
+ FState4),
+ S
+ end
+ after 5000 ->
+ exit(await_msg_timeout)
+ end,
+
+ % process settle applied notificaiton
+ FState5b = process_ra_event(FState5, 250),
+ _ = ra:stop_server(ServerId),
+ _ = ra:restart_server(ServerId),
+
+ % give time to become leader
+ timer:sleep(500),
+ {ok, FState6} = rabbit_fifo_client:enqueue(two, FState5b),
+ % process applied event
+ FState6b = process_ra_event(FState6, 250),
+
+ receive
+ {ra_event, Frm, E} ->
+ case rabbit_fifo_client:handle_ra_event(Frm, E, FState6b) of
+ {internal, _, _, _FState7} ->
+ ct:pal("unexpected event ~p~n", [E]),
+ exit({unexpected_internal_event, E});
+ {{delivery, Ctag, [{Mid, {_, two}}]}, FState7} ->
+ {ok, _S} = rabbit_fifo_client:return(Ctag, [Mid], FState7),
+ ok
+ end
+ after 2000 ->
+ exit(await_msg_timeout)
+ end,
+ ra:stop_server(ServerId),
+ ok.
+
+return(Config) ->
+ ClusterName = ?config(cluster_name, Config),
+ ServerId = ?config(node_id, Config),
+ ServerId2 = ?config(node_id2, Config),
+ ok = start_cluster(ClusterName, [ServerId, ServerId2]),
+
+ F00 = rabbit_fifo_client:init(ClusterName, [ServerId, ServerId2]),
+ {ok, F0} = rabbit_fifo_client:enqueue(1, msg1, F00),
+ {ok, F1} = rabbit_fifo_client:enqueue(2, msg2, F0),
+ {_, _, F2} = process_ra_events(F1, 100),
+ {ok, {MsgId, _}, F} = rabbit_fifo_client:dequeue(<<"tag">>, unsettled, F2),
+ {ok, _F2} = rabbit_fifo_client:return(<<"tag">>, [MsgId], F),
+
+ ra:stop_server(ServerId),
+ ok.
+
+rabbit_fifo_returns_correlation(Config) ->
+ ClusterName = ?config(cluster_name, Config),
+ ServerId = ?config(node_id, Config),
+ ok = start_cluster(ClusterName, [ServerId]),
+ F0 = rabbit_fifo_client:init(ClusterName, [ServerId]),
+ {ok, F1} = rabbit_fifo_client:enqueue(corr1, msg1, F0),
+ receive
+ {ra_event, Frm, E} ->
+ case rabbit_fifo_client:handle_ra_event(Frm, E, F1) of
+ {internal, [corr1], [], _F2} ->
+ ok;
+ {Del, _} ->
+ exit({unexpected, Del})
+ end
+ after 2000 ->
+ exit(await_msg_timeout)
+ end,
+ ra:stop_server(ServerId),
+ ok.
+
+duplicate_delivery(Config) ->
+ ClusterName = ?config(cluster_name, Config),
+ ServerId = ?config(node_id, Config),
+ ok = start_cluster(ClusterName, [ServerId]),
+ F0 = rabbit_fifo_client:init(ClusterName, [ServerId]),
+ {ok, F1} = rabbit_fifo_client:checkout(<<"tag">>, 10, F0),
+ {ok, F2} = rabbit_fifo_client:enqueue(corr1, msg1, F1),
+ Fun = fun Loop(S0) ->
+ receive
+ {ra_event, Frm, E} = Evt ->
+ case rabbit_fifo_client:handle_ra_event(Frm, E, S0) of
+ {internal, [corr1], [], S1} ->
+ Loop(S1);
+ {_Del, S1} ->
+ %% repeat event delivery
+ self() ! Evt,
+ %% check that then next received delivery doesn't
+ %% repeat or crash
+ receive
+ {ra_event, F, E1} ->
+ case rabbit_fifo_client:handle_ra_event(F, E1, S1) of
+ {internal, [], [], S2} ->
+ S2
+ end
+ end
+ end
+ after 2000 ->
+ exit(await_msg_timeout)
+ end
+ end,
+ Fun(F2),
+ ra:stop_server(ServerId),
+ ok.
+
+usage(Config) ->
+ ClusterName = ?config(cluster_name, Config),
+ ServerId = ?config(node_id, Config),
+ ok = start_cluster(ClusterName, [ServerId]),
+ F0 = rabbit_fifo_client:init(ClusterName, [ServerId]),
+ {ok, F1} = rabbit_fifo_client:checkout(<<"tag">>, 10, F0),
+ {ok, F2} = rabbit_fifo_client:enqueue(corr1, msg1, F1),
+ {ok, F3} = rabbit_fifo_client:enqueue(corr2, msg2, F2),
+ {_, _, _} = process_ra_events(F3, 50),
+ % force tick and usage stats emission
+ ServerId ! tick_timeout,
+ timer:sleep(50),
+ % ct:pal("ets ~w ~w ~w", [ets:tab2list(rabbit_fifo_usage), ServerId, UId]),
+ Use = rabbit_fifo:usage(element(1, ServerId)),
+ ct:pal("Use ~w~n", [Use]),
+ ra:stop_server(ServerId),
+ ?assert(Use > 0.0),
+ ok.
+
+resends_lost_command(Config) ->
+ ClusterName = ?config(cluster_name, Config),
+ ServerId = ?config(node_id, Config),
+ ok = start_cluster(ClusterName, [ServerId]),
+
+ ok = meck:new(ra, [passthrough]),
+
+ F0 = rabbit_fifo_client:init(ClusterName, [ServerId]),
+ {ok, F1} = rabbit_fifo_client:enqueue(msg1, F0),
+ % lose the enqueue
+ meck:expect(ra, pipeline_command, fun (_, _, _) -> ok end),
+ {ok, F2} = rabbit_fifo_client:enqueue(msg2, F1),
+ meck:unload(ra),
+ {ok, F3} = rabbit_fifo_client:enqueue(msg3, F2),
+ {_, _, F4} = process_ra_events(F3, 500),
+ {ok, {_, {_, msg1}}, F5} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F4),
+ {ok, {_, {_, msg2}}, F6} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F5),
+ {ok, {_, {_, msg3}}, _F7} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F6),
+ ra:stop_server(ServerId),
+ ok.
+
+two_quick_enqueues(Config) ->
+ ClusterName = ?config(cluster_name, Config),
+ ServerId = ?config(node_id, Config),
+ ok = start_cluster(ClusterName, [ServerId]),
+
+ F0 = rabbit_fifo_client:init(ClusterName, [ServerId]),
+ F1 = element(2, rabbit_fifo_client:enqueue(msg1, F0)),
+ {ok, F2} = rabbit_fifo_client:enqueue(msg2, F1),
+ _ = process_ra_events(F2, 500),
+ ra:stop_server(ServerId),
+ ok.
+
+detects_lost_delivery(Config) ->
+ ClusterName = ?config(cluster_name, Config),
+ ServerId = ?config(node_id, Config),
+ ok = start_cluster(ClusterName, [ServerId]),
+
+ F000 = rabbit_fifo_client:init(ClusterName, [ServerId]),
+ {ok, F00} = rabbit_fifo_client:enqueue(msg1, F000),
+ {_, _, F0} = process_ra_events(F00, 100),
+ {ok, F1} = rabbit_fifo_client:checkout(<<"tag">>, 10, F0),
+ {ok, F2} = rabbit_fifo_client:enqueue(msg2, F1),
+ {ok, F3} = rabbit_fifo_client:enqueue(msg3, F2),
+ % lose first delivery
+ receive
+ {ra_event, _, {machine, {delivery, _, [{_, {_, msg1}}]}}} ->
+ ok
+ after 500 ->
+ exit(await_delivery_timeout)
+ end,
+
+ % assert three deliveries were received
+ {[_, _, _], _, _} = process_ra_events(F3, 500),
+ ra:stop_server(ServerId),
+ ok.
+
+returns_after_down(Config) ->
+ ClusterName = ?config(cluster_name, Config),
+ ServerId = ?config(node_id, Config),
+ ok = start_cluster(ClusterName, [ServerId]),
+
+ F0 = rabbit_fifo_client:init(ClusterName, [ServerId]),
+ {ok, F1} = rabbit_fifo_client:enqueue(msg1, F0),
+ {_, _, F2} = process_ra_events(F1, 500),
+ % start a customer in a separate processes
+ % that exits after checkout
+ Self = self(),
+ _Pid = spawn(fun () ->
+ F = rabbit_fifo_client:init(ClusterName, [ServerId]),
+ {ok, _} = rabbit_fifo_client:checkout(<<"tag">>, 10, F),
+ Self ! checkout_done
+ end),
+ receive checkout_done -> ok after 1000 -> exit(checkout_done_timeout) end,
+ % message should be available for dequeue
+ {ok, {_, {_, msg1}}, _} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F2),
+ ra:stop_server(ServerId),
+ ok.
+
+resends_after_lost_applied(Config) ->
+ ClusterName = ?config(cluster_name, Config),
+ ServerId = ?config(node_id, Config),
+ ok = start_cluster(ClusterName, [ServerId]),
+
+ F0 = rabbit_fifo_client:init(ClusterName, [ServerId]),
+ {_, _, F1} = process_ra_events(element(2, rabbit_fifo_client:enqueue(msg1, F0)),
+ 500),
+ {ok, F2} = rabbit_fifo_client:enqueue(msg2, F1),
+ % lose an applied event
+ receive
+ {ra_event, _, {applied, _}} ->
+ ok
+ after 500 ->
+ exit(await_ra_event_timeout)
+ end,
+ % send another message
+ {ok, F3} = rabbit_fifo_client:enqueue(msg3, F2),
+ {_, _, F4} = process_ra_events(F3, 500),
+ {ok, {_, {_, msg1}}, F5} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F4),
+ {ok, {_, {_, msg2}}, F6} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F5),
+ {ok, {_, {_, msg3}}, _F7} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F6),
+ ra:stop_server(ServerId),
+ ok.
+
+handles_reject_notification(Config) ->
+ ClusterName = ?config(cluster_name, Config),
+ ServerId1 = ?config(node_id, Config),
+ ServerId2 = ?config(node_id2, Config),
+ UId1 = ?config(uid, Config),
+ CId = {UId1, self()},
+
+ ok = start_cluster(ClusterName, [ServerId1, ServerId2]),
+ _ = ra:process_command(ServerId1, {checkout,
+ {auto, 10, simple_prefetch}, CId}),
+ % reverse order - should try the first node in the list first
+ F0 = rabbit_fifo_client:init(ClusterName, [ServerId2, ServerId1]),
+ {ok, F1} = rabbit_fifo_client:enqueue(one, F0),
+
+ timer:sleep(500),
+
+ % the applied notification
+ _F2 = process_ra_event(F1, 250),
+ ra:stop_server(ServerId1),
+ ra:stop_server(ServerId2),
+ ok.
+
+discard(Config) ->
+ PrivDir = ?config(priv_dir, Config),
+ ServerId = ?config(node_id, Config),
+ UId = ?config(uid, Config),
+ ClusterName = ?config(cluster_name, Config),
+ Conf = #{cluster_name => ClusterName,
+ id => ServerId,
+ uid => UId,
+ log_init_args => #{data_dir => PrivDir, uid => UId},
+ initial_member => [],
+ machine => {module, rabbit_fifo,
+ #{dead_letter_handler =>
+ {?MODULE, dead_letter_handler, [self()]}}}},
+ _ = ra:start_server(Conf),
+ ok = ra:trigger_election(ServerId),
+ _ = ra:members(ServerId),
+
+ F0 = rabbit_fifo_client:init(ClusterName, [ServerId]),
+ {ok, F1} = rabbit_fifo_client:checkout(<<"tag">>, 10, F0),
+ {ok, F2} = rabbit_fifo_client:enqueue(msg1, F1),
+ F3 = discard_next_delivery(F2, 500),
+ {ok, empty, _F4} = rabbit_fifo_client:dequeue(<<"tag1">>, settled, F3),
+ receive
+ {dead_letter, Letters} ->
+ ct:pal("dead letters ~p~n", [Letters]),
+ [{_, msg1}] = Letters,
+ ok
+ after 500 ->
+ exit(dead_letter_timeout)
+ end,
+ ra:stop_server(ServerId),
+ ok.
+
+cancel_checkout(Config) ->
+ ClusterName = ?config(cluster_name, Config),
+ ServerId = ?config(node_id, Config),
+ ok = start_cluster(ClusterName, [ServerId]),
+ F0 = rabbit_fifo_client:init(ClusterName, [ServerId], 4),
+ {ok, F1} = rabbit_fifo_client:enqueue(m1, F0),
+ {ok, F2} = rabbit_fifo_client:checkout(<<"tag">>, 10, F1),
+ {_, _, F3} = process_ra_events0(F2, [], [], 250, fun (_, S) -> S end),
+ {ok, F4} = rabbit_fifo_client:cancel_checkout(<<"tag">>, F3),
+ {ok, {_, {_, m1}}, _} = rabbit_fifo_client:dequeue(<<"d1">>, settled, F4),
+ ok.
+
+credit(Config) ->
+ ClusterName = ?config(cluster_name, Config),
+ ServerId = ?config(node_id, Config),
+ ok = start_cluster(ClusterName, [ServerId]),
+ F0 = rabbit_fifo_client:init(ClusterName, [ServerId], 4),
+ {ok, F1} = rabbit_fifo_client:enqueue(m1, F0),
+ {ok, F2} = rabbit_fifo_client:enqueue(m2, F1),
+ {_, _, F3} = process_ra_events(F2, [], 250),
+ %% checkout with 0 prefetch
+ {ok, F4} = rabbit_fifo_client:checkout(<<"tag">>, 0, credited, F3),
+ %% assert no deliveries
+ {_, _, F5} = process_ra_events0(F4, [], [], 250,
+ fun
+ (D, _) -> error({unexpected_delivery, D})
+ end),
+ %% provide some credit
+ {ok, F6} = rabbit_fifo_client:credit(<<"tag">>, 1, false, F5),
+ {[{_, {_, m1}}], [{send_credit_reply, _}], F7} =
+ process_ra_events(F6, [], 250),
+
+ %% credit and drain
+ {ok, F8} = rabbit_fifo_client:credit(<<"tag">>, 4, true, F7),
+ {[{_, {_, m2}}], [{send_credit_reply, _}, {send_drained, _}], F9} =
+ process_ra_events(F8, [], 250),
+ flush(),
+
+ %% enqueue another message - at this point the consumer credit should be
+ %% all used up due to the drain
+ {ok, F10} = rabbit_fifo_client:enqueue(m3, F9),
+ %% assert no deliveries
+ {_, _, F11} = process_ra_events0(F10, [], [], 250,
+ fun
+ (D, _) -> error({unexpected_delivery, D})
+ end),
+ %% credit again and receive the last message
+ {ok, F12} = rabbit_fifo_client:credit(<<"tag">>, 10, false, F11),
+ {[{_, {_, m3}}], _, _} = process_ra_events(F12, [], 250),
+ ok.
+
+untracked_enqueue(Config) ->
+ ClusterName = ?config(cluster_name, Config),
+ ServerId = ?config(node_id, Config),
+ ok = start_cluster(ClusterName, [ServerId]),
+
+ ok = rabbit_fifo_client:untracked_enqueue([ServerId], msg1),
+ timer:sleep(100),
+ F0 = rabbit_fifo_client:init(ClusterName, [ServerId]),
+ {ok, {_, {_, msg1}}, _} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F0),
+ ra:stop_server(ServerId),
+ ok.
+
+
+flow(Config) ->
+ ClusterName = ?config(cluster_name, Config),
+ ServerId = ?config(node_id, Config),
+ ok = start_cluster(ClusterName, [ServerId]),
+ F0 = rabbit_fifo_client:init(ClusterName, [ServerId], 3),
+ {ok, F1} = rabbit_fifo_client:enqueue(m1, F0),
+ {ok, F2} = rabbit_fifo_client:enqueue(m2, F1),
+ {ok, F3} = rabbit_fifo_client:enqueue(m3, F2),
+ {slow, F4} = rabbit_fifo_client:enqueue(m4, F3),
+ {_, _, F5} = process_ra_events(F4, 500),
+ {ok, _} = rabbit_fifo_client:enqueue(m5, F5),
+ ra:stop_server(ServerId),
+ ok.
+
+test_queries(Config) ->
+ ClusterName = ?config(cluster_name, Config),
+ ServerId = ?config(node_id, Config),
+ ok = start_cluster(ClusterName, [ServerId]),
+ P = spawn(fun () ->
+ F0 = rabbit_fifo_client:init(ClusterName, [ServerId], 4),
+ {ok, F1} = rabbit_fifo_client:enqueue(m1, F0),
+ {ok, F2} = rabbit_fifo_client:enqueue(m2, F1),
+ process_ra_events(F2, 100),
+ receive stop -> ok end
+ end),
+ F0 = rabbit_fifo_client:init(ClusterName, [ServerId], 4),
+ {ok, _} = rabbit_fifo_client:checkout(<<"tag">>, 1, F0),
+ {ok, {_, Ready}, _} = ra:local_query(ServerId,
+ fun rabbit_fifo:query_messages_ready/1),
+ ?assertEqual(1, maps:size(Ready)),
+ ct:pal("Ready ~w~n", [Ready]),
+ {ok, {_, Checked}, _} = ra:local_query(ServerId,
+ fun rabbit_fifo:query_messages_checked_out/1),
+ ?assertEqual(1, maps:size(Checked)),
+ ct:pal("Checked ~w~n", [Checked]),
+ {ok, {_, Processes}, _} = ra:local_query(ServerId,
+ fun rabbit_fifo:query_processes/1),
+ ct:pal("Processes ~w~n", [Processes]),
+ ?assertEqual(2, length(Processes)),
+ P ! stop,
+ ra:stop_server(ServerId),
+ ok.
+
+dead_letter_handler(Pid, Msgs) ->
+ Pid ! {dead_letter, Msgs}.
+
+dequeue(Config) ->
+ ClusterName = ?config(priv_dir, Config),
+ ServerId = ?config(node_id, Config),
+ UId = ?config(uid, Config),
+ Tag = UId,
+ ok = start_cluster(ClusterName, [ServerId]),
+ F1 = rabbit_fifo_client:init(ClusterName, [ServerId]),
+ {ok, empty, F1b} = rabbit_fifo_client:dequeue(Tag, settled, F1),
+ {ok, F2_} = rabbit_fifo_client:enqueue(msg1, F1b),
+ {_, _, F2} = process_ra_events(F2_, 100),
+
+ {ok, {0, {_, msg1}}, F3} = rabbit_fifo_client:dequeue(Tag, settled, F2),
+ {ok, F4_} = rabbit_fifo_client:enqueue(msg2, F3),
+ {_, _, F4} = process_ra_events(F4_, 100),
+ {ok, {MsgId, {_, msg2}}, F5} = rabbit_fifo_client:dequeue(Tag, unsettled, F4),
+ {ok, _F6} = rabbit_fifo_client:settle(Tag, [MsgId], F5),
+ ra:stop_server(ServerId),
+ ok.
+
+enq_deq_n(N, F0) ->
+ enq_deq_n(N, F0, []).
+
+enq_deq_n(0, F0, Acc) ->
+ {_, _, F} = process_ra_events(F0, 100),
+ {F, Acc};
+enq_deq_n(N, F, Acc) ->
+ {ok, F1} = rabbit_fifo_client:enqueue(N, F),
+ {_, _, F2} = process_ra_events(F1, 10),
+ {ok, {_, {_, Deq}}, F3} = rabbit_fifo_client:dequeue(term_to_binary(N), settled, F2),
+
+ {_, _, F4} = process_ra_events(F3, 5),
+ enq_deq_n(N-1, F4, [Deq | Acc]).
+
+conf(ClusterName, UId, ServerId, _, Peers) ->
+ #{cluster_name => ClusterName,
+ id => ServerId,
+ uid => UId,
+ log_init_args => #{uid => UId},
+ initial_members => Peers,
+ machine => {module, rabbit_fifo, #{}}}.
+
+process_ra_event(State, Wait) ->
+ receive
+ {ra_event, From, Evt} ->
+ % ct:pal("processed ra event ~p~n", [Evt]),
+ {internal, _, _, S} = rabbit_fifo_client:handle_ra_event(From, Evt, State),
+ S
+ after Wait ->
+ exit(ra_event_timeout)
+ end.
+
+process_ra_events(State0, Wait) ->
+ process_ra_events(State0, [], Wait).
+
+process_ra_events(State, Acc, Wait) ->
+ DeliveryFun = fun ({delivery, Tag, Msgs}, S) ->
+ MsgIds = [element(1, M) || M <- Msgs],
+ {ok, S2} = rabbit_fifo_client:settle(Tag, MsgIds, S),
+ S2
+ end,
+ process_ra_events0(State, Acc, [], Wait, DeliveryFun).
+
+process_ra_events0(State0, Acc, Actions0, Wait, DeliveryFun) ->
+ receive
+ {ra_event, From, Evt} ->
+ % ct:pal("ra event ~w~n", [Evt]),
+ case rabbit_fifo_client:handle_ra_event(From, Evt, State0) of
+ {internal, _, Actions, State} ->
+ process_ra_events0(State, Acc, Actions0 ++ Actions,
+ Wait, DeliveryFun);
+ {{delivery, _Tag, Msgs} = Del, State1} ->
+ State = DeliveryFun(Del, State1),
+ process_ra_events0(State, Acc ++ Msgs, Actions0, Wait, DeliveryFun);
+ eol ->
+ eol
+ end
+ after Wait ->
+ {Acc, Actions0, State0}
+ end.
+
+discard_next_delivery(State0, Wait) ->
+ receive
+ {ra_event, From, Evt} ->
+ case rabbit_fifo_client:handle_ra_event(From, Evt, State0) of
+ {internal, _, _Actions, State} ->
+ discard_next_delivery(State, Wait);
+ {{delivery, Tag, Msgs}, State1} ->
+ MsgIds = [element(1, M) || M <- Msgs],
+ ct:pal("discarding ~p", [Msgs]),
+ {ok, State} = rabbit_fifo_client:discard(Tag, MsgIds,
+ State1),
+ State
+ end
+ after Wait ->
+ State0
+ end.
+
+return_next_delivery(State0, Wait) ->
+ receive
+ {ra_event, From, Evt} ->
+ case rabbit_fifo_client:handle_ra_event(From, Evt, State0) of
+ {internal, _, _, State} ->
+ return_next_delivery(State, Wait);
+ {{delivery, Tag, Msgs}, State1} ->
+ MsgIds = [element(1, M) || M <- Msgs],
+ ct:pal("returning ~p", [Msgs]),
+ {ok, State} = rabbit_fifo_client:return(Tag, MsgIds,
+ State1),
+ State
+ end
+ after Wait ->
+ State0
+ end.
+
+validate_process_down(Name, 0) ->
+ exit({process_not_down, Name});
+validate_process_down(Name, Num) ->
+ case whereis(Name) of
+ undefined ->
+ ok;
+ _ ->
+ timer:sleep(100),
+ validate_process_down(Name, Num-1)
+ end.
+
+start_cluster(ClusterName, ServerIds, RaFifoConfig) ->
+ {ok, Started, _} = ra:start_cluster(ClusterName,
+ {module, rabbit_fifo, RaFifoConfig},
+ ServerIds),
+ ?assertEqual(length(Started), length(ServerIds)),
+ ok.
+
+start_cluster(ClusterName, ServerIds) ->
+ start_cluster(ClusterName, ServerIds, #{}).
+
+flush() ->
+ receive
+ Msg ->
+ ct:pal("flushed: ~w~n", [Msg]),
+ flush()
+ after 10 ->
+ ok
+ end.