diff options
Diffstat (limited to 'test')
| -rw-r--r-- | test/amqqueue_backward_compatibility_SUITE.erl | 302 | ||||
| -rw-r--r-- | test/backing_queue_SUITE.erl | 27 | ||||
| -rw-r--r-- | test/channel_operation_timeout_SUITE.erl | 14 | ||||
| -rw-r--r-- | test/cluster_SUITE.erl | 15 | ||||
| -rw-r--r-- | test/clustering_management_SUITE.erl | 66 | ||||
| -rw-r--r-- | test/crashing_queues_SUITE.erl | 7 | ||||
| -rw-r--r-- | test/priority_queue_SUITE.erl | 8 | ||||
| -rw-r--r-- | test/quorum_queue_SUITE.erl | 9 | ||||
| -rw-r--r-- | test/rabbit_core_metrics_gc_SUITE.erl | 8 | ||||
| -rw-r--r-- | test/unit_inbroker_non_parallel_SUITE.erl | 2 | ||||
| -rw-r--r-- | test/unit_inbroker_parallel_SUITE.erl | 7 | ||||
| -rw-r--r-- | test/vhost_SUITE.erl | 2 |
12 files changed, 401 insertions, 66 deletions
diff --git a/test/amqqueue_backward_compatibility_SUITE.erl b/test/amqqueue_backward_compatibility_SUITE.erl new file mode 100644 index 0000000000..05a049c9bb --- /dev/null +++ b/test/amqqueue_backward_compatibility_SUITE.erl @@ -0,0 +1,302 @@ +-module(amqqueue_backward_compatibility_SUITE). + +-include_lib("common_test/include/ct.hrl"). +-include_lib("eunit/include/eunit.hrl"). + +-include("amqqueue.hrl"). + +-export([all/0, + groups/0, + init_per_suite/2, + end_per_suite/2, + init_per_group/2, + end_per_group/2, + init_per_testcase/2, + end_per_testcase/2, + + new_amqqueue_v1_is_amqqueue/1, + new_amqqueue_v2_is_amqqueue/1, + random_term_is_not_amqqueue/1, + + amqqueue_v1_is_durable/1, + amqqueue_v2_is_durable/1, + random_term_is_not_durable/1, + + amqqueue_v1_state_matching/1, + amqqueue_v2_state_matching/1, + random_term_state_matching/1, + + amqqueue_v1_type_matching/1, + amqqueue_v2_type_matching/1, + random_term_type_matching/1, + + upgrade_v1_to_v2/1 + ]). + +-define(long_tuple, {random_tuple, a, b, c, d, e, f, g, h, i, j, k, l, m, + n, o, p, q, r, s, t, u, v, w, x, y, z}). + +all() -> + [ + {group, parallel_tests} + ]. + +groups() -> + [ + {parallel_tests, [parallel], [new_amqqueue_v1_is_amqqueue, + new_amqqueue_v2_is_amqqueue, + random_term_is_not_amqqueue, + amqqueue_v1_is_durable, + amqqueue_v2_is_durable, + random_term_is_not_durable, + amqqueue_v1_state_matching, + amqqueue_v2_state_matching, + random_term_state_matching, + amqqueue_v1_type_matching, + amqqueue_v2_type_matching, + random_term_type_matching]} + ]. + +init_per_suite(_, Config) -> Config. +end_per_suite(_, Config) -> Config. + +init_per_group(_, Config) -> Config. +end_per_group(_, Config) -> Config. + +init_per_testcase(_, Config) -> Config. +end_per_testcase(_, Config) -> Config. + +new_amqqueue_v1_is_amqqueue(_) -> + VHost = <<"/">>, + Name = rabbit_misc:r(VHost, queue, my_amqqueue_v1), + Queue = amqqueue:new_with_version(amqqueue_v1, + Name, + self(), + false, + false, + none, + [], + VHost, + #{}, + ?amqqueue_v1_type), + ?assert(?is_amqqueue(Queue)), + ?assert(?is_amqqueue_v1(Queue)), + ?assert(not ?is_amqqueue_v2(Queue)), + ?assert(?amqqueue_is_classic(Queue)), + ?assert(amqqueue:is_classic(Queue)), + ?assert(not ?amqqueue_is_quorum(Queue)), + ?assert(not ?amqqueue_vhost_equals(Queue, <<"frazzle">>)), + ?assert(?amqqueue_has_valid_pid(Queue)), + ?assert(?amqqueue_pid_equals(Queue, self())), + ?assert(?amqqueue_pids_are_equal(Queue, Queue)), + ?assert(?amqqueue_pid_runs_on_local_node(Queue)), + ?assert(amqqueue:qnode(Queue) == node()). + +new_amqqueue_v2_is_amqqueue(_) -> + VHost = <<"/">>, + Name = rabbit_misc:r(VHost, queue, my_amqqueue_v2), + Queue = amqqueue:new_with_version(amqqueue_v2, + Name, + self(), + false, + false, + none, + [], + VHost, + #{}, + classic), + ?assert(?is_amqqueue(Queue)), + ?assert(?is_amqqueue_v2(Queue)), + ?assert(not ?is_amqqueue_v1(Queue)), + ?assert(?amqqueue_is_classic(Queue)), + ?assert(amqqueue:is_classic(Queue)), + ?assert(not ?amqqueue_is_quorum(Queue)), + ?assert(not ?amqqueue_vhost_equals(Queue, <<"frazzle">>)), + ?assert(?amqqueue_has_valid_pid(Queue)), + ?assert(?amqqueue_pid_equals(Queue, self())), + ?assert(?amqqueue_pids_are_equal(Queue, Queue)), + ?assert(?amqqueue_pid_runs_on_local_node(Queue)), + ?assert(amqqueue:qnode(Queue) == node()). + +random_term_is_not_amqqueue(_) -> + Term = ?long_tuple, + ?assert(not ?is_amqqueue(Term)), + ?assert(not ?is_amqqueue_v2(Term)), + ?assert(not ?is_amqqueue_v1(Term)). + +%% ------------------------------------------------------------------- + +amqqueue_v1_is_durable(_) -> + VHost = <<"/">>, + Name = rabbit_misc:r(VHost, queue, my_amqqueue_v1), + TransientQueue = amqqueue:new_with_version(amqqueue_v1, + Name, + self(), + false, + false, + none, + [], + VHost, + #{}, + ?amqqueue_v1_type), + DurableQueue = amqqueue:new_with_version(amqqueue_v1, + Name, + self(), + true, + false, + none, + [], + VHost, + #{}, + ?amqqueue_v1_type), + ?assert(not ?amqqueue_is_durable(TransientQueue)), + ?assert(?amqqueue_is_durable(DurableQueue)). + +amqqueue_v2_is_durable(_) -> + VHost = <<"/">>, + Name = rabbit_misc:r(VHost, queue, my_amqqueue_v1), + TransientQueue = amqqueue:new_with_version(amqqueue_v2, + Name, + self(), + false, + false, + none, + [], + VHost, + #{}, + classic), + DurableQueue = amqqueue:new_with_version(amqqueue_v2, + Name, + self(), + true, + false, + none, + [], + VHost, + #{}, + classic), + ?assert(not ?amqqueue_is_durable(TransientQueue)), + ?assert(?amqqueue_is_durable(DurableQueue)). + +random_term_is_not_durable(_) -> + Term = ?long_tuple, + ?assert(not ?amqqueue_is_durable(Term)). + +%% ------------------------------------------------------------------- + +amqqueue_v1_state_matching(_) -> + VHost = <<"/">>, + Name = rabbit_misc:r(VHost, queue, my_amqqueue_v1), + Queue1 = amqqueue:new_with_version(amqqueue_v1, + Name, + self(), + true, + false, + none, + [], + VHost, + #{}, + ?amqqueue_v1_type), + ?assert(?amqqueue_state_is(Queue1, live)), + Queue2 = amqqueue:set_state(Queue1, stopped), + ?assert(?amqqueue_state_is(Queue2, stopped)). + +amqqueue_v2_state_matching(_) -> + VHost = <<"/">>, + Name = rabbit_misc:r(VHost, queue, my_amqqueue_v1), + Queue1 = amqqueue:new_with_version(amqqueue_v2, + Name, + self(), + true, + false, + none, + [], + VHost, + #{}, + classic), + ?assert(?amqqueue_state_is(Queue1, live)), + Queue2 = amqqueue:set_state(Queue1, stopped), + ?assert(?amqqueue_state_is(Queue2, stopped)). + +random_term_state_matching(_) -> + Term = ?long_tuple, + ?assert(not ?amqqueue_state_is(Term, live)). + +%% ------------------------------------------------------------------- + +amqqueue_v1_type_matching(_) -> + VHost = <<"/">>, + Name = rabbit_misc:r(VHost, queue, my_amqqueue_v1), + Queue = amqqueue:new_with_version(amqqueue_v1, + Name, + self(), + true, + false, + none, + [], + VHost, + #{}, + ?amqqueue_v1_type), + ?assert(?amqqueue_is_classic(Queue)), + ?assert(amqqueue:is_classic(Queue)), + ?assert(not ?amqqueue_is_quorum(Queue)). + +amqqueue_v2_type_matching(_) -> + VHost = <<"/">>, + Name = rabbit_misc:r(VHost, queue, my_amqqueue_v1), + ClassicQueue = amqqueue:new_with_version(amqqueue_v2, + Name, + self(), + true, + false, + none, + [], + VHost, + #{}, + classic), + ?assert(?amqqueue_is_classic(ClassicQueue)), + ?assert(amqqueue:is_classic(ClassicQueue)), + ?assert(not ?amqqueue_is_quorum(ClassicQueue)), + ?assert(not amqqueue:is_quorum(ClassicQueue)), + QuorumQueue = amqqueue:new_with_version(amqqueue_v2, + Name, + self(), + true, + false, + none, + [], + VHost, + #{}, + quorum), + ?assert(not ?amqqueue_is_classic(QuorumQueue)), + ?assert(not amqqueue:is_classic(QuorumQueue)), + ?assert(?amqqueue_is_quorum(QuorumQueue)), + ?assert(amqqueue:is_quorum(QuorumQueue)). + +random_term_type_matching(_) -> + Term = ?long_tuple, + ?assert(not ?amqqueue_is_classic(Term)), + ?assert(not ?amqqueue_is_quorum(Term)), + ?assertException(error, function_clause, amqqueue:is_classic(Term)), + ?assertException(error, function_clause, amqqueue:is_quorum(Term)). + +%% ------------------------------------------------------------------- + +upgrade_v1_to_v2(_) -> + VHost = <<"/">>, + Name = rabbit_misc:r(VHost, queue, my_amqqueue_v1), + OldQueue = amqqueue:new_with_version(amqqueue_v1, + Name, + self(), + true, + false, + none, + [], + VHost, + #{}, + ?amqqueue_v1_type), + ?assert(?is_amqqueue_v1(OldQueue)), + ?assert(not ?is_amqqueue_v2(OldQueue)), + NewQueue = amqqueue:upgrade_to(amqqueue_v2, OldQueue), + ?assert(not ?is_amqqueue_v1(NewQueue)), + ?assert(?is_amqqueue_v2(NewQueue)). diff --git a/test/backing_queue_SUITE.erl b/test/backing_queue_SUITE.erl index 433bc66bff..bc9b2c8ced 100644 --- a/test/backing_queue_SUITE.erl +++ b/test/backing_queue_SUITE.erl @@ -18,6 +18,7 @@ -include_lib("common_test/include/ct.hrl"). -include_lib("amqp_client/include/amqp_client.hrl"). +-include("amqqueue.hrl"). -compile(export_all). @@ -686,11 +687,10 @@ bq_variable_queue_delete_msg_store_files_callback(Config) -> bq_variable_queue_delete_msg_store_files_callback1(Config) -> ok = restart_msg_store_empty(), - {new, #amqqueue { pid = QPid, name = QName } = Q} = - rabbit_amqqueue:declare( - queue_name(Config, - <<"bq_variable_queue_delete_msg_store_files_callback-q">>), - true, false, [], none, <<"acting-user">>), + QName0 = queue_name(Config, <<"bq_variable_queue_delete_msg_store_files_callback-q">>), + {new, Q} = rabbit_amqqueue:declare(QName0, true, false, [], none, <<"acting-user">>), + QName = amqqueue:get_name(Q), + QPid = amqqueue:get_pid(Q), Payload = <<0:8388608>>, %% 1MB Count = 30, publish_and_confirm(Q, Payload, Count), @@ -718,9 +718,10 @@ bq_queue_recover(Config) -> bq_queue_recover1(Config) -> Count = 2 * rabbit_queue_index:next_segment_boundary(0), - {new, #amqqueue { pid = QPid, name = QName } = Q} = - rabbit_amqqueue:declare(queue_name(Config, <<"bq_queue_recover-q">>), - true, false, [], none, <<"acting-user">>), + QName0 = queue_name(Config, <<"bq_queue_recover-q">>), + {new, Q} = rabbit_amqqueue:declare(QName0, true, false, [], none, <<"acting-user">>), + QName = amqqueue:get_name(Q), + QPid = amqqueue:get_pid(Q), publish_and_confirm(Q, <<>>, Count), SupPid = get_queue_sup_pid(Q), @@ -736,7 +737,8 @@ bq_queue_recover1(Config) -> {ok, Limiter} = rabbit_limiter:start_link(no_id), rabbit_amqqueue:with_or_die( QName, - fun (Q1 = #amqqueue { pid = QPid1 }) -> + fun (Q1) when ?is_amqqueue(Q1) -> + QPid1 = amqqueue:get_pid(Q1), CountMinusOne = Count - 1, {ok, CountMinusOne, {QName, QPid1, _AckTag, true, _Msg}} = rabbit_amqqueue:basic_get(Q1, self(), false, Limiter, @@ -752,7 +754,9 @@ bq_queue_recover1(Config) -> passed. %% Return the PID of the given queue's supervisor. -get_queue_sup_pid(#amqqueue { pid = QPid, name = QName }) -> +get_queue_sup_pid(Q) when ?is_amqqueue(Q) -> + QName = amqqueue:get_name(Q), + QPid = amqqueue:get_pid(Q), VHost = QName#resource.virtual_host, {ok, AmqSup} = rabbit_amqqueue_sup_sup:find_for_vhost(VHost, node(QPid)), Sups = supervisor:which_children(AmqSup), @@ -1498,8 +1502,7 @@ variable_queue_fetch(Count, IsPersistent, IsDelivered, Len, VQ) -> end, {VQ, []}, lists:seq(1, Count)). test_amqqueue(QName, Durable) -> - (rabbit_amqqueue:pseudo_queue(QName, self())) - #amqqueue { durable = Durable }. + rabbit_amqqueue:pseudo_queue(QName, self(), Durable). assert_prop(List, Prop, Value) -> case proplists:get_value(Prop, List)of diff --git a/test/channel_operation_timeout_SUITE.erl b/test/channel_operation_timeout_SUITE.erl index 9bfa0ae07a..77da6133d8 100644 --- a/test/channel_operation_timeout_SUITE.erl +++ b/test/channel_operation_timeout_SUITE.erl @@ -18,6 +18,7 @@ -include_lib("common_test/include/ct.hrl"). -include_lib("amqp_client/include/amqp_client.hrl"). +-include("amqqueue.hrl"). -compile([export_all]). @@ -169,9 +170,16 @@ get_consumers(Config, Node, VHost) when is_atom(Node), rabbit_ct_broker_helpers:rpc(Config, Node, rabbit_amqqueue, consumers_all, [VHost]). -get_amqqueue(Q, []) -> throw({not_found, Q}); -get_amqqueue(Q, [AMQQ = #amqqueue{name = Q} | _]) -> AMQQ; -get_amqqueue(Q, [_| Rem]) -> get_amqqueue(Q, Rem). +get_amqqueue(QName0, []) -> + throw({not_found, QName0}); +get_amqqueue(QName0, [Q | Rem]) when ?is_amqqueue(Q) -> + QName1 = amqqueue:get_name(Q), + compare_amqqueue(QName0, QName1, Q, Rem). + +compare_amqqueue(QName, QName, Q, _Rem) -> + Q; +compare_amqqueue(QName, _, _, Rem) -> + get_amqqueue(QName, Rem). qconfig(Ch, Name, Ex, Consume, Deliver) -> [{ch, Ch}, {name, Name}, {ex,Ex}, {consume, Consume}, {deliver, Deliver}]. diff --git a/test/cluster_SUITE.erl b/test/cluster_SUITE.erl index c52dc9ef64..dc30825f8c 100644 --- a/test/cluster_SUITE.erl +++ b/test/cluster_SUITE.erl @@ -18,6 +18,7 @@ -include_lib("common_test/include/ct.hrl"). -include_lib("amqp_client/include/amqp_client.hrl"). +-include("include/amqqueue.hrl"). -compile(export_all). @@ -225,9 +226,9 @@ declare_on_dead_queue1(_Config, SecondaryNode) -> Self = self(), Pid = spawn(SecondaryNode, fun () -> - {new, #amqqueue{name = QueueName, pid = QPid}} = - rabbit_amqqueue:declare(QueueName, false, false, [], - none, <<"acting-user">>), + {new, Q} = rabbit_amqqueue:declare(QueueName, false, false, [], none, <<"acting-user">>), + QueueName = ?amqqueue_field_name(Q), + QPid = ?amqqueue_field_pid(Q), exit(QPid, kill), Self ! {self(), killed, QPid} end), @@ -269,12 +270,12 @@ must_exit(Fun) -> end. dead_queue_loop(QueueName, OldPid) -> - {existing, Q} = rabbit_amqqueue:declare(QueueName, false, false, [], none, - <<"acting-user">>), - case Q#amqqueue.pid of + {existing, Q} = rabbit_amqqueue:declare(QueueName, false, false, [], none, <<"acting-user">>), + QPid = ?amqqueue_field_pid(Q), + case QPid of OldPid -> timer:sleep(25), dead_queue_loop(QueueName, OldPid); - _ -> true = rabbit_misc:is_process_alive(Q#amqqueue.pid), + _ -> true = rabbit_misc:is_process_alive(QPid), Q end. diff --git a/test/clustering_management_SUITE.erl b/test/clustering_management_SUITE.erl index 120257feb9..5ae2fb687c 100644 --- a/test/clustering_management_SUITE.erl +++ b/test/clustering_management_SUITE.erl @@ -315,14 +315,14 @@ forget_offline_removes_things(Config) -> forget_promotes_offline_slave(Config) -> [A, B, C, D] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), ACh = rabbit_ct_client_helpers:open_channel(Config, A), - Q = <<"mirrored-queue">>, - declare(ACh, Q), - set_ha_policy(Config, Q, A, [B, C]), - set_ha_policy(Config, Q, A, [C, D]), %% Test add and remove from recoverable_slaves + QName = <<"mirrored-queue">>, + declare(ACh, QName), + set_ha_policy(Config, QName, A, [B, C]), + set_ha_policy(Config, QName, A, [C, D]), %% Test add and remove from recoverable_slaves %% Publish and confirm amqp_channel:call(ACh, #'confirm.select'{}), - amqp_channel:cast(ACh, #'basic.publish'{routing_key = Q}, + amqp_channel:cast(ACh, #'basic.publish'{routing_key = QName}, #amqp_msg{props = #'P_basic'{delivery_mode = 2}}), amqp_channel:wait_for_confirms(ACh), @@ -353,26 +353,50 @@ forget_promotes_offline_slave(Config) -> ok = rabbit_ct_broker_helpers:start_node(Config, D), DCh2 = rabbit_ct_client_helpers:open_channel(Config, D), - #'queue.declare_ok'{message_count = 1} = declare(DCh2, Q), + #'queue.declare_ok'{message_count = 1} = declare(DCh2, QName), ok. -set_ha_policy(Config, Q, Master, Slaves) -> +set_ha_policy(Config, QName, Master, Slaves) -> Nodes = [list_to_binary(atom_to_list(N)) || N <- [Master | Slaves]], - rabbit_ct_broker_helpers:set_ha_policy(Config, Master, Q, - {<<"nodes">>, Nodes}), - await_slaves(Q, Master, Slaves). - -await_slaves(Q, Master, Slaves) -> - {ok, #amqqueue{pid = MPid, - slave_pids = SPids}} = - rpc:call(Master, rabbit_amqqueue, lookup, - [rabbit_misc:r(<<"/">>, queue, Q)]), - ActMaster = node(MPid), + HaPolicy = {<<"nodes">>, Nodes}, + rabbit_ct_broker_helpers:set_ha_policy(Config, Master, QName, HaPolicy), + await_slaves(QName, Master, Slaves). + +await_slaves(QName, Master, Slaves) -> + await_slaves_0(QName, Master, Slaves, 10). + +await_slaves_0(QName, Master, Slaves0, Tries) -> + {ok, Queue} = await_slaves_lookup_queue(QName, Master), + SPids = amqqueue:get_slave_pids(Queue), + ActMaster = amqqueue:qnode(Queue), ActSlaves = lists:usort([node(P) || P <- SPids]), - case {Master, lists:usort(Slaves)} of - {ActMaster, ActSlaves} -> ok; - _ -> timer:sleep(100), - await_slaves(Q, Master, Slaves) + Slaves1 = lists:usort(Slaves0), + await_slaves_1(QName, ActMaster, ActSlaves, Master, Slaves1, Tries). + +await_slaves_1(QName, _ActMaster, _ActSlaves, _Master, _Slaves, 0) -> + error({timeout_waiting_for_slaves, QName}); +await_slaves_1(QName, ActMaster, ActSlaves, Master, Slaves, Tries) -> + case {Master, Slaves} of + {ActMaster, ActSlaves} -> + ok; + _ -> + timer:sleep(250), + await_slaves_0(QName, Master, Slaves, Tries - 1) + end. + +await_slaves_lookup_queue(QName, Master) -> + await_slaves_lookup_queue(QName, Master, 10). + +await_slaves_lookup_queue(QName, _Master, 0) -> + error({timeout_looking_up_queue, QName}); +await_slaves_lookup_queue(QName, Master, Tries) -> + RpcArgs = [rabbit_misc:r(<<"/">>, queue, QName)], + case rpc:call(Master, rabbit_amqqueue, lookup, RpcArgs) of + {error, not_found} -> + timer:sleep(250), + await_slaves_lookup_queue(QName, Master, Tries - 1); + {ok, Q} -> + {ok, Q} end. force_boot(Config) -> diff --git a/test/crashing_queues_SUITE.erl b/test/crashing_queues_SUITE.erl index 2d91083abc..7b8ec91346 100644 --- a/test/crashing_queues_SUITE.erl +++ b/test/crashing_queues_SUITE.erl @@ -217,9 +217,10 @@ kill_queue(Node, QName) -> await_new_pid(Node, QName, Pid1). queue_pid(Node, QName) -> - #amqqueue{pid = QPid, - state = State, - name = #resource{virtual_host = VHost}} = lookup(Node, QName), + Q = lookup(Node, QName), + QPid = amqqueue:get_pid(Q), + State = amqqueue:get_state(Q), + #resource{virtual_host = VHost} = amqqueue:get_name(Q), case State of crashed -> case rabbit_amqqueue_sup_sup:find_for_vhost(VHost, Node) of diff --git a/test/priority_queue_SUITE.erl b/test/priority_queue_SUITE.erl index 9db866e3c6..8a0ab98241 100644 --- a/test/priority_queue_SUITE.erl +++ b/test/priority_queue_SUITE.erl @@ -366,9 +366,9 @@ info_head_message_timestamp1(_Config) -> QName = rabbit_misc:r(<<"/">>, queue, <<"info_head_message_timestamp-queue">>), Q0 = rabbit_amqqueue:pseudo_queue(QName, self()), - Q = Q0#amqqueue{arguments = [{<<"x-max-priority">>, long, 2}]}, + Q1 = amqqueue:set_arguments(Q0, [{<<"x-max-priority">>, long, 2}]), PQ = rabbit_priority_queue, - BQS1 = PQ:init(Q, new, fun(_, _) -> ok end), + BQS1 = PQ:init(Q1, new, fun(_, _) -> ok end), %% The queue is empty: no timestamp. true = PQ:is_empty(BQS1), '' = PQ:info(head_message_timestamp, BQS1), @@ -415,9 +415,9 @@ info_head_message_timestamp1(_Config) -> ram_duration(_Config) -> QName = rabbit_misc:r(<<"/">>, queue, <<"ram_duration-queue">>), Q0 = rabbit_amqqueue:pseudo_queue(QName, self()), - Q = Q0#amqqueue{arguments = [{<<"x-max-priority">>, long, 5}]}, + Q1 = amqqueue:set_arguments(Q0, [{<<"x-max-priority">>, long, 5}]), PQ = rabbit_priority_queue, - BQS1 = PQ:init(Q, new, fun(_, _) -> ok end), + BQS1 = PQ:init(Q1, new, fun(_, _) -> ok end), {_Duration1, BQS2} = PQ:ram_duration(BQS1), BQS3 = PQ:set_ram_duration_target(infinity, BQS2), BQS4 = PQ:set_ram_duration_target(1, BQS3), diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl index 9f40bc7b0c..0f9010f204 100644 --- a/test/quorum_queue_SUITE.erl +++ b/test/quorum_queue_SUITE.erl @@ -2228,11 +2228,10 @@ assert_queue_type(Server, Q, Expected) -> Actual = get_queue_type(Server, Q), Expected = Actual. -get_queue_type(Server, Q) -> - QNameRes = rabbit_misc:r(<<"/">>, queue, Q), - {ok, AMQQueue} = - rpc:call(Server, rabbit_amqqueue, lookup, [QNameRes]), - AMQQueue#amqqueue.type. +get_queue_type(Server, Q0) -> + QNameRes = rabbit_misc:r(<<"/">>, queue, Q0), + {ok, Q1} = rpc:call(Server, rabbit_amqqueue, lookup, [QNameRes]), + amqqueue:get_type(Q1). wait_for_messages(Config, Stats) -> wait_for_messages(Config, lists:sort(Stats), 60). diff --git a/test/rabbit_core_metrics_gc_SUITE.erl b/test/rabbit_core_metrics_gc_SUITE.erl index 7ae43aa7e1..42ac863ead 100644 --- a/test/rabbit_core_metrics_gc_SUITE.erl +++ b/test/rabbit_core_metrics_gc_SUITE.erl @@ -364,11 +364,9 @@ cluster_queue_metrics(Config) -> % Synchronize Name = rabbit_misc:r(VHost, queue, QueueName), - [#amqqueue{pid = QPid}] = rabbit_ct_broker_helpers:rpc(Config, Node0, - ets, lookup, - [rabbit_queue, Name]), - ok = rabbit_ct_broker_helpers:rpc(Config, Node0, rabbit_amqqueue, - sync_mirrors, [QPid]), + [Q] = rabbit_ct_broker_helpers:rpc(Config, Node0, ets, lookup, [rabbit_queue, Name]), + QPid = amqqueue:get_pid(Q), + ok = rabbit_ct_broker_helpers:rpc(Config, Node0, rabbit_amqqueue, sync_mirrors, [QPid]), % Check ETS table for data wait_for(fun () -> diff --git a/test/unit_inbroker_non_parallel_SUITE.erl b/test/unit_inbroker_non_parallel_SUITE.erl index e3e282f233..d2db382e30 100644 --- a/test/unit_inbroker_non_parallel_SUITE.erl +++ b/test/unit_inbroker_non_parallel_SUITE.erl @@ -568,7 +568,7 @@ head_message_timestamp1(_Config) -> QRes = rabbit_misc:r(<<"/">>, queue, QName), {ok, Q1} = rabbit_amqqueue:lookup(QRes), - QPid = Q1#amqqueue.pid, + QPid = amqqueue:get_pid(Q1), %% Set up event receiver for queue dummy_event_receiver:start(self(), [node()], [queue_stats]), diff --git a/test/unit_inbroker_parallel_SUITE.erl b/test/unit_inbroker_parallel_SUITE.erl index d028ed3ccf..f4f4971517 100644 --- a/test/unit_inbroker_parallel_SUITE.erl +++ b/test/unit_inbroker_parallel_SUITE.erl @@ -283,8 +283,7 @@ wait_for_confirms(Unconfirmed) -> end. test_amqqueue(Durable) -> - (rabbit_amqqueue:pseudo_queue(test_queue(), self())) - #amqqueue { durable = Durable }. + rabbit_amqqueue:pseudo_queue(test_queue(), self(), Durable). assert_prop(List, Prop, Value) -> case proplists:get_value(Prop, List)of @@ -709,7 +708,7 @@ head_message_timestamp1(_Config) -> QRes = rabbit_misc:r(<<"/">>, queue, QName), {ok, Q1} = rabbit_amqqueue:lookup(QRes), - QPid = Q1#amqqueue.pid, + QPid = amqqueue:get_pid(Q1), %% Set up event receiver for queue dummy_event_receiver:start(self(), [node()], [queue_stats]), @@ -958,7 +957,7 @@ confirms1(_Config) -> QName2 = DeclareBindDurableQueue(), %% Get the first one's pid (we'll crash it later) {ok, Q1} = rabbit_amqqueue:lookup(rabbit_misc:r(<<"/">>, queue, QName1)), - QPid1 = Q1#amqqueue.pid, + QPid1 = amqqueue:get_pid(Q1), %% Enable confirms rabbit_channel:do(Ch, #'confirm.select'{}), receive diff --git a/test/vhost_SUITE.erl b/test/vhost_SUITE.erl index 2de5819b54..5b6a6bd6ff 100644 --- a/test/vhost_SUITE.erl +++ b/test/vhost_SUITE.erl @@ -354,7 +354,7 @@ node_starts_with_dead_vhosts_and_ignore_slaves(Config) -> Node1 = rabbit_ct_broker_helpers:get_node_config(Config, 1, nodename), - #amqqueue{sync_slave_pids = [Pid]} = Q, + [Pid] = amqqueue:get_sync_slave_pids(Q), Node1 = node(Pid), |
