summaryrefslogtreecommitdiff
path: root/test
diff options
context:
space:
mode:
Diffstat (limited to 'test')
-rw-r--r--test/amqqueue_backward_compatibility_SUITE.erl302
-rw-r--r--test/backing_queue_SUITE.erl31
-rw-r--r--test/channel_operation_timeout_SUITE.erl14
-rw-r--r--test/cluster_SUITE.erl15
-rw-r--r--test/clustering_management_SUITE.erl66
-rw-r--r--test/crashing_queues_SUITE.erl7
-rw-r--r--test/dynamic_ha_SUITE.erl4
-rw-r--r--test/dynamic_qq_SUITE.erl23
-rw-r--r--test/feature_flags_SUITE.erl372
-rw-r--r--test/mirrored_supervisor_SUITE.erl13
-rw-r--r--test/priority_queue_SUITE.erl8
-rw-r--r--test/quorum_queue_SUITE.erl106
-rw-r--r--test/quorum_queue_utils.erl29
-rw-r--r--test/rabbit_core_metrics_gc_SUITE.erl8
-rw-r--r--test/single_active_consumer_SUITE.erl22
-rw-r--r--test/unit_inbroker_dead_letter_SUITE.erl1149
-rw-r--r--test/unit_inbroker_non_parallel_SUITE.erl2
-rw-r--r--test/unit_inbroker_parallel_SUITE.erl29
-rw-r--r--test/vhost_SUITE.erl2
19 files changed, 2068 insertions, 134 deletions
diff --git a/test/amqqueue_backward_compatibility_SUITE.erl b/test/amqqueue_backward_compatibility_SUITE.erl
new file mode 100644
index 0000000000..05a049c9bb
--- /dev/null
+++ b/test/amqqueue_backward_compatibility_SUITE.erl
@@ -0,0 +1,302 @@
+-module(amqqueue_backward_compatibility_SUITE).
+
+-include_lib("common_test/include/ct.hrl").
+-include_lib("eunit/include/eunit.hrl").
+
+-include("amqqueue.hrl").
+
+-export([all/0,
+ groups/0,
+ init_per_suite/2,
+ end_per_suite/2,
+ init_per_group/2,
+ end_per_group/2,
+ init_per_testcase/2,
+ end_per_testcase/2,
+
+ new_amqqueue_v1_is_amqqueue/1,
+ new_amqqueue_v2_is_amqqueue/1,
+ random_term_is_not_amqqueue/1,
+
+ amqqueue_v1_is_durable/1,
+ amqqueue_v2_is_durable/1,
+ random_term_is_not_durable/1,
+
+ amqqueue_v1_state_matching/1,
+ amqqueue_v2_state_matching/1,
+ random_term_state_matching/1,
+
+ amqqueue_v1_type_matching/1,
+ amqqueue_v2_type_matching/1,
+ random_term_type_matching/1,
+
+ upgrade_v1_to_v2/1
+ ]).
+
+-define(long_tuple, {random_tuple, a, b, c, d, e, f, g, h, i, j, k, l, m,
+ n, o, p, q, r, s, t, u, v, w, x, y, z}).
+
+all() ->
+ [
+ {group, parallel_tests}
+ ].
+
+groups() ->
+ [
+ {parallel_tests, [parallel], [new_amqqueue_v1_is_amqqueue,
+ new_amqqueue_v2_is_amqqueue,
+ random_term_is_not_amqqueue,
+ amqqueue_v1_is_durable,
+ amqqueue_v2_is_durable,
+ random_term_is_not_durable,
+ amqqueue_v1_state_matching,
+ amqqueue_v2_state_matching,
+ random_term_state_matching,
+ amqqueue_v1_type_matching,
+ amqqueue_v2_type_matching,
+ random_term_type_matching]}
+ ].
+
+init_per_suite(_, Config) -> Config.
+end_per_suite(_, Config) -> Config.
+
+init_per_group(_, Config) -> Config.
+end_per_group(_, Config) -> Config.
+
+init_per_testcase(_, Config) -> Config.
+end_per_testcase(_, Config) -> Config.
+
+new_amqqueue_v1_is_amqqueue(_) ->
+ VHost = <<"/">>,
+ Name = rabbit_misc:r(VHost, queue, my_amqqueue_v1),
+ Queue = amqqueue:new_with_version(amqqueue_v1,
+ Name,
+ self(),
+ false,
+ false,
+ none,
+ [],
+ VHost,
+ #{},
+ ?amqqueue_v1_type),
+ ?assert(?is_amqqueue(Queue)),
+ ?assert(?is_amqqueue_v1(Queue)),
+ ?assert(not ?is_amqqueue_v2(Queue)),
+ ?assert(?amqqueue_is_classic(Queue)),
+ ?assert(amqqueue:is_classic(Queue)),
+ ?assert(not ?amqqueue_is_quorum(Queue)),
+ ?assert(not ?amqqueue_vhost_equals(Queue, <<"frazzle">>)),
+ ?assert(?amqqueue_has_valid_pid(Queue)),
+ ?assert(?amqqueue_pid_equals(Queue, self())),
+ ?assert(?amqqueue_pids_are_equal(Queue, Queue)),
+ ?assert(?amqqueue_pid_runs_on_local_node(Queue)),
+ ?assert(amqqueue:qnode(Queue) == node()).
+
+new_amqqueue_v2_is_amqqueue(_) ->
+ VHost = <<"/">>,
+ Name = rabbit_misc:r(VHost, queue, my_amqqueue_v2),
+ Queue = amqqueue:new_with_version(amqqueue_v2,
+ Name,
+ self(),
+ false,
+ false,
+ none,
+ [],
+ VHost,
+ #{},
+ classic),
+ ?assert(?is_amqqueue(Queue)),
+ ?assert(?is_amqqueue_v2(Queue)),
+ ?assert(not ?is_amqqueue_v1(Queue)),
+ ?assert(?amqqueue_is_classic(Queue)),
+ ?assert(amqqueue:is_classic(Queue)),
+ ?assert(not ?amqqueue_is_quorum(Queue)),
+ ?assert(not ?amqqueue_vhost_equals(Queue, <<"frazzle">>)),
+ ?assert(?amqqueue_has_valid_pid(Queue)),
+ ?assert(?amqqueue_pid_equals(Queue, self())),
+ ?assert(?amqqueue_pids_are_equal(Queue, Queue)),
+ ?assert(?amqqueue_pid_runs_on_local_node(Queue)),
+ ?assert(amqqueue:qnode(Queue) == node()).
+
+random_term_is_not_amqqueue(_) ->
+ Term = ?long_tuple,
+ ?assert(not ?is_amqqueue(Term)),
+ ?assert(not ?is_amqqueue_v2(Term)),
+ ?assert(not ?is_amqqueue_v1(Term)).
+
+%% -------------------------------------------------------------------
+
+amqqueue_v1_is_durable(_) ->
+ VHost = <<"/">>,
+ Name = rabbit_misc:r(VHost, queue, my_amqqueue_v1),
+ TransientQueue = amqqueue:new_with_version(amqqueue_v1,
+ Name,
+ self(),
+ false,
+ false,
+ none,
+ [],
+ VHost,
+ #{},
+ ?amqqueue_v1_type),
+ DurableQueue = amqqueue:new_with_version(amqqueue_v1,
+ Name,
+ self(),
+ true,
+ false,
+ none,
+ [],
+ VHost,
+ #{},
+ ?amqqueue_v1_type),
+ ?assert(not ?amqqueue_is_durable(TransientQueue)),
+ ?assert(?amqqueue_is_durable(DurableQueue)).
+
+amqqueue_v2_is_durable(_) ->
+ VHost = <<"/">>,
+ Name = rabbit_misc:r(VHost, queue, my_amqqueue_v1),
+ TransientQueue = amqqueue:new_with_version(amqqueue_v2,
+ Name,
+ self(),
+ false,
+ false,
+ none,
+ [],
+ VHost,
+ #{},
+ classic),
+ DurableQueue = amqqueue:new_with_version(amqqueue_v2,
+ Name,
+ self(),
+ true,
+ false,
+ none,
+ [],
+ VHost,
+ #{},
+ classic),
+ ?assert(not ?amqqueue_is_durable(TransientQueue)),
+ ?assert(?amqqueue_is_durable(DurableQueue)).
+
+random_term_is_not_durable(_) ->
+ Term = ?long_tuple,
+ ?assert(not ?amqqueue_is_durable(Term)).
+
+%% -------------------------------------------------------------------
+
+amqqueue_v1_state_matching(_) ->
+ VHost = <<"/">>,
+ Name = rabbit_misc:r(VHost, queue, my_amqqueue_v1),
+ Queue1 = amqqueue:new_with_version(amqqueue_v1,
+ Name,
+ self(),
+ true,
+ false,
+ none,
+ [],
+ VHost,
+ #{},
+ ?amqqueue_v1_type),
+ ?assert(?amqqueue_state_is(Queue1, live)),
+ Queue2 = amqqueue:set_state(Queue1, stopped),
+ ?assert(?amqqueue_state_is(Queue2, stopped)).
+
+amqqueue_v2_state_matching(_) ->
+ VHost = <<"/">>,
+ Name = rabbit_misc:r(VHost, queue, my_amqqueue_v1),
+ Queue1 = amqqueue:new_with_version(amqqueue_v2,
+ Name,
+ self(),
+ true,
+ false,
+ none,
+ [],
+ VHost,
+ #{},
+ classic),
+ ?assert(?amqqueue_state_is(Queue1, live)),
+ Queue2 = amqqueue:set_state(Queue1, stopped),
+ ?assert(?amqqueue_state_is(Queue2, stopped)).
+
+random_term_state_matching(_) ->
+ Term = ?long_tuple,
+ ?assert(not ?amqqueue_state_is(Term, live)).
+
+%% -------------------------------------------------------------------
+
+amqqueue_v1_type_matching(_) ->
+ VHost = <<"/">>,
+ Name = rabbit_misc:r(VHost, queue, my_amqqueue_v1),
+ Queue = amqqueue:new_with_version(amqqueue_v1,
+ Name,
+ self(),
+ true,
+ false,
+ none,
+ [],
+ VHost,
+ #{},
+ ?amqqueue_v1_type),
+ ?assert(?amqqueue_is_classic(Queue)),
+ ?assert(amqqueue:is_classic(Queue)),
+ ?assert(not ?amqqueue_is_quorum(Queue)).
+
+amqqueue_v2_type_matching(_) ->
+ VHost = <<"/">>,
+ Name = rabbit_misc:r(VHost, queue, my_amqqueue_v1),
+ ClassicQueue = amqqueue:new_with_version(amqqueue_v2,
+ Name,
+ self(),
+ true,
+ false,
+ none,
+ [],
+ VHost,
+ #{},
+ classic),
+ ?assert(?amqqueue_is_classic(ClassicQueue)),
+ ?assert(amqqueue:is_classic(ClassicQueue)),
+ ?assert(not ?amqqueue_is_quorum(ClassicQueue)),
+ ?assert(not amqqueue:is_quorum(ClassicQueue)),
+ QuorumQueue = amqqueue:new_with_version(amqqueue_v2,
+ Name,
+ self(),
+ true,
+ false,
+ none,
+ [],
+ VHost,
+ #{},
+ quorum),
+ ?assert(not ?amqqueue_is_classic(QuorumQueue)),
+ ?assert(not amqqueue:is_classic(QuorumQueue)),
+ ?assert(?amqqueue_is_quorum(QuorumQueue)),
+ ?assert(amqqueue:is_quorum(QuorumQueue)).
+
+random_term_type_matching(_) ->
+ Term = ?long_tuple,
+ ?assert(not ?amqqueue_is_classic(Term)),
+ ?assert(not ?amqqueue_is_quorum(Term)),
+ ?assertException(error, function_clause, amqqueue:is_classic(Term)),
+ ?assertException(error, function_clause, amqqueue:is_quorum(Term)).
+
+%% -------------------------------------------------------------------
+
+upgrade_v1_to_v2(_) ->
+ VHost = <<"/">>,
+ Name = rabbit_misc:r(VHost, queue, my_amqqueue_v1),
+ OldQueue = amqqueue:new_with_version(amqqueue_v1,
+ Name,
+ self(),
+ true,
+ false,
+ none,
+ [],
+ VHost,
+ #{},
+ ?amqqueue_v1_type),
+ ?assert(?is_amqqueue_v1(OldQueue)),
+ ?assert(not ?is_amqqueue_v2(OldQueue)),
+ NewQueue = amqqueue:upgrade_to(amqqueue_v2, OldQueue),
+ ?assert(not ?is_amqqueue_v1(NewQueue)),
+ ?assert(?is_amqqueue_v2(NewQueue)).
diff --git a/test/backing_queue_SUITE.erl b/test/backing_queue_SUITE.erl
index 433bc66bff..c3f87cce59 100644
--- a/test/backing_queue_SUITE.erl
+++ b/test/backing_queue_SUITE.erl
@@ -18,6 +18,7 @@
-include_lib("common_test/include/ct.hrl").
-include_lib("amqp_client/include/amqp_client.hrl").
+-include("amqqueue.hrl").
-compile(export_all).
@@ -686,11 +687,10 @@ bq_variable_queue_delete_msg_store_files_callback(Config) ->
bq_variable_queue_delete_msg_store_files_callback1(Config) ->
ok = restart_msg_store_empty(),
- {new, #amqqueue { pid = QPid, name = QName } = Q} =
- rabbit_amqqueue:declare(
- queue_name(Config,
- <<"bq_variable_queue_delete_msg_store_files_callback-q">>),
- true, false, [], none, <<"acting-user">>),
+ QName0 = queue_name(Config, <<"bq_variable_queue_delete_msg_store_files_callback-q">>),
+ {new, Q} = rabbit_amqqueue:declare(QName0, true, false, [], none, <<"acting-user">>),
+ QName = amqqueue:get_name(Q),
+ QPid = amqqueue:get_pid(Q),
Payload = <<0:8388608>>, %% 1MB
Count = 30,
publish_and_confirm(Q, Payload, Count),
@@ -718,9 +718,10 @@ bq_queue_recover(Config) ->
bq_queue_recover1(Config) ->
Count = 2 * rabbit_queue_index:next_segment_boundary(0),
- {new, #amqqueue { pid = QPid, name = QName } = Q} =
- rabbit_amqqueue:declare(queue_name(Config, <<"bq_queue_recover-q">>),
- true, false, [], none, <<"acting-user">>),
+ QName0 = queue_name(Config, <<"bq_queue_recover-q">>),
+ {new, Q} = rabbit_amqqueue:declare(QName0, true, false, [], none, <<"acting-user">>),
+ QName = amqqueue:get_name(Q),
+ QPid = amqqueue:get_pid(Q),
publish_and_confirm(Q, <<>>, Count),
SupPid = get_queue_sup_pid(Q),
@@ -736,7 +737,8 @@ bq_queue_recover1(Config) ->
{ok, Limiter} = rabbit_limiter:start_link(no_id),
rabbit_amqqueue:with_or_die(
QName,
- fun (Q1 = #amqqueue { pid = QPid1 }) ->
+ fun (Q1) when ?is_amqqueue(Q1) ->
+ QPid1 = amqqueue:get_pid(Q1),
CountMinusOne = Count - 1,
{ok, CountMinusOne, {QName, QPid1, _AckTag, true, _Msg}} =
rabbit_amqqueue:basic_get(Q1, self(), false, Limiter,
@@ -752,7 +754,9 @@ bq_queue_recover1(Config) ->
passed.
%% Return the PID of the given queue's supervisor.
-get_queue_sup_pid(#amqqueue { pid = QPid, name = QName }) ->
+get_queue_sup_pid(Q) when ?is_amqqueue(Q) ->
+ QName = amqqueue:get_name(Q),
+ QPid = amqqueue:get_pid(Q),
VHost = QName#resource.virtual_host,
{ok, AmqSup} = rabbit_amqqueue_sup_sup:find_for_vhost(VHost, node(QPid)),
Sups = supervisor:which_children(AmqSup),
@@ -1413,8 +1417,8 @@ with_fresh_variable_queue(Fun, Mode) ->
shutdown, Fun(VQ1, QName)),
Me ! Ref
catch
- Type:Error ->
- Me ! {Ref, Type, Error, erlang:get_stacktrace()}
+ Type:Error:Stacktrace ->
+ Me ! {Ref, Type, Error, Stacktrace}
end
end),
receive
@@ -1498,8 +1502,7 @@ variable_queue_fetch(Count, IsPersistent, IsDelivered, Len, VQ) ->
end, {VQ, []}, lists:seq(1, Count)).
test_amqqueue(QName, Durable) ->
- (rabbit_amqqueue:pseudo_queue(QName, self()))
- #amqqueue { durable = Durable }.
+ rabbit_amqqueue:pseudo_queue(QName, self(), Durable).
assert_prop(List, Prop, Value) ->
case proplists:get_value(Prop, List)of
diff --git a/test/channel_operation_timeout_SUITE.erl b/test/channel_operation_timeout_SUITE.erl
index 9bfa0ae07a..77da6133d8 100644
--- a/test/channel_operation_timeout_SUITE.erl
+++ b/test/channel_operation_timeout_SUITE.erl
@@ -18,6 +18,7 @@
-include_lib("common_test/include/ct.hrl").
-include_lib("amqp_client/include/amqp_client.hrl").
+-include("amqqueue.hrl").
-compile([export_all]).
@@ -169,9 +170,16 @@ get_consumers(Config, Node, VHost) when is_atom(Node),
rabbit_ct_broker_helpers:rpc(Config, Node,
rabbit_amqqueue, consumers_all, [VHost]).
-get_amqqueue(Q, []) -> throw({not_found, Q});
-get_amqqueue(Q, [AMQQ = #amqqueue{name = Q} | _]) -> AMQQ;
-get_amqqueue(Q, [_| Rem]) -> get_amqqueue(Q, Rem).
+get_amqqueue(QName0, []) ->
+ throw({not_found, QName0});
+get_amqqueue(QName0, [Q | Rem]) when ?is_amqqueue(Q) ->
+ QName1 = amqqueue:get_name(Q),
+ compare_amqqueue(QName0, QName1, Q, Rem).
+
+compare_amqqueue(QName, QName, Q, _Rem) ->
+ Q;
+compare_amqqueue(QName, _, _, Rem) ->
+ get_amqqueue(QName, Rem).
qconfig(Ch, Name, Ex, Consume, Deliver) ->
[{ch, Ch}, {name, Name}, {ex,Ex}, {consume, Consume}, {deliver, Deliver}].
diff --git a/test/cluster_SUITE.erl b/test/cluster_SUITE.erl
index c52dc9ef64..dc30825f8c 100644
--- a/test/cluster_SUITE.erl
+++ b/test/cluster_SUITE.erl
@@ -18,6 +18,7 @@
-include_lib("common_test/include/ct.hrl").
-include_lib("amqp_client/include/amqp_client.hrl").
+-include("include/amqqueue.hrl").
-compile(export_all).
@@ -225,9 +226,9 @@ declare_on_dead_queue1(_Config, SecondaryNode) ->
Self = self(),
Pid = spawn(SecondaryNode,
fun () ->
- {new, #amqqueue{name = QueueName, pid = QPid}} =
- rabbit_amqqueue:declare(QueueName, false, false, [],
- none, <<"acting-user">>),
+ {new, Q} = rabbit_amqqueue:declare(QueueName, false, false, [], none, <<"acting-user">>),
+ QueueName = ?amqqueue_field_name(Q),
+ QPid = ?amqqueue_field_pid(Q),
exit(QPid, kill),
Self ! {self(), killed, QPid}
end),
@@ -269,12 +270,12 @@ must_exit(Fun) ->
end.
dead_queue_loop(QueueName, OldPid) ->
- {existing, Q} = rabbit_amqqueue:declare(QueueName, false, false, [], none,
- <<"acting-user">>),
- case Q#amqqueue.pid of
+ {existing, Q} = rabbit_amqqueue:declare(QueueName, false, false, [], none, <<"acting-user">>),
+ QPid = ?amqqueue_field_pid(Q),
+ case QPid of
OldPid -> timer:sleep(25),
dead_queue_loop(QueueName, OldPid);
- _ -> true = rabbit_misc:is_process_alive(Q#amqqueue.pid),
+ _ -> true = rabbit_misc:is_process_alive(QPid),
Q
end.
diff --git a/test/clustering_management_SUITE.erl b/test/clustering_management_SUITE.erl
index 120257feb9..5ae2fb687c 100644
--- a/test/clustering_management_SUITE.erl
+++ b/test/clustering_management_SUITE.erl
@@ -315,14 +315,14 @@ forget_offline_removes_things(Config) ->
forget_promotes_offline_slave(Config) ->
[A, B, C, D] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
ACh = rabbit_ct_client_helpers:open_channel(Config, A),
- Q = <<"mirrored-queue">>,
- declare(ACh, Q),
- set_ha_policy(Config, Q, A, [B, C]),
- set_ha_policy(Config, Q, A, [C, D]), %% Test add and remove from recoverable_slaves
+ QName = <<"mirrored-queue">>,
+ declare(ACh, QName),
+ set_ha_policy(Config, QName, A, [B, C]),
+ set_ha_policy(Config, QName, A, [C, D]), %% Test add and remove from recoverable_slaves
%% Publish and confirm
amqp_channel:call(ACh, #'confirm.select'{}),
- amqp_channel:cast(ACh, #'basic.publish'{routing_key = Q},
+ amqp_channel:cast(ACh, #'basic.publish'{routing_key = QName},
#amqp_msg{props = #'P_basic'{delivery_mode = 2}}),
amqp_channel:wait_for_confirms(ACh),
@@ -353,26 +353,50 @@ forget_promotes_offline_slave(Config) ->
ok = rabbit_ct_broker_helpers:start_node(Config, D),
DCh2 = rabbit_ct_client_helpers:open_channel(Config, D),
- #'queue.declare_ok'{message_count = 1} = declare(DCh2, Q),
+ #'queue.declare_ok'{message_count = 1} = declare(DCh2, QName),
ok.
-set_ha_policy(Config, Q, Master, Slaves) ->
+set_ha_policy(Config, QName, Master, Slaves) ->
Nodes = [list_to_binary(atom_to_list(N)) || N <- [Master | Slaves]],
- rabbit_ct_broker_helpers:set_ha_policy(Config, Master, Q,
- {<<"nodes">>, Nodes}),
- await_slaves(Q, Master, Slaves).
-
-await_slaves(Q, Master, Slaves) ->
- {ok, #amqqueue{pid = MPid,
- slave_pids = SPids}} =
- rpc:call(Master, rabbit_amqqueue, lookup,
- [rabbit_misc:r(<<"/">>, queue, Q)]),
- ActMaster = node(MPid),
+ HaPolicy = {<<"nodes">>, Nodes},
+ rabbit_ct_broker_helpers:set_ha_policy(Config, Master, QName, HaPolicy),
+ await_slaves(QName, Master, Slaves).
+
+await_slaves(QName, Master, Slaves) ->
+ await_slaves_0(QName, Master, Slaves, 10).
+
+await_slaves_0(QName, Master, Slaves0, Tries) ->
+ {ok, Queue} = await_slaves_lookup_queue(QName, Master),
+ SPids = amqqueue:get_slave_pids(Queue),
+ ActMaster = amqqueue:qnode(Queue),
ActSlaves = lists:usort([node(P) || P <- SPids]),
- case {Master, lists:usort(Slaves)} of
- {ActMaster, ActSlaves} -> ok;
- _ -> timer:sleep(100),
- await_slaves(Q, Master, Slaves)
+ Slaves1 = lists:usort(Slaves0),
+ await_slaves_1(QName, ActMaster, ActSlaves, Master, Slaves1, Tries).
+
+await_slaves_1(QName, _ActMaster, _ActSlaves, _Master, _Slaves, 0) ->
+ error({timeout_waiting_for_slaves, QName});
+await_slaves_1(QName, ActMaster, ActSlaves, Master, Slaves, Tries) ->
+ case {Master, Slaves} of
+ {ActMaster, ActSlaves} ->
+ ok;
+ _ ->
+ timer:sleep(250),
+ await_slaves_0(QName, Master, Slaves, Tries - 1)
+ end.
+
+await_slaves_lookup_queue(QName, Master) ->
+ await_slaves_lookup_queue(QName, Master, 10).
+
+await_slaves_lookup_queue(QName, _Master, 0) ->
+ error({timeout_looking_up_queue, QName});
+await_slaves_lookup_queue(QName, Master, Tries) ->
+ RpcArgs = [rabbit_misc:r(<<"/">>, queue, QName)],
+ case rpc:call(Master, rabbit_amqqueue, lookup, RpcArgs) of
+ {error, not_found} ->
+ timer:sleep(250),
+ await_slaves_lookup_queue(QName, Master, Tries - 1);
+ {ok, Q} ->
+ {ok, Q}
end.
force_boot(Config) ->
diff --git a/test/crashing_queues_SUITE.erl b/test/crashing_queues_SUITE.erl
index 2d91083abc..7b8ec91346 100644
--- a/test/crashing_queues_SUITE.erl
+++ b/test/crashing_queues_SUITE.erl
@@ -217,9 +217,10 @@ kill_queue(Node, QName) ->
await_new_pid(Node, QName, Pid1).
queue_pid(Node, QName) ->
- #amqqueue{pid = QPid,
- state = State,
- name = #resource{virtual_host = VHost}} = lookup(Node, QName),
+ Q = lookup(Node, QName),
+ QPid = amqqueue:get_pid(Q),
+ State = amqqueue:get_state(Q),
+ #resource{virtual_host = VHost} = amqqueue:get_name(Q),
case State of
crashed ->
case rabbit_amqqueue_sup_sup:find_for_vhost(VHost, Node) of
diff --git a/test/dynamic_ha_SUITE.erl b/test/dynamic_ha_SUITE.erl
index e41c07a888..6ccf3a75c3 100644
--- a/test/dynamic_ha_SUITE.erl
+++ b/test/dynamic_ha_SUITE.erl
@@ -614,8 +614,8 @@ get_stacktrace() ->
try
throw(e)
catch
- _:e ->
- erlang:get_stacktrace()
+ _:e:Stacktrace ->
+ Stacktrace
end.
%%----------------------------------------------------------------------------
diff --git a/test/dynamic_qq_SUITE.erl b/test/dynamic_qq_SUITE.erl
index fbc1e81827..89344af30c 100644
--- a/test/dynamic_qq_SUITE.erl
+++ b/test/dynamic_qq_SUITE.erl
@@ -83,9 +83,26 @@ init_per_testcase(Testcase, Config) ->
{queue_name, Q},
{queue_args, [{<<"x-queue-type">>, longstr, <<"quorum">>}]}
]),
- rabbit_ct_helpers:run_steps(Config1,
- rabbit_ct_broker_helpers:setup_steps() ++
- rabbit_ct_client_helpers:setup_steps()).
+ Config2 = rabbit_ct_helpers:run_steps(
+ Config1,
+ rabbit_ct_broker_helpers:setup_steps() ++
+ rabbit_ct_client_helpers:setup_steps()),
+ Nodes = rabbit_ct_broker_helpers:get_node_configs(
+ Config2, nodename),
+ Ret = rabbit_ct_broker_helpers:rpc(
+ Config2, 0,
+ rabbit_feature_flags,
+ is_supported_remotely,
+ [Nodes, [quorum_queue], 60000]),
+ case Ret of
+ true ->
+ ok = rabbit_ct_broker_helpers:rpc(
+ Config2, 0, rabbit_feature_flags, enable, [quorum_queue]),
+ Config2;
+ false ->
+ end_per_testcase(Testcase, Config2),
+ {skip, "Quorum queues are unsupported"}
+ end.
end_per_testcase(Testcase, Config) ->
Config1 = rabbit_ct_helpers:run_steps(Config,
diff --git a/test/feature_flags_SUITE.erl b/test/feature_flags_SUITE.erl
new file mode 100644
index 0000000000..db87442105
--- /dev/null
+++ b/test/feature_flags_SUITE.erl
@@ -0,0 +1,372 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2018-2019 Pivotal Software, Inc. All rights reserved.
+%%
+
+-module(feature_flags_SUITE).
+
+-include_lib("common_test/include/ct.hrl").
+-include_lib("eunit/include/eunit.hrl").
+
+-export([suite/0,
+ all/0,
+ groups/0,
+ init_per_suite/1,
+ end_per_suite/1,
+ init_per_group/2,
+ end_per_group/2,
+ init_per_testcase/2,
+ end_per_testcase/2,
+
+ enable_quorum_queue_in_a_healthy_situation/1,
+ enable_unsupported_feature_flag_in_a_healthy_situation/1,
+ enable_quorum_queue_when_ff_file_is_unwritable/1,
+ enable_quorum_queue_with_a_network_partition/1,
+ mark_quorum_queue_as_enabled_with_a_network_partition/1
+ ]).
+
+suite() ->
+ [{timetrap, 5 * 60000}].
+
+all() ->
+ [
+ {group, unclustered},
+ {group, clustered}
+ ].
+
+groups() ->
+ [
+ {unclustered, [],
+ [
+ enable_quorum_queue_in_a_healthy_situation,
+ enable_unsupported_feature_flag_in_a_healthy_situation,
+ enable_quorum_queue_when_ff_file_is_unwritable
+ ]},
+ {clustered, [],
+ [
+ enable_quorum_queue_in_a_healthy_situation,
+ enable_unsupported_feature_flag_in_a_healthy_situation,
+ enable_quorum_queue_when_ff_file_is_unwritable,
+ enable_quorum_queue_with_a_network_partition,
+ mark_quorum_queue_as_enabled_with_a_network_partition
+ ]}
+ ].
+
+%% -------------------------------------------------------------------
+%% Testsuite setup/teardown.
+%% -------------------------------------------------------------------
+
+init_per_suite(Config) ->
+ rabbit_ct_helpers:log_environment(),
+ rabbit_ct_helpers:run_setup_steps(Config, [
+ fun rabbit_ct_broker_helpers:enable_dist_proxy_manager/1
+ ]).
+
+end_per_suite(Config) ->
+ rabbit_ct_helpers:run_teardown_steps(Config).
+
+init_per_group(clustered, Config) ->
+ rabbit_ct_helpers:set_config(Config, [{rmq_nodes_count, 5}]);
+init_per_group(unclustered, Config) ->
+ rabbit_ct_helpers:set_config(Config, [{rmq_nodes_count, 1}]);
+init_per_group(_, Config) ->
+ Config.
+
+end_per_group(_, Config) ->
+ Config.
+
+init_per_testcase(Testcase, Config) ->
+ rabbit_ct_helpers:testcase_started(Config, Testcase),
+ TestNumber = rabbit_ct_helpers:testcase_number(Config, ?MODULE, Testcase),
+ ClusterSize = ?config(rmq_nodes_count, Config),
+ Config1 = rabbit_ct_helpers:set_config(Config, [
+ {rmq_nodes_clustered, false},
+ {rmq_nodename_suffix, Testcase},
+ {tcp_ports_base, {skip_n_nodes, TestNumber * ClusterSize}},
+ {net_ticktime, 5}
+ ]),
+ Config2 = rabbit_ct_helpers:merge_app_env(
+ Config1,
+ {rabbit,
+ [{forced_feature_flags_on_init, []},
+ {log, [{file, [{level, debug}]}]}]}),
+ Config3 = rabbit_ct_helpers:run_steps(
+ Config2,
+ rabbit_ct_broker_helpers:setup_steps() ++
+ rabbit_ct_client_helpers:setup_steps() ++
+ [fun rabbit_ct_broker_helpers:enable_dist_proxy/1,
+ fun rabbit_ct_broker_helpers:cluster_nodes/1]),
+ Ret = rabbit_ct_broker_helpers:rpc(
+ Config3, 0, rabbit_feature_flags, is_supported, [quorum_queue]),
+ case Ret of
+ true ->
+ Config3;
+ false ->
+ end_per_testcase(Testcase, Config3),
+ {skip, "Quorum queues are unsupported"}
+ end.
+
+end_per_testcase(Testcase, Config) ->
+ Config1 = rabbit_ct_helpers:run_steps(Config,
+ rabbit_ct_client_helpers:teardown_steps() ++
+ rabbit_ct_broker_helpers:teardown_steps()),
+ rabbit_ct_helpers:testcase_finished(Config1, Testcase).
+
+%% -------------------------------------------------------------------
+%% Testcases.
+%% -------------------------------------------------------------------
+
+enable_quorum_queue_in_a_healthy_situation(Config) ->
+ FeatureName = quorum_queue,
+ ClusterSize = ?config(rmq_nodes_count, Config),
+ Node = ClusterSize - 1,
+ True = lists:duplicate(ClusterSize, true),
+ False = lists:duplicate(ClusterSize, false),
+
+ %% The feature flag is supported but disabled initially.
+ ?assertEqual(
+ True,
+ is_feature_flag_supported(Config, FeatureName)),
+ ?assertEqual(
+ False,
+ is_feature_flag_enabled(Config, FeatureName)),
+
+ %% Enabling the feature flag works.
+ ?assertEqual(
+ ok,
+ enable_feature_flag_on(Config, Node, FeatureName)),
+ ?assertEqual(
+ True,
+ is_feature_flag_enabled(Config, FeatureName)),
+
+ %% Re-enabling the feature flag also works.
+ ?assertEqual(
+ ok,
+ enable_feature_flag_on(Config, Node, FeatureName)),
+ ?assertEqual(
+ True,
+ is_feature_flag_enabled(Config, FeatureName)).
+
+enable_unsupported_feature_flag_in_a_healthy_situation(Config) ->
+ FeatureName = unsupported_feature_flag,
+ ClusterSize = ?config(rmq_nodes_count, Config),
+ Node = ClusterSize - 1,
+ False = lists:duplicate(ClusterSize, false),
+
+ %% The feature flag is unsupported and thus disabled.
+ ?assertEqual(
+ False,
+ is_feature_flag_supported(Config, FeatureName)),
+ ?assertEqual(
+ False,
+ is_feature_flag_enabled(Config, FeatureName)),
+
+ %% Enabling the feature flag works.
+ ?assertEqual(
+ {error, unsupported},
+ enable_feature_flag_on(Config, Node, FeatureName)),
+ ?assertEqual(
+ False,
+ is_feature_flag_enabled(Config, FeatureName)).
+
+enable_quorum_queue_when_ff_file_is_unwritable(Config) ->
+ FeatureName = quorum_queue,
+ ClusterSize = ?config(rmq_nodes_count, Config),
+ Node = ClusterSize - 1,
+ True = lists:duplicate(ClusterSize, true),
+ False = lists:duplicate(ClusterSize, false),
+ Files = feature_flags_files(Config),
+
+ %% The feature flag is supported but disabled initially.
+ ?assertEqual(
+ True,
+ is_feature_flag_supported(Config, FeatureName)),
+ ?assertEqual(
+ False,
+ is_feature_flag_enabled(Config, FeatureName)),
+
+ %% Restrict permissions on the `feature_flags` files.
+ [?assertEqual(ok, file:change_mode(File, 8#0444)) || File <- Files],
+
+ %% Enabling the feature flag works.
+ ?assertEqual(
+ ok,
+ enable_feature_flag_on(Config, Node, FeatureName)),
+ ?assertEqual(
+ True,
+ is_feature_flag_enabled(Config, FeatureName)),
+
+ %% The `feature_flags` file were not updated.
+ ?assertEqual(
+ lists:duplicate(ClusterSize, {ok, [[]]}),
+ [file:consult(File) || File <- feature_flags_files(Config)]),
+
+ %% Stop all nodes and restore permissions on the `feature_flags` files.
+ Nodes = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ [?assertEqual(ok, rabbit_ct_broker_helpers:stop_node(Config, N))
+ || N <- Nodes],
+ [?assertEqual(ok, file:change_mode(File, 8#0644)) || File <- Files],
+
+ %% Restart all nodes and assert the feature flag is still enabled and
+ %% the `feature_flags` files were correctly repaired.
+ [?assertEqual(ok, rabbit_ct_broker_helpers:start_node(Config, N))
+ || N <- lists:reverse(Nodes)],
+
+ ?assertEqual(
+ True,
+ is_feature_flag_enabled(Config, FeatureName)),
+ ?assertEqual(
+ lists:duplicate(ClusterSize, {ok, [[FeatureName]]}),
+ [file:consult(File) || File <- feature_flags_files(Config)]).
+
+enable_quorum_queue_with_a_network_partition(Config) ->
+ FeatureName = quorum_queue,
+ ClusterSize = ?config(rmq_nodes_count, Config),
+ [A, B, C, D, E] = rabbit_ct_broker_helpers:get_node_configs(
+ Config, nodename),
+ True = lists:duplicate(ClusterSize, true),
+ False = lists:duplicate(ClusterSize, false),
+
+ %% The feature flag is supported but disabled initially.
+ ?assertEqual(
+ True,
+ is_feature_flag_supported(Config, FeatureName)),
+ ?assertEqual(
+ False,
+ is_feature_flag_enabled(Config, FeatureName)),
+
+ %% Isolate nodes B and E from the rest of the cluster.
+ NodePairs = [{B, A},
+ {B, C},
+ {B, D},
+ {E, A},
+ {E, C},
+ {E, D}],
+ block(NodePairs),
+ timer:sleep(1000),
+
+ %% Enabling the feature flag should fail in the specific case of
+ %% `quorum_queue`, if the network is broken.
+ ?assertEqual(
+ {error, unsupported},
+ enable_feature_flag_on(Config, B, FeatureName)),
+ ?assertEqual(
+ False,
+ is_feature_flag_enabled(Config, FeatureName)),
+
+ %% Repair the network and try again to enable the feature flag.
+ unblock(NodePairs),
+ timer:sleep(1000),
+ [?assertEqual(ok, rabbit_ct_broker_helpers:stop_node(Config, N))
+ || N <- [A, C, D]],
+ [?assertEqual(ok, rabbit_ct_broker_helpers:start_node(Config, N))
+ || N <- [A, C, D]],
+
+ %% Enabling the feature flag works.
+ ?assertEqual(
+ ok,
+ enable_feature_flag_on(Config, B, FeatureName)),
+ ?assertEqual(
+ True,
+ is_feature_flag_enabled(Config, FeatureName)).
+
+mark_quorum_queue_as_enabled_with_a_network_partition(Config) ->
+ FeatureName = quorum_queue,
+ ClusterSize = ?config(rmq_nodes_count, Config),
+ [A, B, C, D, E] = rabbit_ct_broker_helpers:get_node_configs(
+ Config, nodename),
+ True = lists:duplicate(ClusterSize, true),
+ False = lists:duplicate(ClusterSize, false),
+
+ %% The feature flag is supported but disabled initially.
+ ?assertEqual(
+ True,
+ is_feature_flag_supported(Config, FeatureName)),
+ ?assertEqual(
+ False,
+ is_feature_flag_enabled(Config, FeatureName)),
+
+ %% Isolate node B from the rest of the cluster.
+ NodePairs = [{B, A},
+ {B, C},
+ {B, D},
+ {B, E}],
+ block(NodePairs),
+ timer:sleep(1000),
+
+ %% Mark the feature flag as enabled on all nodes from node B. This
+ %% is expected to timeout.
+ RemoteNodes = [A, C, D, E],
+ ?assertEqual(
+ {failed_to_mark_feature_flag_as_enabled_on_remote_nodes,
+ FeatureName,
+ true,
+ RemoteNodes},
+ rabbit_ct_broker_helpers:rpc(
+ Config, B,
+ rabbit_feature_flags, mark_as_enabled_remotely,
+ [RemoteNodes, FeatureName, true, 20000])),
+
+ RepairFun = fun() ->
+ %% Wait a few seconds before we repair the network.
+ timer:sleep(5000),
+
+ %% Repair the network and try again to enable
+ %% the feature flag.
+ unblock(NodePairs),
+ timer:sleep(1000)
+ end,
+ spawn(RepairFun),
+
+ %% Mark the feature flag as enabled on all nodes from node B. This
+ %% is expected to work this time.
+ ct:pal(?LOW_IMPORTANCE,
+ "Marking the feature flag as enabled on remote nodes...", []),
+ ?assertEqual(
+ ok,
+ rabbit_ct_broker_helpers:rpc(
+ Config, B,
+ rabbit_feature_flags, mark_as_enabled_remotely,
+ [RemoteNodes, FeatureName, true, 120000])).
+
+%% FIXME: Finish the testcase above ^
+
+%% -------------------------------------------------------------------
+%% Internal helpers.
+%% -------------------------------------------------------------------
+
+enable_feature_flag_on(Config, Node, FeatureName) ->
+ rabbit_ct_broker_helpers:rpc(
+ Config, Node, rabbit_feature_flags, enable, [FeatureName]).
+
+is_feature_flag_supported(Config, FeatureName) ->
+ rabbit_ct_broker_helpers:rpc_all(
+ Config, rabbit_feature_flags, is_supported, [FeatureName]).
+
+is_feature_flag_enabled(Config, FeatureName) ->
+ rabbit_ct_broker_helpers:rpc_all(
+ Config, rabbit_feature_flags, is_enabled, [FeatureName]).
+
+feature_flags_files(Config) ->
+ rabbit_ct_broker_helpers:rpc_all(
+ Config, rabbit_feature_flags, enabled_feature_flags_list_file, []).
+
+block(Pairs) -> [block(X, Y) || {X, Y} <- Pairs].
+unblock(Pairs) -> [allow(X, Y) || {X, Y} <- Pairs].
+
+block(X, Y) ->
+ rabbit_ct_broker_helpers:block_traffic_between(X, Y).
+
+allow(X, Y) ->
+ rabbit_ct_broker_helpers:allow_traffic_between(X, Y).
diff --git a/test/mirrored_supervisor_SUITE.erl b/test/mirrored_supervisor_SUITE.erl
index d3cc080eeb..aa114e0a84 100644
--- a/test/mirrored_supervisor_SUITE.erl
+++ b/test/mirrored_supervisor_SUITE.erl
@@ -294,14 +294,17 @@ get_group(Group) ->
call(Id, Msg) -> call(Id, Msg, 10*1000, 100).
-call(Id, Msg, 0, _Decr) ->
- exit({timeout_waiting_for_server, {Id, Msg}, erlang:get_stacktrace()});
-
call(Id, Msg, MaxDelay, Decr) ->
+ call(Id, Msg, MaxDelay, Decr, undefined).
+
+call(Id, Msg, 0, _Decr, Stacktrace) ->
+ exit({timeout_waiting_for_server, {Id, Msg}, Stacktrace});
+
+call(Id, Msg, MaxDelay, Decr, _) ->
try
gen_server:call(Id, Msg, infinity)
- catch exit:_ -> timer:sleep(Decr),
- call(Id, Msg, MaxDelay - Decr, Decr)
+ catch exit:_:Stacktrace -> timer:sleep(Decr),
+ call(Id, Msg, MaxDelay - Decr, Decr, Stacktrace)
end.
kill(Pid) -> kill(Pid, []).
diff --git a/test/priority_queue_SUITE.erl b/test/priority_queue_SUITE.erl
index 9db866e3c6..8a0ab98241 100644
--- a/test/priority_queue_SUITE.erl
+++ b/test/priority_queue_SUITE.erl
@@ -366,9 +366,9 @@ info_head_message_timestamp1(_Config) ->
QName = rabbit_misc:r(<<"/">>, queue,
<<"info_head_message_timestamp-queue">>),
Q0 = rabbit_amqqueue:pseudo_queue(QName, self()),
- Q = Q0#amqqueue{arguments = [{<<"x-max-priority">>, long, 2}]},
+ Q1 = amqqueue:set_arguments(Q0, [{<<"x-max-priority">>, long, 2}]),
PQ = rabbit_priority_queue,
- BQS1 = PQ:init(Q, new, fun(_, _) -> ok end),
+ BQS1 = PQ:init(Q1, new, fun(_, _) -> ok end),
%% The queue is empty: no timestamp.
true = PQ:is_empty(BQS1),
'' = PQ:info(head_message_timestamp, BQS1),
@@ -415,9 +415,9 @@ info_head_message_timestamp1(_Config) ->
ram_duration(_Config) ->
QName = rabbit_misc:r(<<"/">>, queue, <<"ram_duration-queue">>),
Q0 = rabbit_amqqueue:pseudo_queue(QName, self()),
- Q = Q0#amqqueue{arguments = [{<<"x-max-priority">>, long, 5}]},
+ Q1 = amqqueue:set_arguments(Q0, [{<<"x-max-priority">>, long, 5}]),
PQ = rabbit_priority_queue,
- BQS1 = PQ:init(Q, new, fun(_, _) -> ok end),
+ BQS1 = PQ:init(Q1, new, fun(_, _) -> ok end),
{_Duration1, BQS2} = PQ:ram_duration(BQS1),
BQS3 = PQ:set_ram_duration_target(infinity, BQS2),
BQS4 = PQ:set_ram_duration_target(1, BQS3),
diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl
index 48dac3ca57..fd58756fda 100644
--- a/test/quorum_queue_SUITE.erl
+++ b/test/quorum_queue_SUITE.erl
@@ -23,11 +23,15 @@
-import(quorum_queue_utils, [wait_for_messages_ready/3,
wait_for_messages_pending_ack/3,
wait_for_messages_total/3,
+ wait_for_messages/2,
dirty_query/3,
ra_name/1]).
-compile(export_all).
+suite() ->
+ [{timetrap, 5 * 60000}].
+
all() ->
[
{group, single_node},
@@ -172,17 +176,32 @@ init_per_group(Group, Config) ->
Config2 = rabbit_ct_helpers:run_steps(Config1b,
[fun merge_app_env/1 ] ++
rabbit_ct_broker_helpers:setup_steps()),
- ok = rabbit_ct_broker_helpers:rpc(
- Config2, 0, application, set_env,
- [rabbit, channel_queue_cleanup_interval, 100]),
- %% HACK: the larger cluster sizes benefit for a bit more time
- %% after clustering before running the tests.
- case Group of
- cluster_size_5 ->
- timer:sleep(5000),
- Config2;
- _ ->
- Config2
+ Nodes = rabbit_ct_broker_helpers:get_node_configs(
+ Config2, nodename),
+ Ret = rabbit_ct_broker_helpers:rpc(
+ Config2, 0,
+ rabbit_feature_flags,
+ is_supported_remotely,
+ [Nodes, [quorum_queue], 60000]),
+ case Ret of
+ true ->
+ ok = rabbit_ct_broker_helpers:rpc(
+ Config2, 0, rabbit_feature_flags, enable, [quorum_queue]),
+ ok = rabbit_ct_broker_helpers:rpc(
+ Config2, 0, application, set_env,
+ [rabbit, channel_queue_cleanup_interval, 100]),
+ %% HACK: the larger cluster sizes benefit for a bit more time
+ %% after clustering before running the tests.
+ case Group of
+ cluster_size_5 ->
+ timer:sleep(5000),
+ Config2;
+ _ ->
+ Config2
+ end;
+ false ->
+ end_per_group(Group, Config2),
+ {skip, "Quorum queues are unsupported"}
end.
end_per_group(clustered, Config) ->
@@ -206,11 +225,28 @@ init_per_testcase(Testcase, Config) when Testcase == reconnect_consumer_and_publ
{tcp_ports_base},
{queue_name, Q}
]),
- rabbit_ct_helpers:run_steps(Config2,
- rabbit_ct_broker_helpers:setup_steps() ++
- rabbit_ct_client_helpers:setup_steps() ++
- [fun rabbit_ct_broker_helpers:enable_dist_proxy/1,
- fun rabbit_ct_broker_helpers:cluster_nodes/1]);
+ Config3 = rabbit_ct_helpers:run_steps(
+ Config2,
+ rabbit_ct_broker_helpers:setup_steps() ++
+ rabbit_ct_client_helpers:setup_steps() ++
+ [fun rabbit_ct_broker_helpers:enable_dist_proxy/1,
+ fun rabbit_ct_broker_helpers:cluster_nodes/1]),
+ Nodes = rabbit_ct_broker_helpers:get_node_configs(
+ Config3, nodename),
+ Ret = rabbit_ct_broker_helpers:rpc(
+ Config3, 0,
+ rabbit_feature_flags,
+ is_supported_remotely,
+ [Nodes, [quorum_queue], 60000]),
+ case Ret of
+ true ->
+ ok = rabbit_ct_broker_helpers:rpc(
+ Config3, 0, rabbit_feature_flags, enable, [quorum_queue]),
+ Config3;
+ false ->
+ end_per_testcase(Testcase, Config3),
+ {skip, "Quorum queues are unsupported"}
+ end;
init_per_testcase(Testcase, Config) ->
Config1 = rabbit_ct_helpers:testcase_started(Config, Testcase),
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_queues, []),
@@ -2193,40 +2229,10 @@ assert_queue_type(Server, Q, Expected) ->
Actual = get_queue_type(Server, Q),
Expected = Actual.
-get_queue_type(Server, Q) ->
- QNameRes = rabbit_misc:r(<<"/">>, queue, Q),
- {ok, AMQQueue} =
- rpc:call(Server, rabbit_amqqueue, lookup, [QNameRes]),
- AMQQueue#amqqueue.type.
-
-wait_for_messages(Config, Stats) ->
- wait_for_messages(Config, lists:sort(Stats), 60).
-
-wait_for_messages(Config, Stats, 0) ->
- ?assertEqual(Stats,
- lists:sort(
- filter_queues(Stats,
- rabbit_ct_broker_helpers:rabbitmqctl_list(
- Config, 0, ["list_queues", "name", "messages", "messages_ready",
- "messages_unacknowledged"]))));
-wait_for_messages(Config, Stats, N) ->
- case lists:sort(
- filter_queues(Stats,
- rabbit_ct_broker_helpers:rabbitmqctl_list(
- Config, 0, ["list_queues", "name", "messages", "messages_ready",
- "messages_unacknowledged"]))) of
- Stats0 when Stats0 == Stats ->
- ok;
- _ ->
- timer:sleep(500),
- wait_for_messages(Config, Stats, N - 1)
- end.
-
-filter_queues(Expected, Got) ->
- Keys = [K || [K, _, _, _] <- Expected],
- lists:filter(fun([K, _, _, _]) ->
- lists:member(K, Keys)
- end, Got).
+get_queue_type(Server, Q0) ->
+ QNameRes = rabbit_misc:r(<<"/">>, queue, Q0),
+ {ok, Q1} = rpc:call(Server, rabbit_amqqueue, lookup, [QNameRes]),
+ amqqueue:get_type(Q1).
publish_many(Ch, Queue, Count) ->
[publish(Ch, Queue) || _ <- lists:seq(1, Count)].
diff --git a/test/quorum_queue_utils.erl b/test/quorum_queue_utils.erl
index 6b820c7b5c..9c988bc066 100644
--- a/test/quorum_queue_utils.erl
+++ b/test/quorum_queue_utils.erl
@@ -6,6 +6,7 @@
wait_for_messages_ready/3,
wait_for_messages_pending_ack/3,
wait_for_messages_total/3,
+ wait_for_messages/2,
dirty_query/3,
ra_name/1
]).
@@ -45,6 +46,29 @@ wait_for_messages(Servers, QName, Number, Fun, N) ->
wait_for_messages(Servers, QName, Number, Fun, N - 1)
end.
+wait_for_messages(Config, Stats) ->
+ wait_for_messages(Config, lists:sort(Stats), 10).
+
+wait_for_messages(Config, Stats, 0) ->
+ ?assertEqual(Stats,
+ lists:sort(
+ filter_queues(Stats,
+ rabbit_ct_broker_helpers:rabbitmqctl_list(
+ Config, 0, ["list_queues", "name", "messages", "messages_ready",
+ "messages_unacknowledged"]))));
+wait_for_messages(Config, Stats, N) ->
+ case lists:sort(
+ filter_queues(Stats,
+ rabbit_ct_broker_helpers:rabbitmqctl_list(
+ Config, 0, ["list_queues", "name", "messages", "messages_ready",
+ "messages_unacknowledged"]))) of
+ Stats0 when Stats0 == Stats ->
+ ok;
+ _ ->
+ timer:sleep(500),
+ wait_for_messages(Config, Stats, N - 1)
+ end.
+
dirty_query(Servers, QName, Fun) ->
lists:map(
fun(N) ->
@@ -59,3 +83,8 @@ dirty_query(Servers, QName, Fun) ->
ra_name(Q) ->
binary_to_atom(<<"%2F_", Q/binary>>, utf8).
+filter_queues(Expected, Got) ->
+ Keys = [K || [K, _, _, _] <- Expected],
+ lists:filter(fun([K, _, _, _]) ->
+ lists:member(K, Keys)
+ end, Got).
diff --git a/test/rabbit_core_metrics_gc_SUITE.erl b/test/rabbit_core_metrics_gc_SUITE.erl
index 7ae43aa7e1..42ac863ead 100644
--- a/test/rabbit_core_metrics_gc_SUITE.erl
+++ b/test/rabbit_core_metrics_gc_SUITE.erl
@@ -364,11 +364,9 @@ cluster_queue_metrics(Config) ->
% Synchronize
Name = rabbit_misc:r(VHost, queue, QueueName),
- [#amqqueue{pid = QPid}] = rabbit_ct_broker_helpers:rpc(Config, Node0,
- ets, lookup,
- [rabbit_queue, Name]),
- ok = rabbit_ct_broker_helpers:rpc(Config, Node0, rabbit_amqqueue,
- sync_mirrors, [QPid]),
+ [Q] = rabbit_ct_broker_helpers:rpc(Config, Node0, ets, lookup, [rabbit_queue, Name]),
+ QPid = amqqueue:get_pid(Q),
+ ok = rabbit_ct_broker_helpers:rpc(Config, Node0, rabbit_amqqueue, sync_mirrors, [QPid]),
% Check ETS table for data
wait_for(fun () ->
diff --git a/test/single_active_consumer_SUITE.erl b/test/single_active_consumer_SUITE.erl
index 25261042b2..6071aeb5a5 100644
--- a/test/single_active_consumer_SUITE.erl
+++ b/test/single_active_consumer_SUITE.erl
@@ -66,13 +66,21 @@ init_per_group(classic_queue, Config) ->
auto_delete = true}
} | Config];
init_per_group(quorum_queue, Config) ->
- [{single_active_consumer_queue_declare,
- #'queue.declare'{arguments = [
- {<<"x-single-active-consumer">>, bool, true},
- {<<"x-queue-type">>, longstr, <<"quorum">>}
- ],
- durable = true, exclusive = false, auto_delete = false}
- } | Config].
+ Ret = rabbit_ct_broker_helpers:rpc(
+ Config, 0, rabbit_feature_flags, enable, [quorum_queue]),
+ case Ret of
+ ok ->
+ [{single_active_consumer_queue_declare,
+ #'queue.declare'{
+ arguments = [
+ {<<"x-single-active-consumer">>, bool, true},
+ {<<"x-queue-type">>, longstr, <<"quorum">>}
+ ],
+ durable = true, exclusive = false, auto_delete = false}
+ } | Config];
+ Error ->
+ {skip, {"Quorum queues are unsupported", Error}}
+ end.
end_per_group(_, Config) ->
Config.
diff --git a/test/unit_inbroker_dead_letter_SUITE.erl b/test/unit_inbroker_dead_letter_SUITE.erl
new file mode 100644
index 0000000000..900a129d3b
--- /dev/null
+++ b/test/unit_inbroker_dead_letter_SUITE.erl
@@ -0,0 +1,1149 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2011-2019 Pivotal Software, Inc. All rights reserved.
+%%
+%%
+%% For the full spec see: http://www.rabbitmq.com/dlx.html
+%%
+-module(unit_inbroker_dead_letter_SUITE).
+
+-include_lib("common_test/include/ct.hrl").
+-include_lib("kernel/include/file.hrl").
+-include_lib("amqp_client/include/amqp_client.hrl").
+-include_lib("eunit/include/eunit.hrl").
+
+-compile(export_all).
+
+-import(quorum_queue_utils, [wait_for_messages/2]).
+
+all() ->
+ [
+ {group, dead_letter_tests}
+ ].
+
+groups() ->
+ DeadLetterTests = [dead_letter_nack,
+ dead_letter_multiple_nack,
+ dead_letter_nack_requeue,
+ dead_letter_nack_requeue_multiple,
+ dead_letter_reject,
+ dead_letter_reject_requeue,
+ dead_letter_max_length_drop_head,
+ dead_letter_missing_exchange,
+ dead_letter_routing_key,
+ dead_letter_routing_key_header_CC,
+ dead_letter_routing_key_header_BCC,
+ dead_letter_routing_key_cycle_max_length,
+ dead_letter_routing_key_cycle_with_reject,
+ dead_letter_policy,
+ dead_letter_override_policy,
+ dead_letter_ignore_policy,
+ dead_letter_headers,
+ dead_letter_headers_reason_maxlen,
+ dead_letter_headers_cycle,
+ dead_letter_headers_BCC,
+ dead_letter_headers_CC,
+ dead_letter_headers_CC_with_routing_key,
+ dead_letter_headers_first_death],
+ [
+ {dead_letter_tests, [],
+ [
+ {classic_queue, [parallel], DeadLetterTests ++ [dead_letter_ttl,
+ dead_letter_routing_key_cycle_ttl,
+ dead_letter_headers_reason_expired,
+ dead_letter_headers_reason_expired_per_message]},
+ {mirrored_queue, [parallel], DeadLetterTests ++ [dead_letter_ttl,
+ dead_letter_routing_key_cycle_ttl,
+ dead_letter_headers_reason_expired,
+ dead_letter_headers_reason_expired_per_message]},
+ {quorum_queue, [parallel], DeadLetterTests}
+ ]}
+ ].
+
+suite() ->
+ [
+ {timetrap, {minutes, 3}}
+ ].
+
+%% -------------------------------------------------------------------
+%% Testsuite setup/teardown.
+%% -------------------------------------------------------------------
+
+init_per_suite(Config) ->
+ rabbit_ct_helpers:log_environment(),
+ rabbit_ct_helpers:run_setup_steps(Config).
+
+end_per_suite(Config) ->
+ rabbit_ct_helpers:run_teardown_steps(Config).
+
+init_per_group(classic_queue, Config) ->
+ rabbit_ct_helpers:set_config(
+ Config,
+ [{queue_args, [{<<"x-queue-type">>, longstr, <<"classic">>}]},
+ {queue_durable, false}]);
+init_per_group(quorum_queue, Config) ->
+ rabbit_ct_helpers:set_config(
+ Config,
+ [{queue_args, [{<<"x-queue-type">>, longstr, <<"quorum">>}]},
+ {queue_durable, true}]);
+init_per_group(mirrored_queue, Config) ->
+ rabbit_ct_broker_helpers:set_ha_policy(Config, 0, <<"^max_length.*queue">>,
+ <<"all">>, [{<<"ha-sync-mode">>, <<"automatic">>}]),
+ Config1 = rabbit_ct_helpers:set_config(
+ Config, [{is_mirrored, true},
+ {queue_args, [{<<"x-queue-type">>, longstr, <<"classic">>}]},
+ {queue_durable, false}]),
+ rabbit_ct_helpers:run_steps(Config1, []);
+init_per_group(Group, Config) ->
+ case lists:member({group, Group}, all()) of
+ true ->
+ ClusterSize = 2,
+ Config1 = rabbit_ct_helpers:set_config(Config, [
+ {rmq_nodename_suffix, Group},
+ {rmq_nodes_count, ClusterSize}
+ ]),
+ rabbit_ct_helpers:run_steps(Config1,
+ rabbit_ct_broker_helpers:setup_steps() ++
+ rabbit_ct_client_helpers:setup_steps());
+ false ->
+ rabbit_ct_helpers:run_steps(Config, [])
+ end.
+
+end_per_group(Group, Config) ->
+ case lists:member({group, Group}, all()) of
+ true ->
+ rabbit_ct_helpers:run_steps(Config,
+ rabbit_ct_client_helpers:teardown_steps() ++
+ rabbit_ct_broker_helpers:teardown_steps());
+ false ->
+ Config
+ end.
+
+init_per_testcase(Testcase, Config) ->
+ Group = proplists:get_value(name, ?config(tc_group_properties, Config)),
+ Q = rabbit_data_coercion:to_binary(io_lib:format("~p_~p", [Group, Testcase])),
+ Q2 = rabbit_data_coercion:to_binary(io_lib:format("~p_~p_2", [Group, Testcase])),
+ Policy = rabbit_data_coercion:to_binary(io_lib:format("~p_~p_policy", [Group, Testcase])),
+ DLXExchange = rabbit_data_coercion:to_binary(io_lib:format("~p_~p_dlx_exchange",
+ [Group, Testcase])),
+ Config1 = rabbit_ct_helpers:set_config(Config, [{dlx_exchange, DLXExchange},
+ {queue_name, Q},
+ {queue_name_dlx, Q2},
+ {policy, Policy}]),
+ rabbit_ct_helpers:testcase_started(Config1, Testcase).
+
+end_per_testcase(Testcase, Config) ->
+ {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ amqp_channel:call(Ch, #'queue.delete'{queue = ?config(queue_name, Config)}),
+ amqp_channel:call(Ch, #'queue.delete'{queue = ?config(queue_name_dlx, Config)}),
+ amqp_channel:call(Ch, #'exchange.delete'{exchange = ?config(dlx_exchange, Config)}),
+ _ = rabbit_ct_broker_helpers:clear_policy(Config, 0, ?config(policy, Config)),
+ rabbit_ct_helpers:testcase_finished(Config, Testcase).
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%% Dead letter exchanges
+%%
+%% Messages are dead-lettered when:
+%% 1) message is rejected with basic.reject or basic.nack with requeue=false
+%% 2) message ttl expires (not implemented in quorum queues)
+%% 3) queue length limit is exceeded (only drop-head implemented in quorum queues)
+%%
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+
+%% 1) message is rejected with basic.nack, requeue=false and multiple=false
+dead_letter_nack(Config) ->
+ {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ QName = ?config(queue_name, Config),
+ DLXQName = ?config(queue_name_dlx, Config),
+ declare_dead_letter_queues(Ch, Config, QName, DLXQName),
+
+ P1 = <<"msg1">>,
+ P2 = <<"msg2">>,
+ P3 = <<"msg3">>,
+
+ %% Publish 3 messages
+ publish(Ch, QName, [P1, P2, P3]),
+ wait_for_messages(Config, [[QName, <<"3">>, <<"3">>, <<"0">>]]),
+ %% Consume them
+ [DTag1, DTag2, DTag3] = consume(Ch, QName, [P1, P2, P3]),
+ %% Nack the last one with multiple = false
+ amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DTag3,
+ multiple = false,
+ requeue = false}),
+ wait_for_messages(Config, [[DLXQName, <<"1">>, <<"1">>, <<"0">>]]),
+ %% Queue is empty
+ consume_empty(Ch, QName),
+ %% Consume the last message from the dead letter queue
+ consume(Ch, DLXQName, [P3]),
+ consume_empty(Ch, DLXQName),
+ %% Nack the other two
+ amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DTag1,
+ multiple = false,
+ requeue = false}),
+ amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DTag2,
+ multiple = false,
+ requeue = false}),
+ %% Queue is empty
+ consume_empty(Ch, QName),
+ %% Consume the first two messages from the dead letter queue
+ consume(Ch, DLXQName, [P1, P2]),
+ consume_empty(Ch, DLXQName).
+
+%% 1) message is rejected with basic.nack, requeue=false and multiple=true
+dead_letter_multiple_nack(Config) ->
+ {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ QName = ?config(queue_name, Config),
+ DLXQName = ?config(queue_name_dlx, Config),
+ declare_dead_letter_queues(Ch, Config, QName, DLXQName),
+
+ P1 = <<"msg1">>,
+ P2 = <<"msg2">>,
+ P3 = <<"msg3">>,
+
+ %% Publish 3 messages
+ publish(Ch, QName, [P1, P2, P3]),
+ wait_for_messages(Config, [[QName, <<"3">>, <<"3">>, <<"0">>]]),
+ %% Consume them
+ [_, _, DTag3] = consume(Ch, QName, [P1, P2, P3]),
+ %% Nack the last one with multiple = true
+ amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DTag3,
+ multiple = true,
+ requeue = false}),
+ wait_for_messages(Config, [[DLXQName, <<"3">>, <<"3">>, <<"0">>]]),
+ %% Consume the 3 messages from the dead letter queue
+ consume(Ch, DLXQName, [P1, P2, P3]),
+ consume_empty(Ch, DLXQName),
+ %% Queue is empty
+ consume_empty(Ch, QName).
+
+%% 1) message is rejected with basic.nack, requeue=true and multiple=false. Dead-lettering does not take place
+dead_letter_nack_requeue(Config) ->
+ {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ QName = ?config(queue_name, Config),
+ DLXQName = ?config(queue_name_dlx, Config),
+ declare_dead_letter_queues(Ch, Config, QName, DLXQName),
+
+ P1 = <<"msg1">>,
+ P2 = <<"msg2">>,
+ P3 = <<"msg3">>,
+
+ %% Publish 3 messages
+ publish(Ch, QName, [P1, P2, P3]),
+ %% Consume them
+ wait_for_messages(Config, [[QName, <<"3">>, <<"3">>, <<"0">>]]),
+ [_, _, DTag3] = consume(Ch, QName, [P1, P2, P3]),
+ %% Queue is empty
+ consume_empty(Ch, QName),
+ %% Nack the last one with multiple = false
+ amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DTag3,
+ multiple = false,
+ requeue = true}),
+ %% Consume the last message from the queue
+ wait_for_messages(Config, [[QName, <<"3">>, <<"1">>, <<"2">>]]),
+ consume(Ch, QName, [P3]),
+ consume_empty(Ch, QName),
+ %% Dead letter queue is empty
+ consume_empty(Ch, DLXQName).
+
+%% 1) message is rejected with basic.nack, requeue=true and multiple=true. Dead-lettering does not take place
+dead_letter_nack_requeue_multiple(Config) ->
+ {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ QName = ?config(queue_name, Config),
+ DLXQName = ?config(queue_name_dlx, Config),
+ declare_dead_letter_queues(Ch, Config, QName, DLXQName),
+
+ P1 = <<"msg1">>,
+ P2 = <<"msg2">>,
+ P3 = <<"msg3">>,
+
+ %% Publish 3 messages
+ publish(Ch, QName, [P1, P2, P3]),
+ %% Consume them
+ wait_for_messages(Config, [[QName, <<"3">>, <<"3">>, <<"0">>]]),
+ [_, _, DTag3] = consume(Ch, QName, [P1, P2, P3]),
+ %% Queue is empty
+ consume_empty(Ch, QName),
+ %% Nack the last one with multiple = true
+ amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DTag3,
+ multiple = true,
+ requeue = true}),
+ %% Consume the three messages from the queue
+ wait_for_messages(Config, [[QName, <<"3">>, <<"3">>, <<"0">>]]),
+ consume(Ch, QName, [P1, P2, P3]),
+ consume_empty(Ch, QName),
+ %% Dead letter queue is empty
+ consume_empty(Ch, DLXQName).
+
+%% 1) message is rejected with basic.reject, requeue=false
+dead_letter_reject(Config) ->
+ {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ QName = ?config(queue_name, Config),
+ DLXQName = ?config(queue_name_dlx, Config),
+ declare_dead_letter_queues(Ch, Config, QName, DLXQName),
+
+ P1 = <<"msg1">>,
+ P2 = <<"msg2">>,
+ P3 = <<"msg3">>,
+
+ %% Publish 3 messages
+ publish(Ch, QName, [P1, P2, P3]),
+ %% Consume the first message
+ wait_for_messages(Config, [[QName, <<"3">>, <<"3">>, <<"0">>]]),
+ [DTag] = consume(Ch, QName, [P1]),
+ %% Reject it
+ amqp_channel:cast(Ch, #'basic.reject'{delivery_tag = DTag,
+ requeue = false}),
+ %% Consume it from the dead letter queue
+ wait_for_messages(Config, [[DLXQName, <<"1">>, <<"1">>, <<"0">>]]),
+ _ = consume(Ch, DLXQName, [P1]),
+ consume_empty(Ch, DLXQName),
+ %% Consume the last two from the queue
+ _ = consume(Ch, QName, [P2, P3]),
+ consume_empty(Ch, QName).
+
+%% 1) Message is rejected with basic.reject, requeue=true. Dead-lettering does not take place.
+dead_letter_reject_requeue(Config) ->
+ {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ QName = ?config(queue_name, Config),
+ DLXQName = ?config(queue_name_dlx, Config),
+ declare_dead_letter_queues(Ch, Config, QName, DLXQName),
+
+ P1 = <<"msg1">>,
+ P2 = <<"msg2">>,
+ P3 = <<"msg3">>,
+
+ %% Publish 3 messages
+ publish(Ch, QName, [P1, P2, P3]),
+ %% Consume the first one
+ wait_for_messages(Config, [[QName, <<"3">>, <<"3">>, <<"0">>]]),
+ [DTag] = consume(Ch, QName, [P1]),
+ %% Reject the first one
+ amqp_channel:cast(Ch, #'basic.reject'{delivery_tag = DTag,
+ requeue = true}),
+ %% Consume the three messages from the queue
+ wait_for_messages(Config, [[QName, <<"3">>, <<"3">>, <<"0">>]]),
+ _ = consume(Ch, QName, [P1, P2, P3]),
+ consume_empty(Ch, QName),
+ %% Dead letter is empty
+ consume_empty(Ch, DLXQName).
+
+%% 2) Message ttl expires
+dead_letter_ttl(Config) ->
+ {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ QName = ?config(queue_name, Config),
+ DLXQName = ?config(queue_name_dlx, Config),
+ declare_dead_letter_queues(Ch, Config, QName, DLXQName, [{<<"x-message-ttl">>, long, 1}]),
+
+ %% Publish message
+ P1 = <<"msg1">>,
+ publish(Ch, QName, [P1]),
+ wait_for_messages(Config, [[DLXQName, <<"1">>, <<"1">>, <<"0">>]]),
+ consume_empty(Ch, QName),
+ [_] = consume(Ch, DLXQName, [P1]).
+
+%% 3) The queue length limit is exceeded, message dropped is dead lettered.
+%% Default strategy: drop-head
+dead_letter_max_length_drop_head(Config) ->
+ {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ QName = ?config(queue_name, Config),
+ DLXQName = ?config(queue_name_dlx, Config),
+
+ declare_dead_letter_queues(Ch, Config, QName, DLXQName, [{<<"x-max-length">>, long, 1}]),
+
+ P1 = <<"msg1">>,
+ P2 = <<"msg2">>,
+ P3 = <<"msg3">>,
+
+ %% Publish 3 messages
+ publish(Ch, QName, [P1, P2, P3]),
+ %% Consume the last one from the queue (max-length = 1)
+ wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]),
+ _ = consume(Ch, QName, [P3]),
+ consume_empty(Ch, QName),
+ %% Consume the dropped ones from the dead letter queue
+ wait_for_messages(Config, [[DLXQName, <<"2">>, <<"2">>, <<"0">>]]),
+ _ = consume(Ch, DLXQName, [P1, P2]),
+ consume_empty(Ch, DLXQName).
+
+%% Dead letter exchange does not have to be declared when the queue is declared, but it should
+%% exist by the time messages need to be dead-lettered; if it is missing then, the messages will
+%% be silently dropped.
+dead_letter_missing_exchange(Config) ->
+ {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ Args = ?config(queue_args, Config),
+ Durable = ?config(queue_durable, Config),
+ QName = ?config(queue_name, Config),
+ DLXQName = ?config(queue_name_dlx, Config),
+ DLXExchange = <<"dlx-exchange-2">>,
+ #'exchange.delete_ok'{} = amqp_channel:call(Ch, #'exchange.delete'{exchange = DLXExchange}),
+
+ DeadLetterArgs = [{<<"x-max-length">>, long, 1},
+ {<<"x-dead-letter-exchange">>, longstr, DLXExchange},
+ {<<"x-dead-letter-routing-key">>, longstr, DLXQName}],
+ #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, arguments = DeadLetterArgs ++ Args, durable = Durable}),
+ #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = DLXQName, durable = Durable}),
+
+ P1 = <<"msg1">>,
+ P2 = <<"msg2">>,
+
+ %% Publish one message
+ publish(Ch, QName, [P1]),
+ %% Consume it
+ wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]),
+ [DTag] = consume(Ch, QName, [P1]),
+ %% Reject it
+ amqp_channel:cast(Ch, #'basic.reject'{delivery_tag = DTag,
+ requeue = false}),
+ wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]),
+ %% Message is not in the dead letter queue (exchange does not exist)
+ consume_empty(Ch, DLXQName),
+
+ %% Declare the dead-letter exchange
+ #'exchange.declare_ok'{} = amqp_channel:call(Ch, #'exchange.declare'{exchange = DLXExchange}),
+ #'queue.bind_ok'{} = amqp_channel:call(Ch, #'queue.bind'{queue = DLXQName,
+ exchange = DLXExchange,
+ routing_key = DLXQName}),
+
+ %% Publish another message
+ publish(Ch, QName, [P2]),
+ %% Consume it
+ wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]),
+ [DTag2] = consume(Ch, QName, [P2]),
+ %% Reject it
+ amqp_channel:cast(Ch, #'basic.reject'{delivery_tag = DTag2,
+ requeue = false}),
+ %% Consume the rejected message from the dead letter queue
+ wait_for_messages(Config, [[DLXQName, <<"1">>, <<"1">>, <<"0">>]]),
+ {#'basic.get_ok'{}, #amqp_msg{payload = P2}} =
+ amqp_channel:call(Ch, #'basic.get'{queue = DLXQName}),
+ consume_empty(Ch, DLXQName).
+
+%%
+%% ROUTING
+%%
+%% Dead-lettered messages are routed to their dead letter exchange either:
+%% with the routing key specified for the queue they were on; or,
+%% if this was not set, (3) with the same routing keys they were originally published with.
+%% (4) This includes routing keys added by the CC and BCC headers.
+%%
+%% 3) All previous tests used a specific key, test the original ones now.
+dead_letter_routing_key(Config) ->
+ {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ QName = ?config(queue_name, Config),
+ DLXQName = ?config(queue_name_dlx, Config),
+ Args = ?config(queue_args, Config),
+ Durable = ?config(queue_durable, Config),
+ DLXExchange = ?config(dlx_exchange, Config),
+
+ %% Do not use a specific key
+ DeadLetterArgs = [{<<"x-dead-letter-exchange">>, longstr, DLXExchange}],
+ #'exchange.declare_ok'{} = amqp_channel:call(Ch, #'exchange.declare'{exchange = DLXExchange}),
+ #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, arguments = DeadLetterArgs ++ Args, durable = Durable}),
+ #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = DLXQName, durable = Durable}),
+
+ P1 = <<"msg1">>,
+ P2 = <<"msg2">>,
+
+ %% Publish, consume and nack the first message
+ publish(Ch, QName, [P1]),
+ wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]),
+ [DTag1] = consume(Ch, QName, [P1]),
+ amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DTag1,
+ multiple = false,
+ requeue = false}),
+ %% Both queues are empty as the message could not been routed in the dlx exchange
+ wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]),
+ consume_empty(Ch, QName),
+ consume_empty(Ch, DLXQName),
+ %% Bind the dlx queue with the original queue routing key
+ #'queue.bind_ok'{} = amqp_channel:call(Ch, #'queue.bind'{queue = DLXQName,
+ exchange = DLXExchange,
+ routing_key = QName}),
+ %% Publish, consume and nack the second message
+ publish(Ch, QName, [P2]),
+ wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]),
+ [DTag2] = consume(Ch, QName, [P2]),
+ amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DTag2,
+ multiple = false,
+ requeue = false}),
+ %% Message can now be routed using the recently binded key
+ wait_for_messages(Config, [[DLXQName, <<"1">>, <<"1">>, <<"0">>]]),
+ consume(Ch, DLXQName, [P2]),
+ consume_empty(Ch, QName).
+
+
+%% 4a) If a specific routing key was not set for the queue, use routing keys added by the
+%% CC and BCC headers
+dead_letter_routing_key_header_CC(Config) ->
+ {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ QName = ?config(queue_name, Config),
+ DLXQName = ?config(queue_name_dlx, Config),
+ Args = ?config(queue_args, Config),
+ Durable = ?config(queue_durable, Config),
+ DLXExchange = ?config(dlx_exchange, Config),
+
+ %% Do not use a specific key
+ DeadLetterArgs = [{<<"x-dead-letter-exchange">>, longstr, DLXExchange}],
+ #'exchange.declare_ok'{} = amqp_channel:call(Ch, #'exchange.declare'{exchange = DLXExchange}),
+ #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, arguments = DeadLetterArgs ++ Args, durable = Durable}),
+ #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = DLXQName, durable = Durable}),
+ #'queue.bind_ok'{} = amqp_channel:call(Ch, #'queue.bind'{queue = DLXQName,
+ exchange = DLXExchange,
+ routing_key = DLXQName}),
+
+ P1 = <<"msg1">>,
+ P2 = <<"msg2">>,
+ CCHeader = {<<"CC">>, array, [{longstr, DLXQName}]},
+
+ %% Publish, consume and nack two messages, one with CC header
+ publish(Ch, QName, [P1]),
+ publish(Ch, QName, [P2], [CCHeader]),
+ wait_for_messages(Config, [[QName, <<"2">>, <<"2">>, <<"0">>]]),
+ [_, DTag2] = consume(Ch, QName, [P1, P2]),
+ %% P2 is also published to the DLX queue because of the binding to the default exchange
+ [_] = consume(Ch, DLXQName, [P2]),
+ amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DTag2,
+ multiple = true,
+ requeue = false}),
+ %% The second message should have been routed using the CC header
+ wait_for_messages(Config, [[DLXQName, <<"2">>, <<"1">>, <<"1">>]]),
+ consume_empty(Ch, QName),
+ consume(Ch, DLXQName, [P2]),
+ consume_empty(Ch, DLXQName).
+
+%% 4b) If a specific routing key was not set for the queue, use routing keys added by the
+%% CC and BCC headers
+dead_letter_routing_key_header_BCC(Config) ->
+ {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ QName = ?config(queue_name, Config),
+ DLXQName = ?config(queue_name_dlx, Config),
+ Args = ?config(queue_args, Config),
+ Durable = ?config(queue_durable, Config),
+ DLXExchange = ?config(dlx_exchange, Config),
+
+ %% Do not use a specific key
+ DeadLetterArgs = [{<<"x-dead-letter-exchange">>, longstr, DLXExchange}],
+ #'exchange.declare_ok'{} = amqp_channel:call(Ch, #'exchange.declare'{exchange = DLXExchange}),
+ #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, arguments = DeadLetterArgs ++ Args, durable = Durable}),
+ #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = DLXQName, durable = Durable}),
+ #'queue.bind_ok'{} = amqp_channel:call(Ch, #'queue.bind'{queue = DLXQName,
+ exchange = DLXExchange,
+ routing_key = DLXQName}),
+
+ P1 = <<"msg1">>,
+ P2 = <<"msg2">>,
+ BCCHeader = {<<"BCC">>, array, [{longstr, DLXQName}]},
+
+ %% Publish, consume and nack two messages, one with BCC header
+ publish(Ch, QName, [P1]),
+ publish(Ch, QName, [P2], [BCCHeader]),
+ wait_for_messages(Config, [[QName, <<"2">>, <<"2">>, <<"0">>]]),
+ [_, DTag2] = consume(Ch, QName, [P1, P2]),
+ %% P2 is also published to the DLX queue because of the binding to the default exchange
+ [_] = consume(Ch, DLXQName, [P2]),
+ amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DTag2,
+ multiple = true,
+ requeue = false}),
+ %% The second message should have been routed using the BCC header
+ wait_for_messages(Config, [[DLXQName, <<"2">>, <<"1">>, <<"1">>]]),
+ consume_empty(Ch, QName),
+ consume(Ch, DLXQName, [P2]),
+ consume_empty(Ch, DLXQName).
+
+%% It is possible to form a cycle of message dead-lettering. For instance,
+%% this can happen when a queue dead-letters messages to the default exchange without
+%% specifiying a dead-letter routing key (5). Messages in such cycles (i.e. messages that
+%% reach the same queue twice) will be dropped if there was no rejections in the entire cycle.
+%% i.e. x-message-ttl (7), x-max-length (6)
+%%
+%% 6) Message is dead lettered due to queue length limit, and then dropped by the broker as it is
+%% republished to the same queue.
+dead_letter_routing_key_cycle_max_length(Config) ->
+ {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ Args = ?config(queue_args, Config),
+ Durable = ?config(queue_durable, Config),
+ QName = ?config(queue_name, Config),
+
+ DeadLetterArgs = [{<<"x-max-length">>, long, 1},
+ {<<"x-dead-letter-exchange">>, longstr, <<>>}],
+ #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, arguments = DeadLetterArgs ++ Args, durable = Durable}),
+
+ P1 = <<"msg1">>,
+ P2 = <<"msg2">>,
+
+ %% Publish messages, consume and acknowledge the second one (x-max-length = 1)
+ publish(Ch, QName, [P1, P2]),
+ wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]),
+ [DTag] = consume(Ch, QName, [P2]),
+ consume_empty(Ch, QName),
+ amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DTag}),
+ %% Queue is empty, P1 has not been republished in a loop
+ wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]),
+ consume_empty(Ch, QName).
+
+%% 7) Message is dead lettered due to message ttl. Not yet implemented in quorum queues
+dead_letter_routing_key_cycle_ttl(Config) ->
+ {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ Args = ?config(queue_args, Config),
+ Durable = ?config(queue_durable, Config),
+ QName = ?config(queue_name, Config),
+
+ DeadLetterArgs = [{<<"x-message-ttl">>, long, 1},
+ {<<"x-dead-letter-exchange">>, longstr, <<>>}],
+ #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, arguments = DeadLetterArgs ++ Args, durable = Durable}),
+
+ P1 = <<"msg1">>,
+ P2 = <<"msg2">>,
+
+ %% Publish messages
+ publish(Ch, QName, [P1, P2]),
+ wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]),
+ consume_empty(Ch, QName).
+
+%% 5) Messages continue to be republished as there are manual rejections
+dead_letter_routing_key_cycle_with_reject(Config) ->
+ {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ Args = ?config(queue_args, Config),
+ Durable = ?config(queue_durable, Config),
+ QName = ?config(queue_name, Config),
+
+ DeadLetterArgs = [{<<"x-dead-letter-exchange">>, longstr, <<>>}],
+ #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, arguments = DeadLetterArgs ++ Args, durable = Durable}),
+
+ P = <<"msg1">>,
+
+ %% Publish message
+ publish(Ch, QName, [P]),
+ wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]),
+ [DTag] = consume(Ch, QName, [P]),
+ amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DTag,
+ multiple = false,
+ requeue = false}),
+ wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]),
+ [DTag1] = consume(Ch, QName, [P]),
+ amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DTag1,
+ multiple = false,
+ requeue = false}),
+ %% Message its being republished
+ wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]),
+ [_] = consume(Ch, QName, [P]).
+
+%%
+%% For any given queue, a DLX can be defined by clients using the queue's arguments,
+%% or in the server using policies (8). In the case where both policy and arguments specify a DLX,
+%% the one specified in arguments overrules the one specified in policy (9).
+%%
+%% 8) Use server policies
+dead_letter_policy(Config) ->
+ {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ QName = ?config(queue_name, Config),
+ DLXQName = ?config(queue_name_dlx, Config),
+ Args = ?config(queue_args, Config),
+ Durable = ?config(queue_durable, Config),
+ DLXExchange = ?config(dlx_exchange, Config),
+
+ %% Do not use arguments
+ #'exchange.declare_ok'{} = amqp_channel:call(Ch, #'exchange.declare'{exchange = DLXExchange}),
+ #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, arguments = Args,
+ durable = Durable}),
+ #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = DLXQName,
+ durable = Durable}),
+ #'queue.bind_ok'{} = amqp_channel:call(Ch, #'queue.bind'{queue = DLXQName,
+ exchange = DLXExchange,
+ routing_key = DLXQName}),
+
+ P1 = <<"msg1">>,
+ P2 = <<"msg2">>,
+
+ %% Publish 2 messages
+ publish(Ch, QName, [P1, P2]),
+ %% Consume them
+ wait_for_messages(Config, [[QName, <<"2">>, <<"2">>, <<"0">>]]),
+ [DTag1, DTag2] = consume(Ch, QName, [P1, P2]),
+ %% Nack the first one with multiple = false
+ amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DTag1,
+ multiple = false,
+ requeue = false}),
+ %% Only one message unack left in the queue
+ wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]),
+ consume_empty(Ch, QName),
+ consume_empty(Ch, DLXQName),
+
+ %% Set a policy
+ ok = rabbit_ct_broker_helpers:set_policy(Config, 0, ?config(policy, Config), QName,
+ <<"queues">>,
+ [{<<"dead-letter-exchange">>, DLXExchange},
+ {<<"dead-letter-routing-key">>, DLXQName}]),
+ timer:sleep(1000),
+ %% Nack the second message
+ amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DTag2,
+ multiple = false,
+ requeue = false}),
+ %% Queue is empty
+ wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]),
+ consume_empty(Ch, QName),
+ %% Consume the message from the dead letter queue
+ consume(Ch, DLXQName, [P2]),
+ consume_empty(Ch, DLXQName).
+
+%% 9) Argument overrides server policy
+dead_letter_override_policy(Config) ->
+ {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ QName = ?config(queue_name, Config),
+ DLXQName = ?config(queue_name_dlx, Config),
+
+ %% Set a policy, it creates a cycle but message will be republished with the nack.
+ %% Good enough for this test.
+ ok = rabbit_ct_broker_helpers:set_policy(Config, 0, ?config(policy, Config), QName,
+ <<"queues">>,
+ [{<<"dead-letter-exchange">>, <<>>},
+ {<<"dead-letter-routing-key">>, QName}]),
+
+ %% Declare arguments override the policy and set routing queue
+ declare_dead_letter_queues(Ch, Config, QName, DLXQName),
+
+ P1 = <<"msg1">>,
+
+ publish(Ch, QName, [P1]),
+ wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]),
+ [DTag1] = consume(Ch, QName, [P1]),
+ amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DTag1,
+ multiple = false,
+ requeue = false}),
+ %% Queue is empty
+ wait_for_messages(Config, [[DLXQName, <<"1">>, <<"1">>, <<"0">>]]),
+ consume_empty(Ch, QName),
+ [_] = consume(Ch, DLXQName, [P1]).
+
+%% 9) Policy is set after have declared a queue with dead letter arguments. Policy will be
+%% overriden/ignored.
+dead_letter_ignore_policy(Config) ->
+ {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ QName = ?config(queue_name, Config),
+ DLXQName = ?config(queue_name_dlx, Config),
+
+ declare_dead_letter_queues(Ch, Config, QName, DLXQName),
+
+ %% Set a policy
+ ok = rabbit_ct_broker_helpers:set_policy(Config, 0, ?config(policy, Config), QName,
+ <<"queues">>,
+ [{<<"dead-letter-exchange">>, <<>>},
+ {<<"dead-letter-routing-key">>, QName}]),
+
+ P1 = <<"msg1">>,
+
+ publish(Ch, QName, [P1]),
+ wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]),
+ [DTag1] = consume(Ch, QName, [P1]),
+ amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DTag1,
+ multiple = false,
+ requeue = false}),
+ %% Message is in the dead letter queue, original queue is empty
+ wait_for_messages(Config, [[DLXQName, <<"1">>, <<"1">>, <<"0">>]]),
+ [_] = consume(Ch, DLXQName, [P1]),
+ consume_empty(Ch, QName).
+
+%%
+%% HEADERS
+%%
+%% The dead-lettering process adds an array to the header of each dead-lettered message named
+%% x-death (10). This array contains an entry for each dead lettering event containing:
+%% queue, reason, time, exchange, routing-keys, count
+%% original-expiration (14) (if the message was dead-letterered due to per-message TTL)
+%% New entries are prepended to the beginning of the x-death array.
+%% Reason is one of the following: rejected (11), expired (12), maxlen (13)
+%%
+%% 10) and 11) Check all x-death headers, reason rejected
+dead_letter_headers(Config) ->
+ {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ QName = ?config(queue_name, Config),
+ DLXQName = ?config(queue_name_dlx, Config),
+ declare_dead_letter_queues(Ch, Config, QName, DLXQName),
+
+ %% Publish and nack a message
+ P1 = <<"msg1">>,
+ publish(Ch, QName, [P1]),
+ wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]),
+ [DTag1] = consume(Ch, QName, [P1]),
+ amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DTag1,
+ multiple = false,
+ requeue = false}),
+ %% Consume and check headers
+ wait_for_messages(Config, [[DLXQName, <<"1">>, <<"1">>, <<"0">>]]),
+ {#'basic.get_ok'{}, #amqp_msg{payload = P1,
+ props = #'P_basic'{headers = Headers}}} =
+ amqp_channel:call(Ch, #'basic.get'{queue = DLXQName}),
+ {array, [{table, Death}]} = rabbit_misc:table_lookup(Headers, <<"x-death">>),
+ ?assertEqual({longstr, QName}, rabbit_misc:table_lookup(Death, <<"queue">>)),
+ ?assertEqual({longstr, <<"rejected">>}, rabbit_misc:table_lookup(Death, <<"reason">>)),
+ ?assertMatch({timestamp, _}, rabbit_misc:table_lookup(Death, <<"time">>)),
+ ?assertEqual({longstr, <<>>}, rabbit_misc:table_lookup(Death, <<"exchange">>)),
+ ?assertEqual({long, 1}, rabbit_misc:table_lookup(Death, <<"count">>)),
+ ?assertEqual({array, [{longstr, QName}]}, rabbit_misc:table_lookup(Death, <<"routing-keys">>)).
+
+%% 12) Per-queue message ttl has expired
+dead_letter_headers_reason_expired(Config) ->
+ {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ QName = ?config(queue_name, Config),
+ DLXQName = ?config(queue_name_dlx, Config),
+ declare_dead_letter_queues(Ch, Config, QName, DLXQName, [{<<"x-message-ttl">>, long, 1}]),
+
+ %% Publish a message
+ P1 = <<"msg1">>,
+ publish(Ch, QName, [P1]),
+ %% Consume and check headers
+ wait_for_messages(Config, [[DLXQName, <<"1">>, <<"1">>, <<"0">>]]),
+ {#'basic.get_ok'{}, #amqp_msg{payload = P1,
+ props = #'P_basic'{headers = Headers}}} =
+ amqp_channel:call(Ch, #'basic.get'{queue = DLXQName}),
+ {array, [{table, Death}]} = rabbit_misc:table_lookup(Headers, <<"x-death">>),
+ ?assertEqual({longstr, <<"expired">>}, rabbit_misc:table_lookup(Death, <<"reason">>)),
+ ?assertMatch(undefined, rabbit_misc:table_lookup(Death, <<"original-expiration">>)).
+
+%% 14) Per-message TTL has expired, original-expiration is added to x-death array
+dead_letter_headers_reason_expired_per_message(Config) ->
+ {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ QName = ?config(queue_name, Config),
+ DLXQName = ?config(queue_name_dlx, Config),
+ declare_dead_letter_queues(Ch, Config, QName, DLXQName),
+
+ %% Publish a message
+ P1 = <<"msg1">>,
+ amqp_channel:call(Ch, #'basic.publish'{routing_key = QName},
+ #amqp_msg{payload = P1,
+ props = #'P_basic'{expiration = <<"1">>}}),
+ %% publish another message to ensure the queue performs message expirations
+ publish(Ch, QName, [<<"msg2">>]),
+ %% Consume and check headers
+ wait_for_messages(Config, [[DLXQName, <<"1">>, <<"1">>, <<"0">>]]),
+ {#'basic.get_ok'{}, #amqp_msg{payload = P1,
+ props = #'P_basic'{headers = Headers}}} =
+ amqp_channel:call(Ch, #'basic.get'{queue = DLXQName}),
+ {array, [{table, Death}]} = rabbit_misc:table_lookup(Headers, <<"x-death">>),
+ ?assertEqual({longstr, <<"expired">>}, rabbit_misc:table_lookup(Death, <<"reason">>)),
+ ?assertMatch({longstr, <<"1">>}, rabbit_misc:table_lookup(Death, <<"original-expiration">>)).
+
+%% 13) Message expired with maxlen reason
+dead_letter_headers_reason_maxlen(Config) ->
+ {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ QName = ?config(queue_name, Config),
+ DLXQName = ?config(queue_name_dlx, Config),
+ declare_dead_letter_queues(Ch, Config, QName, DLXQName, [{<<"x-max-length">>, long, 1}]),
+
+ P1 = <<"msg1">>,
+ P2 = <<"msg2">>,
+ publish(Ch, QName, [P1, P2]),
+ %% Consume and check reason header
+ wait_for_messages(Config, [[DLXQName, <<"1">>, <<"1">>, <<"0">>]]),
+ {#'basic.get_ok'{}, #amqp_msg{payload = P1,
+ props = #'P_basic'{headers = Headers}}} =
+ amqp_channel:call(Ch, #'basic.get'{queue = DLXQName}),
+ {array, [{table, Death}]} = rabbit_misc:table_lookup(Headers, <<"x-death">>),
+ ?assertEqual({longstr, <<"maxlen">>}, rabbit_misc:table_lookup(Death, <<"reason">>)).
+
+%% In case x-death already contains an entry with the same queue and dead lettering reason,
+%% its count field will be incremented and it will be moved to the beginning of the array
+dead_letter_headers_cycle(Config) ->
+ {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ Args = ?config(queue_args, Config),
+ Durable = ?config(queue_durable, Config),
+ QName = ?config(queue_name, Config),
+
+ DeadLetterArgs = [{<<"x-dead-letter-exchange">>, longstr, <<>>}],
+ #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, arguments = DeadLetterArgs ++ Args, durable = Durable}),
+
+ P = <<"msg1">>,
+
+ %% Publish message
+ publish(Ch, QName, [P]),
+ wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]),
+ [DTag] = consume(Ch, QName, [P]),
+ amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DTag,
+ multiple = false,
+ requeue = false}),
+ wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]),
+ {#'basic.get_ok'{delivery_tag = DTag1}, #amqp_msg{payload = P,
+ props = #'P_basic'{headers = Headers1}}} =
+ amqp_channel:call(Ch, #'basic.get'{queue = QName}),
+ {array, [{table, Death1}]} = rabbit_misc:table_lookup(Headers1, <<"x-death">>),
+ ?assertEqual({long, 1}, rabbit_misc:table_lookup(Death1, <<"count">>)),
+
+ amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DTag1,
+ multiple = false,
+ requeue = false}),
+ %% Message its being republished
+ wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]),
+ {#'basic.get_ok'{}, #amqp_msg{payload = P,
+ props = #'P_basic'{headers = Headers2}}} =
+ amqp_channel:call(Ch, #'basic.get'{queue = QName}),
+ {array, [{table, Death2}]} = rabbit_misc:table_lookup(Headers2, <<"x-death">>),
+ ?assertEqual({long, 2}, rabbit_misc:table_lookup(Death2, <<"count">>)).
+
+%% Dead-lettering a message modifies its headers:
+%% the exchange name is replaced with that of the latest dead-letter exchange,
+%% the routing key may be replaced with that specified in a queue performing dead lettering,
+%% if the above happens, the CC header will also be removed (15) and
+%% the BCC header will be removed as per Sender-selected distribution (16)
+%%
+%% CC header is kept if no dead lettering routing key is provided
+dead_letter_headers_CC(Config) ->
+ {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ QName = ?config(queue_name, Config),
+ DLXQName = ?config(queue_name_dlx, Config),
+ Args = ?config(queue_args, Config),
+ Durable = ?config(queue_durable, Config),
+ DLXExchange = ?config(dlx_exchange, Config),
+
+ %% Do not use a specific key for dead lettering, the CC header is passed
+ DeadLetterArgs = [{<<"x-dead-letter-exchange">>, longstr, DLXExchange}],
+ #'exchange.declare_ok'{} = amqp_channel:call(Ch, #'exchange.declare'{exchange = DLXExchange}),
+ #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, arguments = DeadLetterArgs ++ Args, durable = Durable}),
+ #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = DLXQName, durable = Durable}),
+ #'queue.bind_ok'{} = amqp_channel:call(Ch, #'queue.bind'{queue = DLXQName,
+ exchange = DLXExchange,
+ routing_key = DLXQName}),
+
+ P1 = <<"msg1">>,
+ CCHeader = {<<"CC">>, array, [{longstr, DLXQName}]},
+ publish(Ch, QName, [P1], [CCHeader]),
+ wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]),
+ %% Message is published to both queues because of CC header and DLX queue bound to both
+ %% exchanges
+ {#'basic.get_ok'{delivery_tag = DTag1}, #amqp_msg{payload = P1,
+ props = #'P_basic'{headers = Headers1}}} =
+ amqp_channel:call(Ch, #'basic.get'{queue = QName}),
+ {#'basic.get_ok'{}, #amqp_msg{payload = P1,
+ props = #'P_basic'{headers = Headers2}}} =
+ amqp_channel:call(Ch, #'basic.get'{queue = DLXQName}),
+ %% We check the headers to ensure no dead lettering has happened
+ ?assertEqual(undefined, rabbit_misc:table_lookup(Headers1, <<"x-death">>)),
+ ?assertEqual(undefined, rabbit_misc:table_lookup(Headers2, <<"x-death">>)),
+
+ %% Nack the message so it now gets dead lettered
+ amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DTag1,
+ multiple = false,
+ requeue = false}),
+ wait_for_messages(Config, [[DLXQName, <<"2">>, <<"1">>, <<"1">>]]),
+ {#'basic.get_ok'{}, #amqp_msg{payload = P1,
+ props = #'P_basic'{headers = Headers3}}} =
+ amqp_channel:call(Ch, #'basic.get'{queue = DLXQName}),
+ consume_empty(Ch, QName),
+ ?assertEqual({array, [{longstr, DLXQName}]}, rabbit_misc:table_lookup(Headers3, <<"CC">>)),
+ ?assertMatch({array, _}, rabbit_misc:table_lookup(Headers3, <<"x-death">>)).
+
+%% 15) CC header is removed when routing key is specified
+dead_letter_headers_CC_with_routing_key(Config) ->
+ {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ QName = ?config(queue_name, Config),
+ DLXQName = ?config(queue_name_dlx, Config),
+ Args = ?config(queue_args, Config),
+ Durable = ?config(queue_durable, Config),
+ DLXExchange = ?config(dlx_exchange, Config),
+
+ %% Do not use a specific key for dead lettering, the CC header is passed
+ DeadLetterArgs = [{<<"x-dead-letter-routing-key">>, longstr, DLXQName},
+ {<<"x-dead-letter-exchange">>, longstr, DLXExchange}],
+ #'exchange.declare_ok'{} = amqp_channel:call(Ch, #'exchange.declare'{exchange = DLXExchange}),
+ #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, arguments = DeadLetterArgs ++ Args, durable = Durable}),
+ #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = DLXQName, durable = Durable}),
+ #'queue.bind_ok'{} = amqp_channel:call(Ch, #'queue.bind'{queue = DLXQName,
+ exchange = DLXExchange,
+ routing_key = DLXQName}),
+
+ P1 = <<"msg1">>,
+ CCHeader = {<<"CC">>, array, [{longstr, DLXQName}]},
+ publish(Ch, QName, [P1], [CCHeader]),
+ wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]),
+ %% Message is published to both queues because of CC header and DLX queue bound to both
+ %% exchanges
+ {#'basic.get_ok'{delivery_tag = DTag1}, #amqp_msg{payload = P1,
+ props = #'P_basic'{headers = Headers1}}} =
+ amqp_channel:call(Ch, #'basic.get'{queue = QName}),
+ {#'basic.get_ok'{}, #amqp_msg{payload = P1,
+ props = #'P_basic'{headers = Headers2}}} =
+ amqp_channel:call(Ch, #'basic.get'{queue = DLXQName}),
+ %% We check the headers to ensure no dead lettering has happened
+ ?assertEqual(undefined, rabbit_misc:table_lookup(Headers1, <<"x-death">>)),
+ ?assertEqual(undefined, rabbit_misc:table_lookup(Headers2, <<"x-death">>)),
+
+ %% Nack the message so it now gets dead lettered
+ amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DTag1,
+ multiple = false,
+ requeue = false}),
+ wait_for_messages(Config, [[DLXQName, <<"2">>, <<"1">>, <<"1">>]]),
+ {#'basic.get_ok'{}, #amqp_msg{payload = P1,
+ props = #'P_basic'{headers = Headers3}}} =
+ amqp_channel:call(Ch, #'basic.get'{queue = DLXQName}),
+ consume_empty(Ch, QName),
+ ?assertEqual(undefined, rabbit_misc:table_lookup(Headers3, <<"CC">>)),
+ ?assertMatch({array, _}, rabbit_misc:table_lookup(Headers3, <<"x-death">>)).
+
+%% 16) the BCC header will always be removed
+dead_letter_headers_BCC(Config) ->
+ {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ QName = ?config(queue_name, Config),
+ DLXQName = ?config(queue_name_dlx, Config),
+ Args = ?config(queue_args, Config),
+ Durable = ?config(queue_durable, Config),
+ DLXExchange = ?config(dlx_exchange, Config),
+
+ %% Do not use a specific key for dead lettering
+ DeadLetterArgs = [{<<"x-dead-letter-exchange">>, longstr, DLXExchange}],
+ #'exchange.declare_ok'{} = amqp_channel:call(Ch, #'exchange.declare'{exchange = DLXExchange}),
+ #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, arguments = DeadLetterArgs ++ Args, durable = Durable}),
+ #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = DLXQName, durable = Durable}),
+ #'queue.bind_ok'{} = amqp_channel:call(Ch, #'queue.bind'{queue = DLXQName,
+ exchange = DLXExchange,
+ routing_key = DLXQName}),
+
+ P1 = <<"msg1">>,
+ BCCHeader = {<<"BCC">>, array, [{longstr, DLXQName}]},
+ publish(Ch, QName, [P1], [BCCHeader]),
+ %% Message is published to both queues because of BCC header and DLX queue bound to both
+ %% exchanges
+ wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]),
+ {#'basic.get_ok'{delivery_tag = DTag1}, #amqp_msg{payload = P1,
+ props = #'P_basic'{headers = Headers1}}} =
+ amqp_channel:call(Ch, #'basic.get'{queue = QName}),
+ {#'basic.get_ok'{}, #amqp_msg{payload = P1,
+ props = #'P_basic'{headers = Headers2}}} =
+ amqp_channel:call(Ch, #'basic.get'{queue = DLXQName}),
+ %% We check the headers to ensure no dead lettering has happened
+ ?assertEqual(undefined, rabbit_misc:table_lookup(Headers1, <<"x-death">>)),
+ ?assertEqual(undefined, rabbit_misc:table_lookup(Headers2, <<"x-death">>)),
+
+ %% Nack the message so it now gets dead lettered
+ amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DTag1,
+ multiple = false,
+ requeue = false}),
+ wait_for_messages(Config, [[DLXQName, <<"2">>, <<"1">>, <<"1">>]]),
+ {#'basic.get_ok'{}, #amqp_msg{payload = P1,
+ props = #'P_basic'{headers = Headers3}}} =
+ amqp_channel:call(Ch, #'basic.get'{queue = DLXQName}),
+ consume_empty(Ch, QName),
+ ?assertEqual(undefined, rabbit_misc:table_lookup(Headers3, <<"BCC">>)),
+ ?assertMatch({array, _}, rabbit_misc:table_lookup(Headers3, <<"x-death">>)).
+
+
+%% Three top-level headers are added for the very first dead-lettering event. They are
+%% x-first-death-reason, x-first-death-queue, x-first-death-exchange
+%% They have the same values as the reason, queue, and exchange fields of the original
+%% dead lettering event. Once added, these headers are never modified.
+dead_letter_headers_first_death(Config) ->
+ {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ QName = ?config(queue_name, Config),
+ DLXQName = ?config(queue_name_dlx, Config),
+ Args = ?config(queue_args, Config),
+ Durable = ?config(queue_durable, Config),
+ DLXExchange = ?config(dlx_exchange, Config),
+
+ %% Let's create a small dead-lettering loop QName -> DLXQName -> QName
+ DeadLetterArgs = [{<<"x-dead-letter-routing-key">>, longstr, DLXQName},
+ {<<"x-dead-letter-exchange">>, longstr, DLXExchange}],
+ DLXDeadLetterArgs = [{<<"x-dead-letter-routing-key">>, longstr, QName},
+ {<<"x-dead-letter-exchange">>, longstr, <<>>}],
+ #'exchange.declare_ok'{} = amqp_channel:call(Ch, #'exchange.declare'{exchange = DLXExchange}),
+ #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, arguments = DeadLetterArgs ++ Args, durable = Durable}),
+ #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = DLXQName, durable = Durable, arguments = DLXDeadLetterArgs}),
+ #'queue.bind_ok'{} = amqp_channel:call(Ch, #'queue.bind'{queue = DLXQName,
+ exchange = DLXExchange,
+ routing_key = DLXQName}),
+
+
+ %% Publish and nack a message
+ P1 = <<"msg1">>,
+ publish(Ch, QName, [P1]),
+ wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]),
+ [DTag1] = consume(Ch, QName, [P1]),
+ amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DTag1,
+ multiple = false,
+ requeue = false}),
+ %% Consume and check headers
+ wait_for_messages(Config, [[DLXQName, <<"1">>, <<"1">>, <<"0">>]]),
+ {#'basic.get_ok'{delivery_tag = DTag2}, #amqp_msg{payload = P1,
+ props = #'P_basic'{headers = Headers}}} =
+ amqp_channel:call(Ch, #'basic.get'{queue = DLXQName}),
+ ?assertEqual({longstr, <<"rejected">>},
+ rabbit_misc:table_lookup(Headers, <<"x-first-death-reason">>)),
+ ?assertEqual({longstr, QName},
+ rabbit_misc:table_lookup(Headers, <<"x-first-death-queue">>)),
+ ?assertEqual({longstr, <<>>},
+ rabbit_misc:table_lookup(Headers, <<"x-first-death-exchange">>)),
+ %% Nack the message again so it gets dead lettered to the initial queue. x-first-death
+ %% headers should not change
+ amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DTag2,
+ multiple = false,
+ requeue = false}),
+ wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]),
+ {#'basic.get_ok'{}, #amqp_msg{payload = P1,
+ props = #'P_basic'{headers = Headers2}}} =
+ amqp_channel:call(Ch, #'basic.get'{queue = QName}),
+ ?assertEqual({longstr, <<"rejected">>},
+ rabbit_misc:table_lookup(Headers2, <<"x-first-death-reason">>)),
+ ?assertEqual({longstr, QName},
+ rabbit_misc:table_lookup(Headers2, <<"x-first-death-queue">>)),
+ ?assertEqual({longstr, <<>>},
+ rabbit_misc:table_lookup(Headers2, <<"x-first-death-exchange">>)).
+
+%%%%%%%%%%%%%%%%%%%%%%%%
+%% Test helpers
+%%%%%%%%%%%%%%%%%%%%%%%%
+declare_dead_letter_queues(Ch, Config, QName, DLXQName) ->
+ declare_dead_letter_queues(Ch, Config, QName, DLXQName, []).
+
+declare_dead_letter_queues(Ch, Config, QName, DLXQName, ExtraArgs) ->
+ Args = ?config(queue_args, Config),
+ Durable = ?config(queue_durable, Config),
+ DLXExchange = ?config(dlx_exchange, Config),
+
+ %% Declare DLX exchange
+ #'exchange.declare_ok'{} = amqp_channel:call(Ch, #'exchange.declare'{exchange = DLXExchange}),
+
+ %% Declare queue
+ DeadLetterArgs = [{<<"x-dead-letter-exchange">>, longstr, DLXExchange},
+ {<<"x-dead-letter-routing-key">>, longstr, DLXQName}],
+ #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, arguments = DeadLetterArgs ++ Args ++ ExtraArgs, durable = Durable}),
+
+ %% Declare and bind DLX queue
+ #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = DLXQName, durable = Durable}),
+ #'queue.bind_ok'{} = amqp_channel:call(Ch, #'queue.bind'{queue = DLXQName,
+ exchange = DLXExchange,
+ routing_key = DLXQName}).
+
+publish(Ch, QName, Payloads) ->
+ [amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload})
+ || Payload <- Payloads].
+
+publish(Ch, QName, Payloads, Headers) ->
+ [amqp_channel:call(Ch, #'basic.publish'{routing_key = QName},
+ #amqp_msg{payload = Payload,
+ props = #'P_basic'{headers = Headers}})
+ || Payload <- Payloads].
+
+consume(Ch, QName, Payloads) ->
+ [begin
+ {#'basic.get_ok'{delivery_tag = DTag}, #amqp_msg{payload = Payload}} =
+ amqp_channel:call(Ch, #'basic.get'{queue = QName}),
+ DTag
+ end || Payload <- Payloads].
+
+consume_empty(Ch, QName) ->
+ #'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}).
+
+sync_mirrors(QName, Config) ->
+ case ?config(is_mirrored, Config) of
+ true ->
+ rabbit_ct_broker_helpers:rabbitmqctl(Config, 0, [<<"sync_queue">>, QName]);
+ _ -> ok
+ end.
diff --git a/test/unit_inbroker_non_parallel_SUITE.erl b/test/unit_inbroker_non_parallel_SUITE.erl
index e3e282f233..d2db382e30 100644
--- a/test/unit_inbroker_non_parallel_SUITE.erl
+++ b/test/unit_inbroker_non_parallel_SUITE.erl
@@ -568,7 +568,7 @@ head_message_timestamp1(_Config) ->
QRes = rabbit_misc:r(<<"/">>, queue, QName),
{ok, Q1} = rabbit_amqqueue:lookup(QRes),
- QPid = Q1#amqqueue.pid,
+ QPid = amqqueue:get_pid(Q1),
%% Set up event receiver for queue
dummy_event_receiver:start(self(), [node()], [queue_stats]),
diff --git a/test/unit_inbroker_parallel_SUITE.erl b/test/unit_inbroker_parallel_SUITE.erl
index 581440d179..f4f4971517 100644
--- a/test/unit_inbroker_parallel_SUITE.erl
+++ b/test/unit_inbroker_parallel_SUITE.erl
@@ -99,10 +99,24 @@ init_per_group(max_length_classic, Config) ->
[{queue_args, [{<<"x-queue-type">>, longstr, <<"classic">>}]},
{queue_durable, false}]);
init_per_group(max_length_quorum, Config) ->
- rabbit_ct_helpers:set_config(
- Config,
- [{queue_args, [{<<"x-queue-type">>, longstr, <<"quorum">>}]},
- {queue_durable, true}]);
+ Nodes = rabbit_ct_broker_helpers:get_node_configs(
+ Config, nodename),
+ Ret = rabbit_ct_broker_helpers:rpc(
+ Config, 0,
+ rabbit_feature_flags,
+ is_supported_remotely,
+ [Nodes, [quorum_queue], 60000]),
+ case Ret of
+ true ->
+ ok = rabbit_ct_broker_helpers:rpc(
+ Config, 0, rabbit_feature_flags, enable, [quorum_queue]),
+ rabbit_ct_helpers:set_config(
+ Config,
+ [{queue_args, [{<<"x-queue-type">>, longstr, <<"quorum">>}]},
+ {queue_durable, true}]);
+ false ->
+ {skip, "Quorum queues are unsupported"}
+ end;
init_per_group(max_length_mirrored, Config) ->
rabbit_ct_broker_helpers:set_ha_policy(Config, 0, <<"^max_length.*queue">>,
<<"all">>, [{<<"ha-sync-mode">>, <<"automatic">>}]),
@@ -269,8 +283,7 @@ wait_for_confirms(Unconfirmed) ->
end.
test_amqqueue(Durable) ->
- (rabbit_amqqueue:pseudo_queue(test_queue(), self()))
- #amqqueue { durable = Durable }.
+ rabbit_amqqueue:pseudo_queue(test_queue(), self(), Durable).
assert_prop(List, Prop, Value) ->
case proplists:get_value(Prop, List)of
@@ -695,7 +708,7 @@ head_message_timestamp1(_Config) ->
QRes = rabbit_misc:r(<<"/">>, queue, QName),
{ok, Q1} = rabbit_amqqueue:lookup(QRes),
- QPid = Q1#amqqueue.pid,
+ QPid = amqqueue:get_pid(Q1),
%% Set up event receiver for queue
dummy_event_receiver:start(self(), [node()], [queue_stats]),
@@ -944,7 +957,7 @@ confirms1(_Config) ->
QName2 = DeclareBindDurableQueue(),
%% Get the first one's pid (we'll crash it later)
{ok, Q1} = rabbit_amqqueue:lookup(rabbit_misc:r(<<"/">>, queue, QName1)),
- QPid1 = Q1#amqqueue.pid,
+ QPid1 = amqqueue:get_pid(Q1),
%% Enable confirms
rabbit_channel:do(Ch, #'confirm.select'{}),
receive
diff --git a/test/vhost_SUITE.erl b/test/vhost_SUITE.erl
index 2de5819b54..5b6a6bd6ff 100644
--- a/test/vhost_SUITE.erl
+++ b/test/vhost_SUITE.erl
@@ -354,7 +354,7 @@ node_starts_with_dead_vhosts_and_ignore_slaves(Config) ->
Node1 = rabbit_ct_broker_helpers:get_node_config(Config, 1, nodename),
- #amqqueue{sync_slave_pids = [Pid]} = Q,
+ [Pid] = amqqueue:get_sync_slave_pids(Q),
Node1 = node(Pid),