summaryrefslogtreecommitdiff
path: root/test
diff options
context:
space:
mode:
Diffstat (limited to 'test')
-rw-r--r--test/amqqueue_backward_compatibility_SUITE.erl302
-rw-r--r--test/backing_queue_SUITE.erl31
-rw-r--r--test/channel_operation_timeout_SUITE.erl14
-rw-r--r--test/cluster_SUITE.erl15
-rw-r--r--test/clustering_management_SUITE.erl66
-rw-r--r--test/crashing_queues_SUITE.erl7
-rw-r--r--test/dynamic_ha_SUITE.erl4
-rw-r--r--test/dynamic_qq_SUITE.erl23
-rw-r--r--test/feature_flags_SUITE.erl372
-rw-r--r--test/mirrored_supervisor_SUITE.erl13
-rw-r--r--test/priority_queue_SUITE.erl8
-rw-r--r--test/quorum_queue_SUITE.erl76
-rw-r--r--test/rabbit_core_metrics_gc_SUITE.erl8
-rw-r--r--test/single_active_consumer_SUITE.erl22
-rw-r--r--test/unit_inbroker_non_parallel_SUITE.erl2
-rw-r--r--test/unit_inbroker_parallel_SUITE.erl29
-rw-r--r--test/vhost_SUITE.erl2
17 files changed, 889 insertions, 105 deletions
diff --git a/test/amqqueue_backward_compatibility_SUITE.erl b/test/amqqueue_backward_compatibility_SUITE.erl
new file mode 100644
index 0000000000..05a049c9bb
--- /dev/null
+++ b/test/amqqueue_backward_compatibility_SUITE.erl
@@ -0,0 +1,302 @@
+-module(amqqueue_backward_compatibility_SUITE).
+
+-include_lib("common_test/include/ct.hrl").
+-include_lib("eunit/include/eunit.hrl").
+
+-include("amqqueue.hrl").
+
+-export([all/0,
+ groups/0,
+ init_per_suite/2,
+ end_per_suite/2,
+ init_per_group/2,
+ end_per_group/2,
+ init_per_testcase/2,
+ end_per_testcase/2,
+
+ new_amqqueue_v1_is_amqqueue/1,
+ new_amqqueue_v2_is_amqqueue/1,
+ random_term_is_not_amqqueue/1,
+
+ amqqueue_v1_is_durable/1,
+ amqqueue_v2_is_durable/1,
+ random_term_is_not_durable/1,
+
+ amqqueue_v1_state_matching/1,
+ amqqueue_v2_state_matching/1,
+ random_term_state_matching/1,
+
+ amqqueue_v1_type_matching/1,
+ amqqueue_v2_type_matching/1,
+ random_term_type_matching/1,
+
+ upgrade_v1_to_v2/1
+ ]).
+
+-define(long_tuple, {random_tuple, a, b, c, d, e, f, g, h, i, j, k, l, m,
+ n, o, p, q, r, s, t, u, v, w, x, y, z}).
+
+all() ->
+ [
+ {group, parallel_tests}
+ ].
+
+groups() ->
+ [
+ {parallel_tests, [parallel], [new_amqqueue_v1_is_amqqueue,
+ new_amqqueue_v2_is_amqqueue,
+ random_term_is_not_amqqueue,
+ amqqueue_v1_is_durable,
+ amqqueue_v2_is_durable,
+ random_term_is_not_durable,
+ amqqueue_v1_state_matching,
+ amqqueue_v2_state_matching,
+ random_term_state_matching,
+ amqqueue_v1_type_matching,
+ amqqueue_v2_type_matching,
+ random_term_type_matching]}
+ ].
+
+init_per_suite(_, Config) -> Config.
+end_per_suite(_, Config) -> Config.
+
+init_per_group(_, Config) -> Config.
+end_per_group(_, Config) -> Config.
+
+init_per_testcase(_, Config) -> Config.
+end_per_testcase(_, Config) -> Config.
+
+new_amqqueue_v1_is_amqqueue(_) ->
+ VHost = <<"/">>,
+ Name = rabbit_misc:r(VHost, queue, my_amqqueue_v1),
+ Queue = amqqueue:new_with_version(amqqueue_v1,
+ Name,
+ self(),
+ false,
+ false,
+ none,
+ [],
+ VHost,
+ #{},
+ ?amqqueue_v1_type),
+ ?assert(?is_amqqueue(Queue)),
+ ?assert(?is_amqqueue_v1(Queue)),
+ ?assert(not ?is_amqqueue_v2(Queue)),
+ ?assert(?amqqueue_is_classic(Queue)),
+ ?assert(amqqueue:is_classic(Queue)),
+ ?assert(not ?amqqueue_is_quorum(Queue)),
+ ?assert(not ?amqqueue_vhost_equals(Queue, <<"frazzle">>)),
+ ?assert(?amqqueue_has_valid_pid(Queue)),
+ ?assert(?amqqueue_pid_equals(Queue, self())),
+ ?assert(?amqqueue_pids_are_equal(Queue, Queue)),
+ ?assert(?amqqueue_pid_runs_on_local_node(Queue)),
+ ?assert(amqqueue:qnode(Queue) == node()).
+
+new_amqqueue_v2_is_amqqueue(_) ->
+ VHost = <<"/">>,
+ Name = rabbit_misc:r(VHost, queue, my_amqqueue_v2),
+ Queue = amqqueue:new_with_version(amqqueue_v2,
+ Name,
+ self(),
+ false,
+ false,
+ none,
+ [],
+ VHost,
+ #{},
+ classic),
+ ?assert(?is_amqqueue(Queue)),
+ ?assert(?is_amqqueue_v2(Queue)),
+ ?assert(not ?is_amqqueue_v1(Queue)),
+ ?assert(?amqqueue_is_classic(Queue)),
+ ?assert(amqqueue:is_classic(Queue)),
+ ?assert(not ?amqqueue_is_quorum(Queue)),
+ ?assert(not ?amqqueue_vhost_equals(Queue, <<"frazzle">>)),
+ ?assert(?amqqueue_has_valid_pid(Queue)),
+ ?assert(?amqqueue_pid_equals(Queue, self())),
+ ?assert(?amqqueue_pids_are_equal(Queue, Queue)),
+ ?assert(?amqqueue_pid_runs_on_local_node(Queue)),
+ ?assert(amqqueue:qnode(Queue) == node()).
+
+random_term_is_not_amqqueue(_) ->
+ Term = ?long_tuple,
+ ?assert(not ?is_amqqueue(Term)),
+ ?assert(not ?is_amqqueue_v2(Term)),
+ ?assert(not ?is_amqqueue_v1(Term)).
+
+%% -------------------------------------------------------------------
+
+amqqueue_v1_is_durable(_) ->
+ VHost = <<"/">>,
+ Name = rabbit_misc:r(VHost, queue, my_amqqueue_v1),
+ TransientQueue = amqqueue:new_with_version(amqqueue_v1,
+ Name,
+ self(),
+ false,
+ false,
+ none,
+ [],
+ VHost,
+ #{},
+ ?amqqueue_v1_type),
+ DurableQueue = amqqueue:new_with_version(amqqueue_v1,
+ Name,
+ self(),
+ true,
+ false,
+ none,
+ [],
+ VHost,
+ #{},
+ ?amqqueue_v1_type),
+ ?assert(not ?amqqueue_is_durable(TransientQueue)),
+ ?assert(?amqqueue_is_durable(DurableQueue)).
+
+amqqueue_v2_is_durable(_) ->
+ VHost = <<"/">>,
+ Name = rabbit_misc:r(VHost, queue, my_amqqueue_v1),
+ TransientQueue = amqqueue:new_with_version(amqqueue_v2,
+ Name,
+ self(),
+ false,
+ false,
+ none,
+ [],
+ VHost,
+ #{},
+ classic),
+ DurableQueue = amqqueue:new_with_version(amqqueue_v2,
+ Name,
+ self(),
+ true,
+ false,
+ none,
+ [],
+ VHost,
+ #{},
+ classic),
+ ?assert(not ?amqqueue_is_durable(TransientQueue)),
+ ?assert(?amqqueue_is_durable(DurableQueue)).
+
+random_term_is_not_durable(_) ->
+ Term = ?long_tuple,
+ ?assert(not ?amqqueue_is_durable(Term)).
+
+%% -------------------------------------------------------------------
+
+amqqueue_v1_state_matching(_) ->
+ VHost = <<"/">>,
+ Name = rabbit_misc:r(VHost, queue, my_amqqueue_v1),
+ Queue1 = amqqueue:new_with_version(amqqueue_v1,
+ Name,
+ self(),
+ true,
+ false,
+ none,
+ [],
+ VHost,
+ #{},
+ ?amqqueue_v1_type),
+ ?assert(?amqqueue_state_is(Queue1, live)),
+ Queue2 = amqqueue:set_state(Queue1, stopped),
+ ?assert(?amqqueue_state_is(Queue2, stopped)).
+
+amqqueue_v2_state_matching(_) ->
+ VHost = <<"/">>,
+ Name = rabbit_misc:r(VHost, queue, my_amqqueue_v1),
+ Queue1 = amqqueue:new_with_version(amqqueue_v2,
+ Name,
+ self(),
+ true,
+ false,
+ none,
+ [],
+ VHost,
+ #{},
+ classic),
+ ?assert(?amqqueue_state_is(Queue1, live)),
+ Queue2 = amqqueue:set_state(Queue1, stopped),
+ ?assert(?amqqueue_state_is(Queue2, stopped)).
+
+random_term_state_matching(_) ->
+ Term = ?long_tuple,
+ ?assert(not ?amqqueue_state_is(Term, live)).
+
+%% -------------------------------------------------------------------
+
+amqqueue_v1_type_matching(_) ->
+ VHost = <<"/">>,
+ Name = rabbit_misc:r(VHost, queue, my_amqqueue_v1),
+ Queue = amqqueue:new_with_version(amqqueue_v1,
+ Name,
+ self(),
+ true,
+ false,
+ none,
+ [],
+ VHost,
+ #{},
+ ?amqqueue_v1_type),
+ ?assert(?amqqueue_is_classic(Queue)),
+ ?assert(amqqueue:is_classic(Queue)),
+ ?assert(not ?amqqueue_is_quorum(Queue)).
+
+amqqueue_v2_type_matching(_) ->
+ VHost = <<"/">>,
+ Name = rabbit_misc:r(VHost, queue, my_amqqueue_v1),
+ ClassicQueue = amqqueue:new_with_version(amqqueue_v2,
+ Name,
+ self(),
+ true,
+ false,
+ none,
+ [],
+ VHost,
+ #{},
+ classic),
+ ?assert(?amqqueue_is_classic(ClassicQueue)),
+ ?assert(amqqueue:is_classic(ClassicQueue)),
+ ?assert(not ?amqqueue_is_quorum(ClassicQueue)),
+ ?assert(not amqqueue:is_quorum(ClassicQueue)),
+ QuorumQueue = amqqueue:new_with_version(amqqueue_v2,
+ Name,
+ self(),
+ true,
+ false,
+ none,
+ [],
+ VHost,
+ #{},
+ quorum),
+ ?assert(not ?amqqueue_is_classic(QuorumQueue)),
+ ?assert(not amqqueue:is_classic(QuorumQueue)),
+ ?assert(?amqqueue_is_quorum(QuorumQueue)),
+ ?assert(amqqueue:is_quorum(QuorumQueue)).
+
+random_term_type_matching(_) ->
+ Term = ?long_tuple,
+ ?assert(not ?amqqueue_is_classic(Term)),
+ ?assert(not ?amqqueue_is_quorum(Term)),
+ ?assertException(error, function_clause, amqqueue:is_classic(Term)),
+ ?assertException(error, function_clause, amqqueue:is_quorum(Term)).
+
+%% -------------------------------------------------------------------
+
+upgrade_v1_to_v2(_) ->
+ VHost = <<"/">>,
+ Name = rabbit_misc:r(VHost, queue, my_amqqueue_v1),
+ OldQueue = amqqueue:new_with_version(amqqueue_v1,
+ Name,
+ self(),
+ true,
+ false,
+ none,
+ [],
+ VHost,
+ #{},
+ ?amqqueue_v1_type),
+ ?assert(?is_amqqueue_v1(OldQueue)),
+ ?assert(not ?is_amqqueue_v2(OldQueue)),
+ NewQueue = amqqueue:upgrade_to(amqqueue_v2, OldQueue),
+ ?assert(not ?is_amqqueue_v1(NewQueue)),
+ ?assert(?is_amqqueue_v2(NewQueue)).
diff --git a/test/backing_queue_SUITE.erl b/test/backing_queue_SUITE.erl
index 433bc66bff..c3f87cce59 100644
--- a/test/backing_queue_SUITE.erl
+++ b/test/backing_queue_SUITE.erl
@@ -18,6 +18,7 @@
-include_lib("common_test/include/ct.hrl").
-include_lib("amqp_client/include/amqp_client.hrl").
+-include("amqqueue.hrl").
-compile(export_all).
@@ -686,11 +687,10 @@ bq_variable_queue_delete_msg_store_files_callback(Config) ->
bq_variable_queue_delete_msg_store_files_callback1(Config) ->
ok = restart_msg_store_empty(),
- {new, #amqqueue { pid = QPid, name = QName } = Q} =
- rabbit_amqqueue:declare(
- queue_name(Config,
- <<"bq_variable_queue_delete_msg_store_files_callback-q">>),
- true, false, [], none, <<"acting-user">>),
+ QName0 = queue_name(Config, <<"bq_variable_queue_delete_msg_store_files_callback-q">>),
+ {new, Q} = rabbit_amqqueue:declare(QName0, true, false, [], none, <<"acting-user">>),
+ QName = amqqueue:get_name(Q),
+ QPid = amqqueue:get_pid(Q),
Payload = <<0:8388608>>, %% 1MB
Count = 30,
publish_and_confirm(Q, Payload, Count),
@@ -718,9 +718,10 @@ bq_queue_recover(Config) ->
bq_queue_recover1(Config) ->
Count = 2 * rabbit_queue_index:next_segment_boundary(0),
- {new, #amqqueue { pid = QPid, name = QName } = Q} =
- rabbit_amqqueue:declare(queue_name(Config, <<"bq_queue_recover-q">>),
- true, false, [], none, <<"acting-user">>),
+ QName0 = queue_name(Config, <<"bq_queue_recover-q">>),
+ {new, Q} = rabbit_amqqueue:declare(QName0, true, false, [], none, <<"acting-user">>),
+ QName = amqqueue:get_name(Q),
+ QPid = amqqueue:get_pid(Q),
publish_and_confirm(Q, <<>>, Count),
SupPid = get_queue_sup_pid(Q),
@@ -736,7 +737,8 @@ bq_queue_recover1(Config) ->
{ok, Limiter} = rabbit_limiter:start_link(no_id),
rabbit_amqqueue:with_or_die(
QName,
- fun (Q1 = #amqqueue { pid = QPid1 }) ->
+ fun (Q1) when ?is_amqqueue(Q1) ->
+ QPid1 = amqqueue:get_pid(Q1),
CountMinusOne = Count - 1,
{ok, CountMinusOne, {QName, QPid1, _AckTag, true, _Msg}} =
rabbit_amqqueue:basic_get(Q1, self(), false, Limiter,
@@ -752,7 +754,9 @@ bq_queue_recover1(Config) ->
passed.
%% Return the PID of the given queue's supervisor.
-get_queue_sup_pid(#amqqueue { pid = QPid, name = QName }) ->
+get_queue_sup_pid(Q) when ?is_amqqueue(Q) ->
+ QName = amqqueue:get_name(Q),
+ QPid = amqqueue:get_pid(Q),
VHost = QName#resource.virtual_host,
{ok, AmqSup} = rabbit_amqqueue_sup_sup:find_for_vhost(VHost, node(QPid)),
Sups = supervisor:which_children(AmqSup),
@@ -1413,8 +1417,8 @@ with_fresh_variable_queue(Fun, Mode) ->
shutdown, Fun(VQ1, QName)),
Me ! Ref
catch
- Type:Error ->
- Me ! {Ref, Type, Error, erlang:get_stacktrace()}
+ Type:Error:Stacktrace ->
+ Me ! {Ref, Type, Error, Stacktrace}
end
end),
receive
@@ -1498,8 +1502,7 @@ variable_queue_fetch(Count, IsPersistent, IsDelivered, Len, VQ) ->
end, {VQ, []}, lists:seq(1, Count)).
test_amqqueue(QName, Durable) ->
- (rabbit_amqqueue:pseudo_queue(QName, self()))
- #amqqueue { durable = Durable }.
+ rabbit_amqqueue:pseudo_queue(QName, self(), Durable).
assert_prop(List, Prop, Value) ->
case proplists:get_value(Prop, List)of
diff --git a/test/channel_operation_timeout_SUITE.erl b/test/channel_operation_timeout_SUITE.erl
index 9bfa0ae07a..77da6133d8 100644
--- a/test/channel_operation_timeout_SUITE.erl
+++ b/test/channel_operation_timeout_SUITE.erl
@@ -18,6 +18,7 @@
-include_lib("common_test/include/ct.hrl").
-include_lib("amqp_client/include/amqp_client.hrl").
+-include("amqqueue.hrl").
-compile([export_all]).
@@ -169,9 +170,16 @@ get_consumers(Config, Node, VHost) when is_atom(Node),
rabbit_ct_broker_helpers:rpc(Config, Node,
rabbit_amqqueue, consumers_all, [VHost]).
-get_amqqueue(Q, []) -> throw({not_found, Q});
-get_amqqueue(Q, [AMQQ = #amqqueue{name = Q} | _]) -> AMQQ;
-get_amqqueue(Q, [_| Rem]) -> get_amqqueue(Q, Rem).
+get_amqqueue(QName0, []) ->
+ throw({not_found, QName0});
+get_amqqueue(QName0, [Q | Rem]) when ?is_amqqueue(Q) ->
+ QName1 = amqqueue:get_name(Q),
+ compare_amqqueue(QName0, QName1, Q, Rem).
+
+compare_amqqueue(QName, QName, Q, _Rem) ->
+ Q;
+compare_amqqueue(QName, _, _, Rem) ->
+ get_amqqueue(QName, Rem).
qconfig(Ch, Name, Ex, Consume, Deliver) ->
[{ch, Ch}, {name, Name}, {ex,Ex}, {consume, Consume}, {deliver, Deliver}].
diff --git a/test/cluster_SUITE.erl b/test/cluster_SUITE.erl
index c52dc9ef64..dc30825f8c 100644
--- a/test/cluster_SUITE.erl
+++ b/test/cluster_SUITE.erl
@@ -18,6 +18,7 @@
-include_lib("common_test/include/ct.hrl").
-include_lib("amqp_client/include/amqp_client.hrl").
+-include("include/amqqueue.hrl").
-compile(export_all).
@@ -225,9 +226,9 @@ declare_on_dead_queue1(_Config, SecondaryNode) ->
Self = self(),
Pid = spawn(SecondaryNode,
fun () ->
- {new, #amqqueue{name = QueueName, pid = QPid}} =
- rabbit_amqqueue:declare(QueueName, false, false, [],
- none, <<"acting-user">>),
+ {new, Q} = rabbit_amqqueue:declare(QueueName, false, false, [], none, <<"acting-user">>),
+ QueueName = ?amqqueue_field_name(Q),
+ QPid = ?amqqueue_field_pid(Q),
exit(QPid, kill),
Self ! {self(), killed, QPid}
end),
@@ -269,12 +270,12 @@ must_exit(Fun) ->
end.
dead_queue_loop(QueueName, OldPid) ->
- {existing, Q} = rabbit_amqqueue:declare(QueueName, false, false, [], none,
- <<"acting-user">>),
- case Q#amqqueue.pid of
+ {existing, Q} = rabbit_amqqueue:declare(QueueName, false, false, [], none, <<"acting-user">>),
+ QPid = ?amqqueue_field_pid(Q),
+ case QPid of
OldPid -> timer:sleep(25),
dead_queue_loop(QueueName, OldPid);
- _ -> true = rabbit_misc:is_process_alive(Q#amqqueue.pid),
+ _ -> true = rabbit_misc:is_process_alive(QPid),
Q
end.
diff --git a/test/clustering_management_SUITE.erl b/test/clustering_management_SUITE.erl
index 120257feb9..5ae2fb687c 100644
--- a/test/clustering_management_SUITE.erl
+++ b/test/clustering_management_SUITE.erl
@@ -315,14 +315,14 @@ forget_offline_removes_things(Config) ->
forget_promotes_offline_slave(Config) ->
[A, B, C, D] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
ACh = rabbit_ct_client_helpers:open_channel(Config, A),
- Q = <<"mirrored-queue">>,
- declare(ACh, Q),
- set_ha_policy(Config, Q, A, [B, C]),
- set_ha_policy(Config, Q, A, [C, D]), %% Test add and remove from recoverable_slaves
+ QName = <<"mirrored-queue">>,
+ declare(ACh, QName),
+ set_ha_policy(Config, QName, A, [B, C]),
+ set_ha_policy(Config, QName, A, [C, D]), %% Test add and remove from recoverable_slaves
%% Publish and confirm
amqp_channel:call(ACh, #'confirm.select'{}),
- amqp_channel:cast(ACh, #'basic.publish'{routing_key = Q},
+ amqp_channel:cast(ACh, #'basic.publish'{routing_key = QName},
#amqp_msg{props = #'P_basic'{delivery_mode = 2}}),
amqp_channel:wait_for_confirms(ACh),
@@ -353,26 +353,50 @@ forget_promotes_offline_slave(Config) ->
ok = rabbit_ct_broker_helpers:start_node(Config, D),
DCh2 = rabbit_ct_client_helpers:open_channel(Config, D),
- #'queue.declare_ok'{message_count = 1} = declare(DCh2, Q),
+ #'queue.declare_ok'{message_count = 1} = declare(DCh2, QName),
ok.
-set_ha_policy(Config, Q, Master, Slaves) ->
+set_ha_policy(Config, QName, Master, Slaves) ->
Nodes = [list_to_binary(atom_to_list(N)) || N <- [Master | Slaves]],
- rabbit_ct_broker_helpers:set_ha_policy(Config, Master, Q,
- {<<"nodes">>, Nodes}),
- await_slaves(Q, Master, Slaves).
-
-await_slaves(Q, Master, Slaves) ->
- {ok, #amqqueue{pid = MPid,
- slave_pids = SPids}} =
- rpc:call(Master, rabbit_amqqueue, lookup,
- [rabbit_misc:r(<<"/">>, queue, Q)]),
- ActMaster = node(MPid),
+ HaPolicy = {<<"nodes">>, Nodes},
+ rabbit_ct_broker_helpers:set_ha_policy(Config, Master, QName, HaPolicy),
+ await_slaves(QName, Master, Slaves).
+
+await_slaves(QName, Master, Slaves) ->
+ await_slaves_0(QName, Master, Slaves, 10).
+
+await_slaves_0(QName, Master, Slaves0, Tries) ->
+ {ok, Queue} = await_slaves_lookup_queue(QName, Master),
+ SPids = amqqueue:get_slave_pids(Queue),
+ ActMaster = amqqueue:qnode(Queue),
ActSlaves = lists:usort([node(P) || P <- SPids]),
- case {Master, lists:usort(Slaves)} of
- {ActMaster, ActSlaves} -> ok;
- _ -> timer:sleep(100),
- await_slaves(Q, Master, Slaves)
+ Slaves1 = lists:usort(Slaves0),
+ await_slaves_1(QName, ActMaster, ActSlaves, Master, Slaves1, Tries).
+
+await_slaves_1(QName, _ActMaster, _ActSlaves, _Master, _Slaves, 0) ->
+ error({timeout_waiting_for_slaves, QName});
+await_slaves_1(QName, ActMaster, ActSlaves, Master, Slaves, Tries) ->
+ case {Master, Slaves} of
+ {ActMaster, ActSlaves} ->
+ ok;
+ _ ->
+ timer:sleep(250),
+ await_slaves_0(QName, Master, Slaves, Tries - 1)
+ end.
+
+await_slaves_lookup_queue(QName, Master) ->
+ await_slaves_lookup_queue(QName, Master, 10).
+
+await_slaves_lookup_queue(QName, _Master, 0) ->
+ error({timeout_looking_up_queue, QName});
+await_slaves_lookup_queue(QName, Master, Tries) ->
+ RpcArgs = [rabbit_misc:r(<<"/">>, queue, QName)],
+ case rpc:call(Master, rabbit_amqqueue, lookup, RpcArgs) of
+ {error, not_found} ->
+ timer:sleep(250),
+ await_slaves_lookup_queue(QName, Master, Tries - 1);
+ {ok, Q} ->
+ {ok, Q}
end.
force_boot(Config) ->
diff --git a/test/crashing_queues_SUITE.erl b/test/crashing_queues_SUITE.erl
index 2d91083abc..7b8ec91346 100644
--- a/test/crashing_queues_SUITE.erl
+++ b/test/crashing_queues_SUITE.erl
@@ -217,9 +217,10 @@ kill_queue(Node, QName) ->
await_new_pid(Node, QName, Pid1).
queue_pid(Node, QName) ->
- #amqqueue{pid = QPid,
- state = State,
- name = #resource{virtual_host = VHost}} = lookup(Node, QName),
+ Q = lookup(Node, QName),
+ QPid = amqqueue:get_pid(Q),
+ State = amqqueue:get_state(Q),
+ #resource{virtual_host = VHost} = amqqueue:get_name(Q),
case State of
crashed ->
case rabbit_amqqueue_sup_sup:find_for_vhost(VHost, Node) of
diff --git a/test/dynamic_ha_SUITE.erl b/test/dynamic_ha_SUITE.erl
index e41c07a888..6ccf3a75c3 100644
--- a/test/dynamic_ha_SUITE.erl
+++ b/test/dynamic_ha_SUITE.erl
@@ -614,8 +614,8 @@ get_stacktrace() ->
try
throw(e)
catch
- _:e ->
- erlang:get_stacktrace()
+ _:e:Stacktrace ->
+ Stacktrace
end.
%%----------------------------------------------------------------------------
diff --git a/test/dynamic_qq_SUITE.erl b/test/dynamic_qq_SUITE.erl
index fbc1e81827..89344af30c 100644
--- a/test/dynamic_qq_SUITE.erl
+++ b/test/dynamic_qq_SUITE.erl
@@ -83,9 +83,26 @@ init_per_testcase(Testcase, Config) ->
{queue_name, Q},
{queue_args, [{<<"x-queue-type">>, longstr, <<"quorum">>}]}
]),
- rabbit_ct_helpers:run_steps(Config1,
- rabbit_ct_broker_helpers:setup_steps() ++
- rabbit_ct_client_helpers:setup_steps()).
+ Config2 = rabbit_ct_helpers:run_steps(
+ Config1,
+ rabbit_ct_broker_helpers:setup_steps() ++
+ rabbit_ct_client_helpers:setup_steps()),
+ Nodes = rabbit_ct_broker_helpers:get_node_configs(
+ Config2, nodename),
+ Ret = rabbit_ct_broker_helpers:rpc(
+ Config2, 0,
+ rabbit_feature_flags,
+ is_supported_remotely,
+ [Nodes, [quorum_queue], 60000]),
+ case Ret of
+ true ->
+ ok = rabbit_ct_broker_helpers:rpc(
+ Config2, 0, rabbit_feature_flags, enable, [quorum_queue]),
+ Config2;
+ false ->
+ end_per_testcase(Testcase, Config2),
+ {skip, "Quorum queues are unsupported"}
+ end.
end_per_testcase(Testcase, Config) ->
Config1 = rabbit_ct_helpers:run_steps(Config,
diff --git a/test/feature_flags_SUITE.erl b/test/feature_flags_SUITE.erl
new file mode 100644
index 0000000000..db87442105
--- /dev/null
+++ b/test/feature_flags_SUITE.erl
@@ -0,0 +1,372 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2018-2019 Pivotal Software, Inc. All rights reserved.
+%%
+
+-module(feature_flags_SUITE).
+
+-include_lib("common_test/include/ct.hrl").
+-include_lib("eunit/include/eunit.hrl").
+
+-export([suite/0,
+ all/0,
+ groups/0,
+ init_per_suite/1,
+ end_per_suite/1,
+ init_per_group/2,
+ end_per_group/2,
+ init_per_testcase/2,
+ end_per_testcase/2,
+
+ enable_quorum_queue_in_a_healthy_situation/1,
+ enable_unsupported_feature_flag_in_a_healthy_situation/1,
+ enable_quorum_queue_when_ff_file_is_unwritable/1,
+ enable_quorum_queue_with_a_network_partition/1,
+ mark_quorum_queue_as_enabled_with_a_network_partition/1
+ ]).
+
+suite() ->
+ [{timetrap, 5 * 60000}].
+
+all() ->
+ [
+ {group, unclustered},
+ {group, clustered}
+ ].
+
+groups() ->
+ [
+ {unclustered, [],
+ [
+ enable_quorum_queue_in_a_healthy_situation,
+ enable_unsupported_feature_flag_in_a_healthy_situation,
+ enable_quorum_queue_when_ff_file_is_unwritable
+ ]},
+ {clustered, [],
+ [
+ enable_quorum_queue_in_a_healthy_situation,
+ enable_unsupported_feature_flag_in_a_healthy_situation,
+ enable_quorum_queue_when_ff_file_is_unwritable,
+ enable_quorum_queue_with_a_network_partition,
+ mark_quorum_queue_as_enabled_with_a_network_partition
+ ]}
+ ].
+
+%% -------------------------------------------------------------------
+%% Testsuite setup/teardown.
+%% -------------------------------------------------------------------
+
+init_per_suite(Config) ->
+ rabbit_ct_helpers:log_environment(),
+ rabbit_ct_helpers:run_setup_steps(Config, [
+ fun rabbit_ct_broker_helpers:enable_dist_proxy_manager/1
+ ]).
+
+end_per_suite(Config) ->
+ rabbit_ct_helpers:run_teardown_steps(Config).
+
+init_per_group(clustered, Config) ->
+ rabbit_ct_helpers:set_config(Config, [{rmq_nodes_count, 5}]);
+init_per_group(unclustered, Config) ->
+ rabbit_ct_helpers:set_config(Config, [{rmq_nodes_count, 1}]);
+init_per_group(_, Config) ->
+ Config.
+
+end_per_group(_, Config) ->
+ Config.
+
+init_per_testcase(Testcase, Config) ->
+ rabbit_ct_helpers:testcase_started(Config, Testcase),
+ TestNumber = rabbit_ct_helpers:testcase_number(Config, ?MODULE, Testcase),
+ ClusterSize = ?config(rmq_nodes_count, Config),
+ Config1 = rabbit_ct_helpers:set_config(Config, [
+ {rmq_nodes_clustered, false},
+ {rmq_nodename_suffix, Testcase},
+ {tcp_ports_base, {skip_n_nodes, TestNumber * ClusterSize}},
+ {net_ticktime, 5}
+ ]),
+ Config2 = rabbit_ct_helpers:merge_app_env(
+ Config1,
+ {rabbit,
+ [{forced_feature_flags_on_init, []},
+ {log, [{file, [{level, debug}]}]}]}),
+ Config3 = rabbit_ct_helpers:run_steps(
+ Config2,
+ rabbit_ct_broker_helpers:setup_steps() ++
+ rabbit_ct_client_helpers:setup_steps() ++
+ [fun rabbit_ct_broker_helpers:enable_dist_proxy/1,
+ fun rabbit_ct_broker_helpers:cluster_nodes/1]),
+ Ret = rabbit_ct_broker_helpers:rpc(
+ Config3, 0, rabbit_feature_flags, is_supported, [quorum_queue]),
+ case Ret of
+ true ->
+ Config3;
+ false ->
+ end_per_testcase(Testcase, Config3),
+ {skip, "Quorum queues are unsupported"}
+ end.
+
+end_per_testcase(Testcase, Config) ->
+ Config1 = rabbit_ct_helpers:run_steps(Config,
+ rabbit_ct_client_helpers:teardown_steps() ++
+ rabbit_ct_broker_helpers:teardown_steps()),
+ rabbit_ct_helpers:testcase_finished(Config1, Testcase).
+
+%% -------------------------------------------------------------------
+%% Testcases.
+%% -------------------------------------------------------------------
+
+enable_quorum_queue_in_a_healthy_situation(Config) ->
+ FeatureName = quorum_queue,
+ ClusterSize = ?config(rmq_nodes_count, Config),
+ Node = ClusterSize - 1,
+ True = lists:duplicate(ClusterSize, true),
+ False = lists:duplicate(ClusterSize, false),
+
+ %% The feature flag is supported but disabled initially.
+ ?assertEqual(
+ True,
+ is_feature_flag_supported(Config, FeatureName)),
+ ?assertEqual(
+ False,
+ is_feature_flag_enabled(Config, FeatureName)),
+
+ %% Enabling the feature flag works.
+ ?assertEqual(
+ ok,
+ enable_feature_flag_on(Config, Node, FeatureName)),
+ ?assertEqual(
+ True,
+ is_feature_flag_enabled(Config, FeatureName)),
+
+ %% Re-enabling the feature flag also works.
+ ?assertEqual(
+ ok,
+ enable_feature_flag_on(Config, Node, FeatureName)),
+ ?assertEqual(
+ True,
+ is_feature_flag_enabled(Config, FeatureName)).
+
+enable_unsupported_feature_flag_in_a_healthy_situation(Config) ->
+ FeatureName = unsupported_feature_flag,
+ ClusterSize = ?config(rmq_nodes_count, Config),
+ Node = ClusterSize - 1,
+ False = lists:duplicate(ClusterSize, false),
+
+ %% The feature flag is unsupported and thus disabled.
+ ?assertEqual(
+ False,
+ is_feature_flag_supported(Config, FeatureName)),
+ ?assertEqual(
+ False,
+ is_feature_flag_enabled(Config, FeatureName)),
+
+ %% Enabling the feature flag works.
+ ?assertEqual(
+ {error, unsupported},
+ enable_feature_flag_on(Config, Node, FeatureName)),
+ ?assertEqual(
+ False,
+ is_feature_flag_enabled(Config, FeatureName)).
+
+enable_quorum_queue_when_ff_file_is_unwritable(Config) ->
+ FeatureName = quorum_queue,
+ ClusterSize = ?config(rmq_nodes_count, Config),
+ Node = ClusterSize - 1,
+ True = lists:duplicate(ClusterSize, true),
+ False = lists:duplicate(ClusterSize, false),
+ Files = feature_flags_files(Config),
+
+ %% The feature flag is supported but disabled initially.
+ ?assertEqual(
+ True,
+ is_feature_flag_supported(Config, FeatureName)),
+ ?assertEqual(
+ False,
+ is_feature_flag_enabled(Config, FeatureName)),
+
+ %% Restrict permissions on the `feature_flags` files.
+ [?assertEqual(ok, file:change_mode(File, 8#0444)) || File <- Files],
+
+ %% Enabling the feature flag works.
+ ?assertEqual(
+ ok,
+ enable_feature_flag_on(Config, Node, FeatureName)),
+ ?assertEqual(
+ True,
+ is_feature_flag_enabled(Config, FeatureName)),
+
+ %% The `feature_flags` file were not updated.
+ ?assertEqual(
+ lists:duplicate(ClusterSize, {ok, [[]]}),
+ [file:consult(File) || File <- feature_flags_files(Config)]),
+
+ %% Stop all nodes and restore permissions on the `feature_flags` files.
+ Nodes = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ [?assertEqual(ok, rabbit_ct_broker_helpers:stop_node(Config, N))
+ || N <- Nodes],
+ [?assertEqual(ok, file:change_mode(File, 8#0644)) || File <- Files],
+
+ %% Restart all nodes and assert the feature flag is still enabled and
+ %% the `feature_flags` files were correctly repaired.
+ [?assertEqual(ok, rabbit_ct_broker_helpers:start_node(Config, N))
+ || N <- lists:reverse(Nodes)],
+
+ ?assertEqual(
+ True,
+ is_feature_flag_enabled(Config, FeatureName)),
+ ?assertEqual(
+ lists:duplicate(ClusterSize, {ok, [[FeatureName]]}),
+ [file:consult(File) || File <- feature_flags_files(Config)]).
+
+enable_quorum_queue_with_a_network_partition(Config) ->
+ FeatureName = quorum_queue,
+ ClusterSize = ?config(rmq_nodes_count, Config),
+ [A, B, C, D, E] = rabbit_ct_broker_helpers:get_node_configs(
+ Config, nodename),
+ True = lists:duplicate(ClusterSize, true),
+ False = lists:duplicate(ClusterSize, false),
+
+ %% The feature flag is supported but disabled initially.
+ ?assertEqual(
+ True,
+ is_feature_flag_supported(Config, FeatureName)),
+ ?assertEqual(
+ False,
+ is_feature_flag_enabled(Config, FeatureName)),
+
+ %% Isolate nodes B and E from the rest of the cluster.
+ NodePairs = [{B, A},
+ {B, C},
+ {B, D},
+ {E, A},
+ {E, C},
+ {E, D}],
+ block(NodePairs),
+ timer:sleep(1000),
+
+ %% Enabling the feature flag should fail in the specific case of
+ %% `quorum_queue`, if the network is broken.
+ ?assertEqual(
+ {error, unsupported},
+ enable_feature_flag_on(Config, B, FeatureName)),
+ ?assertEqual(
+ False,
+ is_feature_flag_enabled(Config, FeatureName)),
+
+ %% Repair the network and try again to enable the feature flag.
+ unblock(NodePairs),
+ timer:sleep(1000),
+ [?assertEqual(ok, rabbit_ct_broker_helpers:stop_node(Config, N))
+ || N <- [A, C, D]],
+ [?assertEqual(ok, rabbit_ct_broker_helpers:start_node(Config, N))
+ || N <- [A, C, D]],
+
+ %% Enabling the feature flag works.
+ ?assertEqual(
+ ok,
+ enable_feature_flag_on(Config, B, FeatureName)),
+ ?assertEqual(
+ True,
+ is_feature_flag_enabled(Config, FeatureName)).
+
+mark_quorum_queue_as_enabled_with_a_network_partition(Config) ->
+ FeatureName = quorum_queue,
+ ClusterSize = ?config(rmq_nodes_count, Config),
+ [A, B, C, D, E] = rabbit_ct_broker_helpers:get_node_configs(
+ Config, nodename),
+ True = lists:duplicate(ClusterSize, true),
+ False = lists:duplicate(ClusterSize, false),
+
+ %% The feature flag is supported but disabled initially.
+ ?assertEqual(
+ True,
+ is_feature_flag_supported(Config, FeatureName)),
+ ?assertEqual(
+ False,
+ is_feature_flag_enabled(Config, FeatureName)),
+
+ %% Isolate node B from the rest of the cluster.
+ NodePairs = [{B, A},
+ {B, C},
+ {B, D},
+ {B, E}],
+ block(NodePairs),
+ timer:sleep(1000),
+
+ %% Mark the feature flag as enabled on all nodes from node B. This
+ %% is expected to timeout.
+ RemoteNodes = [A, C, D, E],
+ ?assertEqual(
+ {failed_to_mark_feature_flag_as_enabled_on_remote_nodes,
+ FeatureName,
+ true,
+ RemoteNodes},
+ rabbit_ct_broker_helpers:rpc(
+ Config, B,
+ rabbit_feature_flags, mark_as_enabled_remotely,
+ [RemoteNodes, FeatureName, true, 20000])),
+
+ RepairFun = fun() ->
+ %% Wait a few seconds before we repair the network.
+ timer:sleep(5000),
+
+ %% Repair the network and try again to enable
+ %% the feature flag.
+ unblock(NodePairs),
+ timer:sleep(1000)
+ end,
+ spawn(RepairFun),
+
+ %% Mark the feature flag as enabled on all nodes from node B. This
+ %% is expected to work this time.
+ ct:pal(?LOW_IMPORTANCE,
+ "Marking the feature flag as enabled on remote nodes...", []),
+ ?assertEqual(
+ ok,
+ rabbit_ct_broker_helpers:rpc(
+ Config, B,
+ rabbit_feature_flags, mark_as_enabled_remotely,
+ [RemoteNodes, FeatureName, true, 120000])).
+
+%% FIXME: Finish the testcase above ^
+
+%% -------------------------------------------------------------------
+%% Internal helpers.
+%% -------------------------------------------------------------------
+
+enable_feature_flag_on(Config, Node, FeatureName) ->
+ rabbit_ct_broker_helpers:rpc(
+ Config, Node, rabbit_feature_flags, enable, [FeatureName]).
+
+is_feature_flag_supported(Config, FeatureName) ->
+ rabbit_ct_broker_helpers:rpc_all(
+ Config, rabbit_feature_flags, is_supported, [FeatureName]).
+
+is_feature_flag_enabled(Config, FeatureName) ->
+ rabbit_ct_broker_helpers:rpc_all(
+ Config, rabbit_feature_flags, is_enabled, [FeatureName]).
+
+feature_flags_files(Config) ->
+ rabbit_ct_broker_helpers:rpc_all(
+ Config, rabbit_feature_flags, enabled_feature_flags_list_file, []).
+
+block(Pairs) -> [block(X, Y) || {X, Y} <- Pairs].
+unblock(Pairs) -> [allow(X, Y) || {X, Y} <- Pairs].
+
+block(X, Y) ->
+ rabbit_ct_broker_helpers:block_traffic_between(X, Y).
+
+allow(X, Y) ->
+ rabbit_ct_broker_helpers:allow_traffic_between(X, Y).
diff --git a/test/mirrored_supervisor_SUITE.erl b/test/mirrored_supervisor_SUITE.erl
index d3cc080eeb..aa114e0a84 100644
--- a/test/mirrored_supervisor_SUITE.erl
+++ b/test/mirrored_supervisor_SUITE.erl
@@ -294,14 +294,17 @@ get_group(Group) ->
call(Id, Msg) -> call(Id, Msg, 10*1000, 100).
-call(Id, Msg, 0, _Decr) ->
- exit({timeout_waiting_for_server, {Id, Msg}, erlang:get_stacktrace()});
-
call(Id, Msg, MaxDelay, Decr) ->
+ call(Id, Msg, MaxDelay, Decr, undefined).
+
+call(Id, Msg, 0, _Decr, Stacktrace) ->
+ exit({timeout_waiting_for_server, {Id, Msg}, Stacktrace});
+
+call(Id, Msg, MaxDelay, Decr, _) ->
try
gen_server:call(Id, Msg, infinity)
- catch exit:_ -> timer:sleep(Decr),
- call(Id, Msg, MaxDelay - Decr, Decr)
+ catch exit:_:Stacktrace -> timer:sleep(Decr),
+ call(Id, Msg, MaxDelay - Decr, Decr, Stacktrace)
end.
kill(Pid) -> kill(Pid, []).
diff --git a/test/priority_queue_SUITE.erl b/test/priority_queue_SUITE.erl
index 9db866e3c6..8a0ab98241 100644
--- a/test/priority_queue_SUITE.erl
+++ b/test/priority_queue_SUITE.erl
@@ -366,9 +366,9 @@ info_head_message_timestamp1(_Config) ->
QName = rabbit_misc:r(<<"/">>, queue,
<<"info_head_message_timestamp-queue">>),
Q0 = rabbit_amqqueue:pseudo_queue(QName, self()),
- Q = Q0#amqqueue{arguments = [{<<"x-max-priority">>, long, 2}]},
+ Q1 = amqqueue:set_arguments(Q0, [{<<"x-max-priority">>, long, 2}]),
PQ = rabbit_priority_queue,
- BQS1 = PQ:init(Q, new, fun(_, _) -> ok end),
+ BQS1 = PQ:init(Q1, new, fun(_, _) -> ok end),
%% The queue is empty: no timestamp.
true = PQ:is_empty(BQS1),
'' = PQ:info(head_message_timestamp, BQS1),
@@ -415,9 +415,9 @@ info_head_message_timestamp1(_Config) ->
ram_duration(_Config) ->
QName = rabbit_misc:r(<<"/">>, queue, <<"ram_duration-queue">>),
Q0 = rabbit_amqqueue:pseudo_queue(QName, self()),
- Q = Q0#amqqueue{arguments = [{<<"x-max-priority">>, long, 5}]},
+ Q1 = amqqueue:set_arguments(Q0, [{<<"x-max-priority">>, long, 5}]),
PQ = rabbit_priority_queue,
- BQS1 = PQ:init(Q, new, fun(_, _) -> ok end),
+ BQS1 = PQ:init(Q1, new, fun(_, _) -> ok end),
{_Duration1, BQS2} = PQ:ram_duration(BQS1),
BQS3 = PQ:set_ram_duration_target(infinity, BQS2),
BQS4 = PQ:set_ram_duration_target(1, BQS3),
diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl
index b1f5cfff61..fd58756fda 100644
--- a/test/quorum_queue_SUITE.erl
+++ b/test/quorum_queue_SUITE.erl
@@ -29,6 +29,9 @@
-compile(export_all).
+suite() ->
+ [{timetrap, 5 * 60000}].
+
all() ->
[
{group, single_node},
@@ -173,17 +176,32 @@ init_per_group(Group, Config) ->
Config2 = rabbit_ct_helpers:run_steps(Config1b,
[fun merge_app_env/1 ] ++
rabbit_ct_broker_helpers:setup_steps()),
- ok = rabbit_ct_broker_helpers:rpc(
- Config2, 0, application, set_env,
- [rabbit, channel_queue_cleanup_interval, 100]),
- %% HACK: the larger cluster sizes benefit for a bit more time
- %% after clustering before running the tests.
- case Group of
- cluster_size_5 ->
- timer:sleep(5000),
- Config2;
- _ ->
- Config2
+ Nodes = rabbit_ct_broker_helpers:get_node_configs(
+ Config2, nodename),
+ Ret = rabbit_ct_broker_helpers:rpc(
+ Config2, 0,
+ rabbit_feature_flags,
+ is_supported_remotely,
+ [Nodes, [quorum_queue], 60000]),
+ case Ret of
+ true ->
+ ok = rabbit_ct_broker_helpers:rpc(
+ Config2, 0, rabbit_feature_flags, enable, [quorum_queue]),
+ ok = rabbit_ct_broker_helpers:rpc(
+ Config2, 0, application, set_env,
+ [rabbit, channel_queue_cleanup_interval, 100]),
+ %% HACK: the larger cluster sizes benefit for a bit more time
+ %% after clustering before running the tests.
+ case Group of
+ cluster_size_5 ->
+ timer:sleep(5000),
+ Config2;
+ _ ->
+ Config2
+ end;
+ false ->
+ end_per_group(Group, Config2),
+ {skip, "Quorum queues are unsupported"}
end.
end_per_group(clustered, Config) ->
@@ -207,11 +225,28 @@ init_per_testcase(Testcase, Config) when Testcase == reconnect_consumer_and_publ
{tcp_ports_base},
{queue_name, Q}
]),
- rabbit_ct_helpers:run_steps(Config2,
- rabbit_ct_broker_helpers:setup_steps() ++
- rabbit_ct_client_helpers:setup_steps() ++
- [fun rabbit_ct_broker_helpers:enable_dist_proxy/1,
- fun rabbit_ct_broker_helpers:cluster_nodes/1]);
+ Config3 = rabbit_ct_helpers:run_steps(
+ Config2,
+ rabbit_ct_broker_helpers:setup_steps() ++
+ rabbit_ct_client_helpers:setup_steps() ++
+ [fun rabbit_ct_broker_helpers:enable_dist_proxy/1,
+ fun rabbit_ct_broker_helpers:cluster_nodes/1]),
+ Nodes = rabbit_ct_broker_helpers:get_node_configs(
+ Config3, nodename),
+ Ret = rabbit_ct_broker_helpers:rpc(
+ Config3, 0,
+ rabbit_feature_flags,
+ is_supported_remotely,
+ [Nodes, [quorum_queue], 60000]),
+ case Ret of
+ true ->
+ ok = rabbit_ct_broker_helpers:rpc(
+ Config3, 0, rabbit_feature_flags, enable, [quorum_queue]),
+ Config3;
+ false ->
+ end_per_testcase(Testcase, Config3),
+ {skip, "Quorum queues are unsupported"}
+ end;
init_per_testcase(Testcase, Config) ->
Config1 = rabbit_ct_helpers:testcase_started(Config, Testcase),
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_queues, []),
@@ -2194,11 +2229,10 @@ assert_queue_type(Server, Q, Expected) ->
Actual = get_queue_type(Server, Q),
Expected = Actual.
-get_queue_type(Server, Q) ->
- QNameRes = rabbit_misc:r(<<"/">>, queue, Q),
- {ok, AMQQueue} =
- rpc:call(Server, rabbit_amqqueue, lookup, [QNameRes]),
- AMQQueue#amqqueue.type.
+get_queue_type(Server, Q0) ->
+ QNameRes = rabbit_misc:r(<<"/">>, queue, Q0),
+ {ok, Q1} = rpc:call(Server, rabbit_amqqueue, lookup, [QNameRes]),
+ amqqueue:get_type(Q1).
publish_many(Ch, Queue, Count) ->
[publish(Ch, Queue) || _ <- lists:seq(1, Count)].
diff --git a/test/rabbit_core_metrics_gc_SUITE.erl b/test/rabbit_core_metrics_gc_SUITE.erl
index 7ae43aa7e1..42ac863ead 100644
--- a/test/rabbit_core_metrics_gc_SUITE.erl
+++ b/test/rabbit_core_metrics_gc_SUITE.erl
@@ -364,11 +364,9 @@ cluster_queue_metrics(Config) ->
% Synchronize
Name = rabbit_misc:r(VHost, queue, QueueName),
- [#amqqueue{pid = QPid}] = rabbit_ct_broker_helpers:rpc(Config, Node0,
- ets, lookup,
- [rabbit_queue, Name]),
- ok = rabbit_ct_broker_helpers:rpc(Config, Node0, rabbit_amqqueue,
- sync_mirrors, [QPid]),
+ [Q] = rabbit_ct_broker_helpers:rpc(Config, Node0, ets, lookup, [rabbit_queue, Name]),
+ QPid = amqqueue:get_pid(Q),
+ ok = rabbit_ct_broker_helpers:rpc(Config, Node0, rabbit_amqqueue, sync_mirrors, [QPid]),
% Check ETS table for data
wait_for(fun () ->
diff --git a/test/single_active_consumer_SUITE.erl b/test/single_active_consumer_SUITE.erl
index 25261042b2..6071aeb5a5 100644
--- a/test/single_active_consumer_SUITE.erl
+++ b/test/single_active_consumer_SUITE.erl
@@ -66,13 +66,21 @@ init_per_group(classic_queue, Config) ->
auto_delete = true}
} | Config];
init_per_group(quorum_queue, Config) ->
- [{single_active_consumer_queue_declare,
- #'queue.declare'{arguments = [
- {<<"x-single-active-consumer">>, bool, true},
- {<<"x-queue-type">>, longstr, <<"quorum">>}
- ],
- durable = true, exclusive = false, auto_delete = false}
- } | Config].
+ Ret = rabbit_ct_broker_helpers:rpc(
+ Config, 0, rabbit_feature_flags, enable, [quorum_queue]),
+ case Ret of
+ ok ->
+ [{single_active_consumer_queue_declare,
+ #'queue.declare'{
+ arguments = [
+ {<<"x-single-active-consumer">>, bool, true},
+ {<<"x-queue-type">>, longstr, <<"quorum">>}
+ ],
+ durable = true, exclusive = false, auto_delete = false}
+ } | Config];
+ Error ->
+ {skip, {"Quorum queues are unsupported", Error}}
+ end.
end_per_group(_, Config) ->
Config.
diff --git a/test/unit_inbroker_non_parallel_SUITE.erl b/test/unit_inbroker_non_parallel_SUITE.erl
index e3e282f233..d2db382e30 100644
--- a/test/unit_inbroker_non_parallel_SUITE.erl
+++ b/test/unit_inbroker_non_parallel_SUITE.erl
@@ -568,7 +568,7 @@ head_message_timestamp1(_Config) ->
QRes = rabbit_misc:r(<<"/">>, queue, QName),
{ok, Q1} = rabbit_amqqueue:lookup(QRes),
- QPid = Q1#amqqueue.pid,
+ QPid = amqqueue:get_pid(Q1),
%% Set up event receiver for queue
dummy_event_receiver:start(self(), [node()], [queue_stats]),
diff --git a/test/unit_inbroker_parallel_SUITE.erl b/test/unit_inbroker_parallel_SUITE.erl
index 581440d179..f4f4971517 100644
--- a/test/unit_inbroker_parallel_SUITE.erl
+++ b/test/unit_inbroker_parallel_SUITE.erl
@@ -99,10 +99,24 @@ init_per_group(max_length_classic, Config) ->
[{queue_args, [{<<"x-queue-type">>, longstr, <<"classic">>}]},
{queue_durable, false}]);
init_per_group(max_length_quorum, Config) ->
- rabbit_ct_helpers:set_config(
- Config,
- [{queue_args, [{<<"x-queue-type">>, longstr, <<"quorum">>}]},
- {queue_durable, true}]);
+ Nodes = rabbit_ct_broker_helpers:get_node_configs(
+ Config, nodename),
+ Ret = rabbit_ct_broker_helpers:rpc(
+ Config, 0,
+ rabbit_feature_flags,
+ is_supported_remotely,
+ [Nodes, [quorum_queue], 60000]),
+ case Ret of
+ true ->
+ ok = rabbit_ct_broker_helpers:rpc(
+ Config, 0, rabbit_feature_flags, enable, [quorum_queue]),
+ rabbit_ct_helpers:set_config(
+ Config,
+ [{queue_args, [{<<"x-queue-type">>, longstr, <<"quorum">>}]},
+ {queue_durable, true}]);
+ false ->
+ {skip, "Quorum queues are unsupported"}
+ end;
init_per_group(max_length_mirrored, Config) ->
rabbit_ct_broker_helpers:set_ha_policy(Config, 0, <<"^max_length.*queue">>,
<<"all">>, [{<<"ha-sync-mode">>, <<"automatic">>}]),
@@ -269,8 +283,7 @@ wait_for_confirms(Unconfirmed) ->
end.
test_amqqueue(Durable) ->
- (rabbit_amqqueue:pseudo_queue(test_queue(), self()))
- #amqqueue { durable = Durable }.
+ rabbit_amqqueue:pseudo_queue(test_queue(), self(), Durable).
assert_prop(List, Prop, Value) ->
case proplists:get_value(Prop, List)of
@@ -695,7 +708,7 @@ head_message_timestamp1(_Config) ->
QRes = rabbit_misc:r(<<"/">>, queue, QName),
{ok, Q1} = rabbit_amqqueue:lookup(QRes),
- QPid = Q1#amqqueue.pid,
+ QPid = amqqueue:get_pid(Q1),
%% Set up event receiver for queue
dummy_event_receiver:start(self(), [node()], [queue_stats]),
@@ -944,7 +957,7 @@ confirms1(_Config) ->
QName2 = DeclareBindDurableQueue(),
%% Get the first one's pid (we'll crash it later)
{ok, Q1} = rabbit_amqqueue:lookup(rabbit_misc:r(<<"/">>, queue, QName1)),
- QPid1 = Q1#amqqueue.pid,
+ QPid1 = amqqueue:get_pid(Q1),
%% Enable confirms
rabbit_channel:do(Ch, #'confirm.select'{}),
receive
diff --git a/test/vhost_SUITE.erl b/test/vhost_SUITE.erl
index 2de5819b54..5b6a6bd6ff 100644
--- a/test/vhost_SUITE.erl
+++ b/test/vhost_SUITE.erl
@@ -354,7 +354,7 @@ node_starts_with_dead_vhosts_and_ignore_slaves(Config) ->
Node1 = rabbit_ct_broker_helpers:get_node_config(Config, 1, nodename),
- #amqqueue{sync_slave_pids = [Pid]} = Q,
+ [Pid] = amqqueue:get_sync_slave_pids(Q),
Node1 = node(Pid),