summaryrefslogtreecommitdiff
path: root/test
diff options
context:
space:
mode:
Diffstat (limited to 'test')
-rw-r--r--test/amqqueue_backward_compatibility_SUITE.erl302
-rw-r--r--test/backing_queue_SUITE.erl27
-rw-r--r--test/channel_operation_timeout_SUITE.erl14
-rw-r--r--test/cluster_SUITE.erl15
-rw-r--r--test/clustering_management_SUITE.erl66
-rw-r--r--test/crashing_queues_SUITE.erl7
-rw-r--r--test/priority_queue_SUITE.erl8
-rw-r--r--test/quorum_queue_SUITE.erl9
-rw-r--r--test/rabbit_core_metrics_gc_SUITE.erl8
-rw-r--r--test/unit_inbroker_non_parallel_SUITE.erl2
-rw-r--r--test/unit_inbroker_parallel_SUITE.erl7
-rw-r--r--test/vhost_SUITE.erl2
12 files changed, 401 insertions, 66 deletions
diff --git a/test/amqqueue_backward_compatibility_SUITE.erl b/test/amqqueue_backward_compatibility_SUITE.erl
new file mode 100644
index 0000000000..05a049c9bb
--- /dev/null
+++ b/test/amqqueue_backward_compatibility_SUITE.erl
@@ -0,0 +1,302 @@
+-module(amqqueue_backward_compatibility_SUITE).
+
+-include_lib("common_test/include/ct.hrl").
+-include_lib("eunit/include/eunit.hrl").
+
+-include("amqqueue.hrl").
+
+-export([all/0,
+ groups/0,
+ init_per_suite/2,
+ end_per_suite/2,
+ init_per_group/2,
+ end_per_group/2,
+ init_per_testcase/2,
+ end_per_testcase/2,
+
+ new_amqqueue_v1_is_amqqueue/1,
+ new_amqqueue_v2_is_amqqueue/1,
+ random_term_is_not_amqqueue/1,
+
+ amqqueue_v1_is_durable/1,
+ amqqueue_v2_is_durable/1,
+ random_term_is_not_durable/1,
+
+ amqqueue_v1_state_matching/1,
+ amqqueue_v2_state_matching/1,
+ random_term_state_matching/1,
+
+ amqqueue_v1_type_matching/1,
+ amqqueue_v2_type_matching/1,
+ random_term_type_matching/1,
+
+ upgrade_v1_to_v2/1
+ ]).
+
+-define(long_tuple, {random_tuple, a, b, c, d, e, f, g, h, i, j, k, l, m,
+ n, o, p, q, r, s, t, u, v, w, x, y, z}).
+
+all() ->
+ [
+ {group, parallel_tests}
+ ].
+
+groups() ->
+ [
+ {parallel_tests, [parallel], [new_amqqueue_v1_is_amqqueue,
+ new_amqqueue_v2_is_amqqueue,
+ random_term_is_not_amqqueue,
+ amqqueue_v1_is_durable,
+ amqqueue_v2_is_durable,
+ random_term_is_not_durable,
+ amqqueue_v1_state_matching,
+ amqqueue_v2_state_matching,
+ random_term_state_matching,
+ amqqueue_v1_type_matching,
+ amqqueue_v2_type_matching,
+ random_term_type_matching]}
+ ].
+
+init_per_suite(_, Config) -> Config.
+end_per_suite(_, Config) -> Config.
+
+init_per_group(_, Config) -> Config.
+end_per_group(_, Config) -> Config.
+
+init_per_testcase(_, Config) -> Config.
+end_per_testcase(_, Config) -> Config.
+
+new_amqqueue_v1_is_amqqueue(_) ->
+ VHost = <<"/">>,
+ Name = rabbit_misc:r(VHost, queue, my_amqqueue_v1),
+ Queue = amqqueue:new_with_version(amqqueue_v1,
+ Name,
+ self(),
+ false,
+ false,
+ none,
+ [],
+ VHost,
+ #{},
+ ?amqqueue_v1_type),
+ ?assert(?is_amqqueue(Queue)),
+ ?assert(?is_amqqueue_v1(Queue)),
+ ?assert(not ?is_amqqueue_v2(Queue)),
+ ?assert(?amqqueue_is_classic(Queue)),
+ ?assert(amqqueue:is_classic(Queue)),
+ ?assert(not ?amqqueue_is_quorum(Queue)),
+ ?assert(not ?amqqueue_vhost_equals(Queue, <<"frazzle">>)),
+ ?assert(?amqqueue_has_valid_pid(Queue)),
+ ?assert(?amqqueue_pid_equals(Queue, self())),
+ ?assert(?amqqueue_pids_are_equal(Queue, Queue)),
+ ?assert(?amqqueue_pid_runs_on_local_node(Queue)),
+ ?assert(amqqueue:qnode(Queue) == node()).
+
+new_amqqueue_v2_is_amqqueue(_) ->
+ VHost = <<"/">>,
+ Name = rabbit_misc:r(VHost, queue, my_amqqueue_v2),
+ Queue = amqqueue:new_with_version(amqqueue_v2,
+ Name,
+ self(),
+ false,
+ false,
+ none,
+ [],
+ VHost,
+ #{},
+ classic),
+ ?assert(?is_amqqueue(Queue)),
+ ?assert(?is_amqqueue_v2(Queue)),
+ ?assert(not ?is_amqqueue_v1(Queue)),
+ ?assert(?amqqueue_is_classic(Queue)),
+ ?assert(amqqueue:is_classic(Queue)),
+ ?assert(not ?amqqueue_is_quorum(Queue)),
+ ?assert(not ?amqqueue_vhost_equals(Queue, <<"frazzle">>)),
+ ?assert(?amqqueue_has_valid_pid(Queue)),
+ ?assert(?amqqueue_pid_equals(Queue, self())),
+ ?assert(?amqqueue_pids_are_equal(Queue, Queue)),
+ ?assert(?amqqueue_pid_runs_on_local_node(Queue)),
+ ?assert(amqqueue:qnode(Queue) == node()).
+
+random_term_is_not_amqqueue(_) ->
+ Term = ?long_tuple,
+ ?assert(not ?is_amqqueue(Term)),
+ ?assert(not ?is_amqqueue_v2(Term)),
+ ?assert(not ?is_amqqueue_v1(Term)).
+
+%% -------------------------------------------------------------------
+
+amqqueue_v1_is_durable(_) ->
+ VHost = <<"/">>,
+ Name = rabbit_misc:r(VHost, queue, my_amqqueue_v1),
+ TransientQueue = amqqueue:new_with_version(amqqueue_v1,
+ Name,
+ self(),
+ false,
+ false,
+ none,
+ [],
+ VHost,
+ #{},
+ ?amqqueue_v1_type),
+ DurableQueue = amqqueue:new_with_version(amqqueue_v1,
+ Name,
+ self(),
+ true,
+ false,
+ none,
+ [],
+ VHost,
+ #{},
+ ?amqqueue_v1_type),
+ ?assert(not ?amqqueue_is_durable(TransientQueue)),
+ ?assert(?amqqueue_is_durable(DurableQueue)).
+
+amqqueue_v2_is_durable(_) ->
+ VHost = <<"/">>,
+ Name = rabbit_misc:r(VHost, queue, my_amqqueue_v1),
+ TransientQueue = amqqueue:new_with_version(amqqueue_v2,
+ Name,
+ self(),
+ false,
+ false,
+ none,
+ [],
+ VHost,
+ #{},
+ classic),
+ DurableQueue = amqqueue:new_with_version(amqqueue_v2,
+ Name,
+ self(),
+ true,
+ false,
+ none,
+ [],
+ VHost,
+ #{},
+ classic),
+ ?assert(not ?amqqueue_is_durable(TransientQueue)),
+ ?assert(?amqqueue_is_durable(DurableQueue)).
+
+random_term_is_not_durable(_) ->
+ Term = ?long_tuple,
+ ?assert(not ?amqqueue_is_durable(Term)).
+
+%% -------------------------------------------------------------------
+
+amqqueue_v1_state_matching(_) ->
+ VHost = <<"/">>,
+ Name = rabbit_misc:r(VHost, queue, my_amqqueue_v1),
+ Queue1 = amqqueue:new_with_version(amqqueue_v1,
+ Name,
+ self(),
+ true,
+ false,
+ none,
+ [],
+ VHost,
+ #{},
+ ?amqqueue_v1_type),
+ ?assert(?amqqueue_state_is(Queue1, live)),
+ Queue2 = amqqueue:set_state(Queue1, stopped),
+ ?assert(?amqqueue_state_is(Queue2, stopped)).
+
+amqqueue_v2_state_matching(_) ->
+ VHost = <<"/">>,
+ Name = rabbit_misc:r(VHost, queue, my_amqqueue_v1),
+ Queue1 = amqqueue:new_with_version(amqqueue_v2,
+ Name,
+ self(),
+ true,
+ false,
+ none,
+ [],
+ VHost,
+ #{},
+ classic),
+ ?assert(?amqqueue_state_is(Queue1, live)),
+ Queue2 = amqqueue:set_state(Queue1, stopped),
+ ?assert(?amqqueue_state_is(Queue2, stopped)).
+
+random_term_state_matching(_) ->
+ Term = ?long_tuple,
+ ?assert(not ?amqqueue_state_is(Term, live)).
+
+%% -------------------------------------------------------------------
+
+amqqueue_v1_type_matching(_) ->
+ VHost = <<"/">>,
+ Name = rabbit_misc:r(VHost, queue, my_amqqueue_v1),
+ Queue = amqqueue:new_with_version(amqqueue_v1,
+ Name,
+ self(),
+ true,
+ false,
+ none,
+ [],
+ VHost,
+ #{},
+ ?amqqueue_v1_type),
+ ?assert(?amqqueue_is_classic(Queue)),
+ ?assert(amqqueue:is_classic(Queue)),
+ ?assert(not ?amqqueue_is_quorum(Queue)).
+
+amqqueue_v2_type_matching(_) ->
+ VHost = <<"/">>,
+ Name = rabbit_misc:r(VHost, queue, my_amqqueue_v1),
+ ClassicQueue = amqqueue:new_with_version(amqqueue_v2,
+ Name,
+ self(),
+ true,
+ false,
+ none,
+ [],
+ VHost,
+ #{},
+ classic),
+ ?assert(?amqqueue_is_classic(ClassicQueue)),
+ ?assert(amqqueue:is_classic(ClassicQueue)),
+ ?assert(not ?amqqueue_is_quorum(ClassicQueue)),
+ ?assert(not amqqueue:is_quorum(ClassicQueue)),
+ QuorumQueue = amqqueue:new_with_version(amqqueue_v2,
+ Name,
+ self(),
+ true,
+ false,
+ none,
+ [],
+ VHost,
+ #{},
+ quorum),
+ ?assert(not ?amqqueue_is_classic(QuorumQueue)),
+ ?assert(not amqqueue:is_classic(QuorumQueue)),
+ ?assert(?amqqueue_is_quorum(QuorumQueue)),
+ ?assert(amqqueue:is_quorum(QuorumQueue)).
+
+random_term_type_matching(_) ->
+ Term = ?long_tuple,
+ ?assert(not ?amqqueue_is_classic(Term)),
+ ?assert(not ?amqqueue_is_quorum(Term)),
+ ?assertException(error, function_clause, amqqueue:is_classic(Term)),
+ ?assertException(error, function_clause, amqqueue:is_quorum(Term)).
+
+%% -------------------------------------------------------------------
+
+upgrade_v1_to_v2(_) ->
+ VHost = <<"/">>,
+ Name = rabbit_misc:r(VHost, queue, my_amqqueue_v1),
+ OldQueue = amqqueue:new_with_version(amqqueue_v1,
+ Name,
+ self(),
+ true,
+ false,
+ none,
+ [],
+ VHost,
+ #{},
+ ?amqqueue_v1_type),
+ ?assert(?is_amqqueue_v1(OldQueue)),
+ ?assert(not ?is_amqqueue_v2(OldQueue)),
+ NewQueue = amqqueue:upgrade_to(amqqueue_v2, OldQueue),
+ ?assert(not ?is_amqqueue_v1(NewQueue)),
+ ?assert(?is_amqqueue_v2(NewQueue)).
diff --git a/test/backing_queue_SUITE.erl b/test/backing_queue_SUITE.erl
index 433bc66bff..bc9b2c8ced 100644
--- a/test/backing_queue_SUITE.erl
+++ b/test/backing_queue_SUITE.erl
@@ -18,6 +18,7 @@
-include_lib("common_test/include/ct.hrl").
-include_lib("amqp_client/include/amqp_client.hrl").
+-include("amqqueue.hrl").
-compile(export_all).
@@ -686,11 +687,10 @@ bq_variable_queue_delete_msg_store_files_callback(Config) ->
bq_variable_queue_delete_msg_store_files_callback1(Config) ->
ok = restart_msg_store_empty(),
- {new, #amqqueue { pid = QPid, name = QName } = Q} =
- rabbit_amqqueue:declare(
- queue_name(Config,
- <<"bq_variable_queue_delete_msg_store_files_callback-q">>),
- true, false, [], none, <<"acting-user">>),
+ QName0 = queue_name(Config, <<"bq_variable_queue_delete_msg_store_files_callback-q">>),
+ {new, Q} = rabbit_amqqueue:declare(QName0, true, false, [], none, <<"acting-user">>),
+ QName = amqqueue:get_name(Q),
+ QPid = amqqueue:get_pid(Q),
Payload = <<0:8388608>>, %% 1MB
Count = 30,
publish_and_confirm(Q, Payload, Count),
@@ -718,9 +718,10 @@ bq_queue_recover(Config) ->
bq_queue_recover1(Config) ->
Count = 2 * rabbit_queue_index:next_segment_boundary(0),
- {new, #amqqueue { pid = QPid, name = QName } = Q} =
- rabbit_amqqueue:declare(queue_name(Config, <<"bq_queue_recover-q">>),
- true, false, [], none, <<"acting-user">>),
+ QName0 = queue_name(Config, <<"bq_queue_recover-q">>),
+ {new, Q} = rabbit_amqqueue:declare(QName0, true, false, [], none, <<"acting-user">>),
+ QName = amqqueue:get_name(Q),
+ QPid = amqqueue:get_pid(Q),
publish_and_confirm(Q, <<>>, Count),
SupPid = get_queue_sup_pid(Q),
@@ -736,7 +737,8 @@ bq_queue_recover1(Config) ->
{ok, Limiter} = rabbit_limiter:start_link(no_id),
rabbit_amqqueue:with_or_die(
QName,
- fun (Q1 = #amqqueue { pid = QPid1 }) ->
+ fun (Q1) when ?is_amqqueue(Q1) ->
+ QPid1 = amqqueue:get_pid(Q1),
CountMinusOne = Count - 1,
{ok, CountMinusOne, {QName, QPid1, _AckTag, true, _Msg}} =
rabbit_amqqueue:basic_get(Q1, self(), false, Limiter,
@@ -752,7 +754,9 @@ bq_queue_recover1(Config) ->
passed.
%% Return the PID of the given queue's supervisor.
-get_queue_sup_pid(#amqqueue { pid = QPid, name = QName }) ->
+get_queue_sup_pid(Q) when ?is_amqqueue(Q) ->
+ QName = amqqueue:get_name(Q),
+ QPid = amqqueue:get_pid(Q),
VHost = QName#resource.virtual_host,
{ok, AmqSup} = rabbit_amqqueue_sup_sup:find_for_vhost(VHost, node(QPid)),
Sups = supervisor:which_children(AmqSup),
@@ -1498,8 +1502,7 @@ variable_queue_fetch(Count, IsPersistent, IsDelivered, Len, VQ) ->
end, {VQ, []}, lists:seq(1, Count)).
test_amqqueue(QName, Durable) ->
- (rabbit_amqqueue:pseudo_queue(QName, self()))
- #amqqueue { durable = Durable }.
+ rabbit_amqqueue:pseudo_queue(QName, self(), Durable).
assert_prop(List, Prop, Value) ->
case proplists:get_value(Prop, List)of
diff --git a/test/channel_operation_timeout_SUITE.erl b/test/channel_operation_timeout_SUITE.erl
index 9bfa0ae07a..77da6133d8 100644
--- a/test/channel_operation_timeout_SUITE.erl
+++ b/test/channel_operation_timeout_SUITE.erl
@@ -18,6 +18,7 @@
-include_lib("common_test/include/ct.hrl").
-include_lib("amqp_client/include/amqp_client.hrl").
+-include("amqqueue.hrl").
-compile([export_all]).
@@ -169,9 +170,16 @@ get_consumers(Config, Node, VHost) when is_atom(Node),
rabbit_ct_broker_helpers:rpc(Config, Node,
rabbit_amqqueue, consumers_all, [VHost]).
-get_amqqueue(Q, []) -> throw({not_found, Q});
-get_amqqueue(Q, [AMQQ = #amqqueue{name = Q} | _]) -> AMQQ;
-get_amqqueue(Q, [_| Rem]) -> get_amqqueue(Q, Rem).
+get_amqqueue(QName0, []) ->
+ throw({not_found, QName0});
+get_amqqueue(QName0, [Q | Rem]) when ?is_amqqueue(Q) ->
+ QName1 = amqqueue:get_name(Q),
+ compare_amqqueue(QName0, QName1, Q, Rem).
+
+compare_amqqueue(QName, QName, Q, _Rem) ->
+ Q;
+compare_amqqueue(QName, _, _, Rem) ->
+ get_amqqueue(QName, Rem).
qconfig(Ch, Name, Ex, Consume, Deliver) ->
[{ch, Ch}, {name, Name}, {ex,Ex}, {consume, Consume}, {deliver, Deliver}].
diff --git a/test/cluster_SUITE.erl b/test/cluster_SUITE.erl
index c52dc9ef64..dc30825f8c 100644
--- a/test/cluster_SUITE.erl
+++ b/test/cluster_SUITE.erl
@@ -18,6 +18,7 @@
-include_lib("common_test/include/ct.hrl").
-include_lib("amqp_client/include/amqp_client.hrl").
+-include("include/amqqueue.hrl").
-compile(export_all).
@@ -225,9 +226,9 @@ declare_on_dead_queue1(_Config, SecondaryNode) ->
Self = self(),
Pid = spawn(SecondaryNode,
fun () ->
- {new, #amqqueue{name = QueueName, pid = QPid}} =
- rabbit_amqqueue:declare(QueueName, false, false, [],
- none, <<"acting-user">>),
+ {new, Q} = rabbit_amqqueue:declare(QueueName, false, false, [], none, <<"acting-user">>),
+ QueueName = ?amqqueue_field_name(Q),
+ QPid = ?amqqueue_field_pid(Q),
exit(QPid, kill),
Self ! {self(), killed, QPid}
end),
@@ -269,12 +270,12 @@ must_exit(Fun) ->
end.
dead_queue_loop(QueueName, OldPid) ->
- {existing, Q} = rabbit_amqqueue:declare(QueueName, false, false, [], none,
- <<"acting-user">>),
- case Q#amqqueue.pid of
+ {existing, Q} = rabbit_amqqueue:declare(QueueName, false, false, [], none, <<"acting-user">>),
+ QPid = ?amqqueue_field_pid(Q),
+ case QPid of
OldPid -> timer:sleep(25),
dead_queue_loop(QueueName, OldPid);
- _ -> true = rabbit_misc:is_process_alive(Q#amqqueue.pid),
+ _ -> true = rabbit_misc:is_process_alive(QPid),
Q
end.
diff --git a/test/clustering_management_SUITE.erl b/test/clustering_management_SUITE.erl
index 120257feb9..5ae2fb687c 100644
--- a/test/clustering_management_SUITE.erl
+++ b/test/clustering_management_SUITE.erl
@@ -315,14 +315,14 @@ forget_offline_removes_things(Config) ->
forget_promotes_offline_slave(Config) ->
[A, B, C, D] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
ACh = rabbit_ct_client_helpers:open_channel(Config, A),
- Q = <<"mirrored-queue">>,
- declare(ACh, Q),
- set_ha_policy(Config, Q, A, [B, C]),
- set_ha_policy(Config, Q, A, [C, D]), %% Test add and remove from recoverable_slaves
+ QName = <<"mirrored-queue">>,
+ declare(ACh, QName),
+ set_ha_policy(Config, QName, A, [B, C]),
+ set_ha_policy(Config, QName, A, [C, D]), %% Test add and remove from recoverable_slaves
%% Publish and confirm
amqp_channel:call(ACh, #'confirm.select'{}),
- amqp_channel:cast(ACh, #'basic.publish'{routing_key = Q},
+ amqp_channel:cast(ACh, #'basic.publish'{routing_key = QName},
#amqp_msg{props = #'P_basic'{delivery_mode = 2}}),
amqp_channel:wait_for_confirms(ACh),
@@ -353,26 +353,50 @@ forget_promotes_offline_slave(Config) ->
ok = rabbit_ct_broker_helpers:start_node(Config, D),
DCh2 = rabbit_ct_client_helpers:open_channel(Config, D),
- #'queue.declare_ok'{message_count = 1} = declare(DCh2, Q),
+ #'queue.declare_ok'{message_count = 1} = declare(DCh2, QName),
ok.
-set_ha_policy(Config, Q, Master, Slaves) ->
+set_ha_policy(Config, QName, Master, Slaves) ->
Nodes = [list_to_binary(atom_to_list(N)) || N <- [Master | Slaves]],
- rabbit_ct_broker_helpers:set_ha_policy(Config, Master, Q,
- {<<"nodes">>, Nodes}),
- await_slaves(Q, Master, Slaves).
-
-await_slaves(Q, Master, Slaves) ->
- {ok, #amqqueue{pid = MPid,
- slave_pids = SPids}} =
- rpc:call(Master, rabbit_amqqueue, lookup,
- [rabbit_misc:r(<<"/">>, queue, Q)]),
- ActMaster = node(MPid),
+ HaPolicy = {<<"nodes">>, Nodes},
+ rabbit_ct_broker_helpers:set_ha_policy(Config, Master, QName, HaPolicy),
+ await_slaves(QName, Master, Slaves).
+
+await_slaves(QName, Master, Slaves) ->
+ await_slaves_0(QName, Master, Slaves, 10).
+
+await_slaves_0(QName, Master, Slaves0, Tries) ->
+ {ok, Queue} = await_slaves_lookup_queue(QName, Master),
+ SPids = amqqueue:get_slave_pids(Queue),
+ ActMaster = amqqueue:qnode(Queue),
ActSlaves = lists:usort([node(P) || P <- SPids]),
- case {Master, lists:usort(Slaves)} of
- {ActMaster, ActSlaves} -> ok;
- _ -> timer:sleep(100),
- await_slaves(Q, Master, Slaves)
+ Slaves1 = lists:usort(Slaves0),
+ await_slaves_1(QName, ActMaster, ActSlaves, Master, Slaves1, Tries).
+
+await_slaves_1(QName, _ActMaster, _ActSlaves, _Master, _Slaves, 0) ->
+ error({timeout_waiting_for_slaves, QName});
+await_slaves_1(QName, ActMaster, ActSlaves, Master, Slaves, Tries) ->
+ case {Master, Slaves} of
+ {ActMaster, ActSlaves} ->
+ ok;
+ _ ->
+ timer:sleep(250),
+ await_slaves_0(QName, Master, Slaves, Tries - 1)
+ end.
+
+await_slaves_lookup_queue(QName, Master) ->
+ await_slaves_lookup_queue(QName, Master, 10).
+
+await_slaves_lookup_queue(QName, _Master, 0) ->
+ error({timeout_looking_up_queue, QName});
+await_slaves_lookup_queue(QName, Master, Tries) ->
+ RpcArgs = [rabbit_misc:r(<<"/">>, queue, QName)],
+ case rpc:call(Master, rabbit_amqqueue, lookup, RpcArgs) of
+ {error, not_found} ->
+ timer:sleep(250),
+ await_slaves_lookup_queue(QName, Master, Tries - 1);
+ {ok, Q} ->
+ {ok, Q}
end.
force_boot(Config) ->
diff --git a/test/crashing_queues_SUITE.erl b/test/crashing_queues_SUITE.erl
index 2d91083abc..7b8ec91346 100644
--- a/test/crashing_queues_SUITE.erl
+++ b/test/crashing_queues_SUITE.erl
@@ -217,9 +217,10 @@ kill_queue(Node, QName) ->
await_new_pid(Node, QName, Pid1).
queue_pid(Node, QName) ->
- #amqqueue{pid = QPid,
- state = State,
- name = #resource{virtual_host = VHost}} = lookup(Node, QName),
+ Q = lookup(Node, QName),
+ QPid = amqqueue:get_pid(Q),
+ State = amqqueue:get_state(Q),
+ #resource{virtual_host = VHost} = amqqueue:get_name(Q),
case State of
crashed ->
case rabbit_amqqueue_sup_sup:find_for_vhost(VHost, Node) of
diff --git a/test/priority_queue_SUITE.erl b/test/priority_queue_SUITE.erl
index 9db866e3c6..8a0ab98241 100644
--- a/test/priority_queue_SUITE.erl
+++ b/test/priority_queue_SUITE.erl
@@ -366,9 +366,9 @@ info_head_message_timestamp1(_Config) ->
QName = rabbit_misc:r(<<"/">>, queue,
<<"info_head_message_timestamp-queue">>),
Q0 = rabbit_amqqueue:pseudo_queue(QName, self()),
- Q = Q0#amqqueue{arguments = [{<<"x-max-priority">>, long, 2}]},
+ Q1 = amqqueue:set_arguments(Q0, [{<<"x-max-priority">>, long, 2}]),
PQ = rabbit_priority_queue,
- BQS1 = PQ:init(Q, new, fun(_, _) -> ok end),
+ BQS1 = PQ:init(Q1, new, fun(_, _) -> ok end),
%% The queue is empty: no timestamp.
true = PQ:is_empty(BQS1),
'' = PQ:info(head_message_timestamp, BQS1),
@@ -415,9 +415,9 @@ info_head_message_timestamp1(_Config) ->
ram_duration(_Config) ->
QName = rabbit_misc:r(<<"/">>, queue, <<"ram_duration-queue">>),
Q0 = rabbit_amqqueue:pseudo_queue(QName, self()),
- Q = Q0#amqqueue{arguments = [{<<"x-max-priority">>, long, 5}]},
+ Q1 = amqqueue:set_arguments(Q0, [{<<"x-max-priority">>, long, 5}]),
PQ = rabbit_priority_queue,
- BQS1 = PQ:init(Q, new, fun(_, _) -> ok end),
+ BQS1 = PQ:init(Q1, new, fun(_, _) -> ok end),
{_Duration1, BQS2} = PQ:ram_duration(BQS1),
BQS3 = PQ:set_ram_duration_target(infinity, BQS2),
BQS4 = PQ:set_ram_duration_target(1, BQS3),
diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl
index 9f40bc7b0c..0f9010f204 100644
--- a/test/quorum_queue_SUITE.erl
+++ b/test/quorum_queue_SUITE.erl
@@ -2228,11 +2228,10 @@ assert_queue_type(Server, Q, Expected) ->
Actual = get_queue_type(Server, Q),
Expected = Actual.
-get_queue_type(Server, Q) ->
- QNameRes = rabbit_misc:r(<<"/">>, queue, Q),
- {ok, AMQQueue} =
- rpc:call(Server, rabbit_amqqueue, lookup, [QNameRes]),
- AMQQueue#amqqueue.type.
+get_queue_type(Server, Q0) ->
+ QNameRes = rabbit_misc:r(<<"/">>, queue, Q0),
+ {ok, Q1} = rpc:call(Server, rabbit_amqqueue, lookup, [QNameRes]),
+ amqqueue:get_type(Q1).
wait_for_messages(Config, Stats) ->
wait_for_messages(Config, lists:sort(Stats), 60).
diff --git a/test/rabbit_core_metrics_gc_SUITE.erl b/test/rabbit_core_metrics_gc_SUITE.erl
index 7ae43aa7e1..42ac863ead 100644
--- a/test/rabbit_core_metrics_gc_SUITE.erl
+++ b/test/rabbit_core_metrics_gc_SUITE.erl
@@ -364,11 +364,9 @@ cluster_queue_metrics(Config) ->
% Synchronize
Name = rabbit_misc:r(VHost, queue, QueueName),
- [#amqqueue{pid = QPid}] = rabbit_ct_broker_helpers:rpc(Config, Node0,
- ets, lookup,
- [rabbit_queue, Name]),
- ok = rabbit_ct_broker_helpers:rpc(Config, Node0, rabbit_amqqueue,
- sync_mirrors, [QPid]),
+ [Q] = rabbit_ct_broker_helpers:rpc(Config, Node0, ets, lookup, [rabbit_queue, Name]),
+ QPid = amqqueue:get_pid(Q),
+ ok = rabbit_ct_broker_helpers:rpc(Config, Node0, rabbit_amqqueue, sync_mirrors, [QPid]),
% Check ETS table for data
wait_for(fun () ->
diff --git a/test/unit_inbroker_non_parallel_SUITE.erl b/test/unit_inbroker_non_parallel_SUITE.erl
index e3e282f233..d2db382e30 100644
--- a/test/unit_inbroker_non_parallel_SUITE.erl
+++ b/test/unit_inbroker_non_parallel_SUITE.erl
@@ -568,7 +568,7 @@ head_message_timestamp1(_Config) ->
QRes = rabbit_misc:r(<<"/">>, queue, QName),
{ok, Q1} = rabbit_amqqueue:lookup(QRes),
- QPid = Q1#amqqueue.pid,
+ QPid = amqqueue:get_pid(Q1),
%% Set up event receiver for queue
dummy_event_receiver:start(self(), [node()], [queue_stats]),
diff --git a/test/unit_inbroker_parallel_SUITE.erl b/test/unit_inbroker_parallel_SUITE.erl
index d028ed3ccf..f4f4971517 100644
--- a/test/unit_inbroker_parallel_SUITE.erl
+++ b/test/unit_inbroker_parallel_SUITE.erl
@@ -283,8 +283,7 @@ wait_for_confirms(Unconfirmed) ->
end.
test_amqqueue(Durable) ->
- (rabbit_amqqueue:pseudo_queue(test_queue(), self()))
- #amqqueue { durable = Durable }.
+ rabbit_amqqueue:pseudo_queue(test_queue(), self(), Durable).
assert_prop(List, Prop, Value) ->
case proplists:get_value(Prop, List)of
@@ -709,7 +708,7 @@ head_message_timestamp1(_Config) ->
QRes = rabbit_misc:r(<<"/">>, queue, QName),
{ok, Q1} = rabbit_amqqueue:lookup(QRes),
- QPid = Q1#amqqueue.pid,
+ QPid = amqqueue:get_pid(Q1),
%% Set up event receiver for queue
dummy_event_receiver:start(self(), [node()], [queue_stats]),
@@ -958,7 +957,7 @@ confirms1(_Config) ->
QName2 = DeclareBindDurableQueue(),
%% Get the first one's pid (we'll crash it later)
{ok, Q1} = rabbit_amqqueue:lookup(rabbit_misc:r(<<"/">>, queue, QName1)),
- QPid1 = Q1#amqqueue.pid,
+ QPid1 = amqqueue:get_pid(Q1),
%% Enable confirms
rabbit_channel:do(Ch, #'confirm.select'{}),
receive
diff --git a/test/vhost_SUITE.erl b/test/vhost_SUITE.erl
index 2de5819b54..5b6a6bd6ff 100644
--- a/test/vhost_SUITE.erl
+++ b/test/vhost_SUITE.erl
@@ -354,7 +354,7 @@ node_starts_with_dead_vhosts_and_ignore_slaves(Config) ->
Node1 = rabbit_ct_broker_helpers:get_node_config(Config, 1, nodename),
- #amqqueue{sync_slave_pids = [Pid]} = Q,
+ [Pid] = amqqueue:get_sync_slave_pids(Q),
Node1 = node(Pid),