diff options
| author | Michael Klishin <mklishin@pivotal.io> | 2019-02-01 20:18:19 +0300 |
|---|---|---|
| committer | Michael Klishin <mklishin@pivotal.io> | 2019-02-01 20:18:19 +0300 |
| commit | 160d70cc7840c3be60e0ae6786a182d67707140a (patch) | |
| tree | 02b2892828de63fce66699a0755e82835e4cd654 /test | |
| parent | abd9a2d997b66a38ba0f4febdfa778708feb5c22 (diff) | |
| parent | ed3dd4d6257df0925ce78ad94de099304346da4c (diff) | |
| download | rabbitmq-server-git-160d70cc7840c3be60e0ae6786a182d67707140a.tar.gz | |
Merge branch 'master' into dead-letter-testing
Conflicts:
src/rabbit_quorum_queue.erl
Diffstat (limited to 'test')
| -rw-r--r-- | test/amqqueue_backward_compatibility_SUITE.erl | 302 | ||||
| -rw-r--r-- | test/backing_queue_SUITE.erl | 31 | ||||
| -rw-r--r-- | test/channel_operation_timeout_SUITE.erl | 14 | ||||
| -rw-r--r-- | test/cluster_SUITE.erl | 15 | ||||
| -rw-r--r-- | test/clustering_management_SUITE.erl | 66 | ||||
| -rw-r--r-- | test/crashing_queues_SUITE.erl | 7 | ||||
| -rw-r--r-- | test/dynamic_ha_SUITE.erl | 4 | ||||
| -rw-r--r-- | test/dynamic_qq_SUITE.erl | 23 | ||||
| -rw-r--r-- | test/feature_flags_SUITE.erl | 372 | ||||
| -rw-r--r-- | test/mirrored_supervisor_SUITE.erl | 13 | ||||
| -rw-r--r-- | test/priority_queue_SUITE.erl | 8 | ||||
| -rw-r--r-- | test/quorum_queue_SUITE.erl | 76 | ||||
| -rw-r--r-- | test/rabbit_core_metrics_gc_SUITE.erl | 8 | ||||
| -rw-r--r-- | test/single_active_consumer_SUITE.erl | 22 | ||||
| -rw-r--r-- | test/unit_inbroker_non_parallel_SUITE.erl | 2 | ||||
| -rw-r--r-- | test/unit_inbroker_parallel_SUITE.erl | 29 | ||||
| -rw-r--r-- | test/vhost_SUITE.erl | 2 |
17 files changed, 889 insertions, 105 deletions
diff --git a/test/amqqueue_backward_compatibility_SUITE.erl b/test/amqqueue_backward_compatibility_SUITE.erl new file mode 100644 index 0000000000..05a049c9bb --- /dev/null +++ b/test/amqqueue_backward_compatibility_SUITE.erl @@ -0,0 +1,302 @@ +-module(amqqueue_backward_compatibility_SUITE). + +-include_lib("common_test/include/ct.hrl"). +-include_lib("eunit/include/eunit.hrl"). + +-include("amqqueue.hrl"). + +-export([all/0, + groups/0, + init_per_suite/2, + end_per_suite/2, + init_per_group/2, + end_per_group/2, + init_per_testcase/2, + end_per_testcase/2, + + new_amqqueue_v1_is_amqqueue/1, + new_amqqueue_v2_is_amqqueue/1, + random_term_is_not_amqqueue/1, + + amqqueue_v1_is_durable/1, + amqqueue_v2_is_durable/1, + random_term_is_not_durable/1, + + amqqueue_v1_state_matching/1, + amqqueue_v2_state_matching/1, + random_term_state_matching/1, + + amqqueue_v1_type_matching/1, + amqqueue_v2_type_matching/1, + random_term_type_matching/1, + + upgrade_v1_to_v2/1 + ]). + +-define(long_tuple, {random_tuple, a, b, c, d, e, f, g, h, i, j, k, l, m, + n, o, p, q, r, s, t, u, v, w, x, y, z}). + +all() -> + [ + {group, parallel_tests} + ]. + +groups() -> + [ + {parallel_tests, [parallel], [new_amqqueue_v1_is_amqqueue, + new_amqqueue_v2_is_amqqueue, + random_term_is_not_amqqueue, + amqqueue_v1_is_durable, + amqqueue_v2_is_durable, + random_term_is_not_durable, + amqqueue_v1_state_matching, + amqqueue_v2_state_matching, + random_term_state_matching, + amqqueue_v1_type_matching, + amqqueue_v2_type_matching, + random_term_type_matching]} + ]. + +init_per_suite(_, Config) -> Config. +end_per_suite(_, Config) -> Config. + +init_per_group(_, Config) -> Config. +end_per_group(_, Config) -> Config. + +init_per_testcase(_, Config) -> Config. +end_per_testcase(_, Config) -> Config. + +new_amqqueue_v1_is_amqqueue(_) -> + VHost = <<"/">>, + Name = rabbit_misc:r(VHost, queue, my_amqqueue_v1), + Queue = amqqueue:new_with_version(amqqueue_v1, + Name, + self(), + false, + false, + none, + [], + VHost, + #{}, + ?amqqueue_v1_type), + ?assert(?is_amqqueue(Queue)), + ?assert(?is_amqqueue_v1(Queue)), + ?assert(not ?is_amqqueue_v2(Queue)), + ?assert(?amqqueue_is_classic(Queue)), + ?assert(amqqueue:is_classic(Queue)), + ?assert(not ?amqqueue_is_quorum(Queue)), + ?assert(not ?amqqueue_vhost_equals(Queue, <<"frazzle">>)), + ?assert(?amqqueue_has_valid_pid(Queue)), + ?assert(?amqqueue_pid_equals(Queue, self())), + ?assert(?amqqueue_pids_are_equal(Queue, Queue)), + ?assert(?amqqueue_pid_runs_on_local_node(Queue)), + ?assert(amqqueue:qnode(Queue) == node()). + +new_amqqueue_v2_is_amqqueue(_) -> + VHost = <<"/">>, + Name = rabbit_misc:r(VHost, queue, my_amqqueue_v2), + Queue = amqqueue:new_with_version(amqqueue_v2, + Name, + self(), + false, + false, + none, + [], + VHost, + #{}, + classic), + ?assert(?is_amqqueue(Queue)), + ?assert(?is_amqqueue_v2(Queue)), + ?assert(not ?is_amqqueue_v1(Queue)), + ?assert(?amqqueue_is_classic(Queue)), + ?assert(amqqueue:is_classic(Queue)), + ?assert(not ?amqqueue_is_quorum(Queue)), + ?assert(not ?amqqueue_vhost_equals(Queue, <<"frazzle">>)), + ?assert(?amqqueue_has_valid_pid(Queue)), + ?assert(?amqqueue_pid_equals(Queue, self())), + ?assert(?amqqueue_pids_are_equal(Queue, Queue)), + ?assert(?amqqueue_pid_runs_on_local_node(Queue)), + ?assert(amqqueue:qnode(Queue) == node()). + +random_term_is_not_amqqueue(_) -> + Term = ?long_tuple, + ?assert(not ?is_amqqueue(Term)), + ?assert(not ?is_amqqueue_v2(Term)), + ?assert(not ?is_amqqueue_v1(Term)). + +%% ------------------------------------------------------------------- + +amqqueue_v1_is_durable(_) -> + VHost = <<"/">>, + Name = rabbit_misc:r(VHost, queue, my_amqqueue_v1), + TransientQueue = amqqueue:new_with_version(amqqueue_v1, + Name, + self(), + false, + false, + none, + [], + VHost, + #{}, + ?amqqueue_v1_type), + DurableQueue = amqqueue:new_with_version(amqqueue_v1, + Name, + self(), + true, + false, + none, + [], + VHost, + #{}, + ?amqqueue_v1_type), + ?assert(not ?amqqueue_is_durable(TransientQueue)), + ?assert(?amqqueue_is_durable(DurableQueue)). + +amqqueue_v2_is_durable(_) -> + VHost = <<"/">>, + Name = rabbit_misc:r(VHost, queue, my_amqqueue_v1), + TransientQueue = amqqueue:new_with_version(amqqueue_v2, + Name, + self(), + false, + false, + none, + [], + VHost, + #{}, + classic), + DurableQueue = amqqueue:new_with_version(amqqueue_v2, + Name, + self(), + true, + false, + none, + [], + VHost, + #{}, + classic), + ?assert(not ?amqqueue_is_durable(TransientQueue)), + ?assert(?amqqueue_is_durable(DurableQueue)). + +random_term_is_not_durable(_) -> + Term = ?long_tuple, + ?assert(not ?amqqueue_is_durable(Term)). + +%% ------------------------------------------------------------------- + +amqqueue_v1_state_matching(_) -> + VHost = <<"/">>, + Name = rabbit_misc:r(VHost, queue, my_amqqueue_v1), + Queue1 = amqqueue:new_with_version(amqqueue_v1, + Name, + self(), + true, + false, + none, + [], + VHost, + #{}, + ?amqqueue_v1_type), + ?assert(?amqqueue_state_is(Queue1, live)), + Queue2 = amqqueue:set_state(Queue1, stopped), + ?assert(?amqqueue_state_is(Queue2, stopped)). + +amqqueue_v2_state_matching(_) -> + VHost = <<"/">>, + Name = rabbit_misc:r(VHost, queue, my_amqqueue_v1), + Queue1 = amqqueue:new_with_version(amqqueue_v2, + Name, + self(), + true, + false, + none, + [], + VHost, + #{}, + classic), + ?assert(?amqqueue_state_is(Queue1, live)), + Queue2 = amqqueue:set_state(Queue1, stopped), + ?assert(?amqqueue_state_is(Queue2, stopped)). + +random_term_state_matching(_) -> + Term = ?long_tuple, + ?assert(not ?amqqueue_state_is(Term, live)). + +%% ------------------------------------------------------------------- + +amqqueue_v1_type_matching(_) -> + VHost = <<"/">>, + Name = rabbit_misc:r(VHost, queue, my_amqqueue_v1), + Queue = amqqueue:new_with_version(amqqueue_v1, + Name, + self(), + true, + false, + none, + [], + VHost, + #{}, + ?amqqueue_v1_type), + ?assert(?amqqueue_is_classic(Queue)), + ?assert(amqqueue:is_classic(Queue)), + ?assert(not ?amqqueue_is_quorum(Queue)). + +amqqueue_v2_type_matching(_) -> + VHost = <<"/">>, + Name = rabbit_misc:r(VHost, queue, my_amqqueue_v1), + ClassicQueue = amqqueue:new_with_version(amqqueue_v2, + Name, + self(), + true, + false, + none, + [], + VHost, + #{}, + classic), + ?assert(?amqqueue_is_classic(ClassicQueue)), + ?assert(amqqueue:is_classic(ClassicQueue)), + ?assert(not ?amqqueue_is_quorum(ClassicQueue)), + ?assert(not amqqueue:is_quorum(ClassicQueue)), + QuorumQueue = amqqueue:new_with_version(amqqueue_v2, + Name, + self(), + true, + false, + none, + [], + VHost, + #{}, + quorum), + ?assert(not ?amqqueue_is_classic(QuorumQueue)), + ?assert(not amqqueue:is_classic(QuorumQueue)), + ?assert(?amqqueue_is_quorum(QuorumQueue)), + ?assert(amqqueue:is_quorum(QuorumQueue)). + +random_term_type_matching(_) -> + Term = ?long_tuple, + ?assert(not ?amqqueue_is_classic(Term)), + ?assert(not ?amqqueue_is_quorum(Term)), + ?assertException(error, function_clause, amqqueue:is_classic(Term)), + ?assertException(error, function_clause, amqqueue:is_quorum(Term)). + +%% ------------------------------------------------------------------- + +upgrade_v1_to_v2(_) -> + VHost = <<"/">>, + Name = rabbit_misc:r(VHost, queue, my_amqqueue_v1), + OldQueue = amqqueue:new_with_version(amqqueue_v1, + Name, + self(), + true, + false, + none, + [], + VHost, + #{}, + ?amqqueue_v1_type), + ?assert(?is_amqqueue_v1(OldQueue)), + ?assert(not ?is_amqqueue_v2(OldQueue)), + NewQueue = amqqueue:upgrade_to(amqqueue_v2, OldQueue), + ?assert(not ?is_amqqueue_v1(NewQueue)), + ?assert(?is_amqqueue_v2(NewQueue)). diff --git a/test/backing_queue_SUITE.erl b/test/backing_queue_SUITE.erl index 433bc66bff..c3f87cce59 100644 --- a/test/backing_queue_SUITE.erl +++ b/test/backing_queue_SUITE.erl @@ -18,6 +18,7 @@ -include_lib("common_test/include/ct.hrl"). -include_lib("amqp_client/include/amqp_client.hrl"). +-include("amqqueue.hrl"). -compile(export_all). @@ -686,11 +687,10 @@ bq_variable_queue_delete_msg_store_files_callback(Config) -> bq_variable_queue_delete_msg_store_files_callback1(Config) -> ok = restart_msg_store_empty(), - {new, #amqqueue { pid = QPid, name = QName } = Q} = - rabbit_amqqueue:declare( - queue_name(Config, - <<"bq_variable_queue_delete_msg_store_files_callback-q">>), - true, false, [], none, <<"acting-user">>), + QName0 = queue_name(Config, <<"bq_variable_queue_delete_msg_store_files_callback-q">>), + {new, Q} = rabbit_amqqueue:declare(QName0, true, false, [], none, <<"acting-user">>), + QName = amqqueue:get_name(Q), + QPid = amqqueue:get_pid(Q), Payload = <<0:8388608>>, %% 1MB Count = 30, publish_and_confirm(Q, Payload, Count), @@ -718,9 +718,10 @@ bq_queue_recover(Config) -> bq_queue_recover1(Config) -> Count = 2 * rabbit_queue_index:next_segment_boundary(0), - {new, #amqqueue { pid = QPid, name = QName } = Q} = - rabbit_amqqueue:declare(queue_name(Config, <<"bq_queue_recover-q">>), - true, false, [], none, <<"acting-user">>), + QName0 = queue_name(Config, <<"bq_queue_recover-q">>), + {new, Q} = rabbit_amqqueue:declare(QName0, true, false, [], none, <<"acting-user">>), + QName = amqqueue:get_name(Q), + QPid = amqqueue:get_pid(Q), publish_and_confirm(Q, <<>>, Count), SupPid = get_queue_sup_pid(Q), @@ -736,7 +737,8 @@ bq_queue_recover1(Config) -> {ok, Limiter} = rabbit_limiter:start_link(no_id), rabbit_amqqueue:with_or_die( QName, - fun (Q1 = #amqqueue { pid = QPid1 }) -> + fun (Q1) when ?is_amqqueue(Q1) -> + QPid1 = amqqueue:get_pid(Q1), CountMinusOne = Count - 1, {ok, CountMinusOne, {QName, QPid1, _AckTag, true, _Msg}} = rabbit_amqqueue:basic_get(Q1, self(), false, Limiter, @@ -752,7 +754,9 @@ bq_queue_recover1(Config) -> passed. %% Return the PID of the given queue's supervisor. -get_queue_sup_pid(#amqqueue { pid = QPid, name = QName }) -> +get_queue_sup_pid(Q) when ?is_amqqueue(Q) -> + QName = amqqueue:get_name(Q), + QPid = amqqueue:get_pid(Q), VHost = QName#resource.virtual_host, {ok, AmqSup} = rabbit_amqqueue_sup_sup:find_for_vhost(VHost, node(QPid)), Sups = supervisor:which_children(AmqSup), @@ -1413,8 +1417,8 @@ with_fresh_variable_queue(Fun, Mode) -> shutdown, Fun(VQ1, QName)), Me ! Ref catch - Type:Error -> - Me ! {Ref, Type, Error, erlang:get_stacktrace()} + Type:Error:Stacktrace -> + Me ! {Ref, Type, Error, Stacktrace} end end), receive @@ -1498,8 +1502,7 @@ variable_queue_fetch(Count, IsPersistent, IsDelivered, Len, VQ) -> end, {VQ, []}, lists:seq(1, Count)). test_amqqueue(QName, Durable) -> - (rabbit_amqqueue:pseudo_queue(QName, self())) - #amqqueue { durable = Durable }. + rabbit_amqqueue:pseudo_queue(QName, self(), Durable). assert_prop(List, Prop, Value) -> case proplists:get_value(Prop, List)of diff --git a/test/channel_operation_timeout_SUITE.erl b/test/channel_operation_timeout_SUITE.erl index 9bfa0ae07a..77da6133d8 100644 --- a/test/channel_operation_timeout_SUITE.erl +++ b/test/channel_operation_timeout_SUITE.erl @@ -18,6 +18,7 @@ -include_lib("common_test/include/ct.hrl"). -include_lib("amqp_client/include/amqp_client.hrl"). +-include("amqqueue.hrl"). -compile([export_all]). @@ -169,9 +170,16 @@ get_consumers(Config, Node, VHost) when is_atom(Node), rabbit_ct_broker_helpers:rpc(Config, Node, rabbit_amqqueue, consumers_all, [VHost]). -get_amqqueue(Q, []) -> throw({not_found, Q}); -get_amqqueue(Q, [AMQQ = #amqqueue{name = Q} | _]) -> AMQQ; -get_amqqueue(Q, [_| Rem]) -> get_amqqueue(Q, Rem). +get_amqqueue(QName0, []) -> + throw({not_found, QName0}); +get_amqqueue(QName0, [Q | Rem]) when ?is_amqqueue(Q) -> + QName1 = amqqueue:get_name(Q), + compare_amqqueue(QName0, QName1, Q, Rem). + +compare_amqqueue(QName, QName, Q, _Rem) -> + Q; +compare_amqqueue(QName, _, _, Rem) -> + get_amqqueue(QName, Rem). qconfig(Ch, Name, Ex, Consume, Deliver) -> [{ch, Ch}, {name, Name}, {ex,Ex}, {consume, Consume}, {deliver, Deliver}]. diff --git a/test/cluster_SUITE.erl b/test/cluster_SUITE.erl index c52dc9ef64..dc30825f8c 100644 --- a/test/cluster_SUITE.erl +++ b/test/cluster_SUITE.erl @@ -18,6 +18,7 @@ -include_lib("common_test/include/ct.hrl"). -include_lib("amqp_client/include/amqp_client.hrl"). +-include("include/amqqueue.hrl"). -compile(export_all). @@ -225,9 +226,9 @@ declare_on_dead_queue1(_Config, SecondaryNode) -> Self = self(), Pid = spawn(SecondaryNode, fun () -> - {new, #amqqueue{name = QueueName, pid = QPid}} = - rabbit_amqqueue:declare(QueueName, false, false, [], - none, <<"acting-user">>), + {new, Q} = rabbit_amqqueue:declare(QueueName, false, false, [], none, <<"acting-user">>), + QueueName = ?amqqueue_field_name(Q), + QPid = ?amqqueue_field_pid(Q), exit(QPid, kill), Self ! {self(), killed, QPid} end), @@ -269,12 +270,12 @@ must_exit(Fun) -> end. dead_queue_loop(QueueName, OldPid) -> - {existing, Q} = rabbit_amqqueue:declare(QueueName, false, false, [], none, - <<"acting-user">>), - case Q#amqqueue.pid of + {existing, Q} = rabbit_amqqueue:declare(QueueName, false, false, [], none, <<"acting-user">>), + QPid = ?amqqueue_field_pid(Q), + case QPid of OldPid -> timer:sleep(25), dead_queue_loop(QueueName, OldPid); - _ -> true = rabbit_misc:is_process_alive(Q#amqqueue.pid), + _ -> true = rabbit_misc:is_process_alive(QPid), Q end. diff --git a/test/clustering_management_SUITE.erl b/test/clustering_management_SUITE.erl index 120257feb9..5ae2fb687c 100644 --- a/test/clustering_management_SUITE.erl +++ b/test/clustering_management_SUITE.erl @@ -315,14 +315,14 @@ forget_offline_removes_things(Config) -> forget_promotes_offline_slave(Config) -> [A, B, C, D] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), ACh = rabbit_ct_client_helpers:open_channel(Config, A), - Q = <<"mirrored-queue">>, - declare(ACh, Q), - set_ha_policy(Config, Q, A, [B, C]), - set_ha_policy(Config, Q, A, [C, D]), %% Test add and remove from recoverable_slaves + QName = <<"mirrored-queue">>, + declare(ACh, QName), + set_ha_policy(Config, QName, A, [B, C]), + set_ha_policy(Config, QName, A, [C, D]), %% Test add and remove from recoverable_slaves %% Publish and confirm amqp_channel:call(ACh, #'confirm.select'{}), - amqp_channel:cast(ACh, #'basic.publish'{routing_key = Q}, + amqp_channel:cast(ACh, #'basic.publish'{routing_key = QName}, #amqp_msg{props = #'P_basic'{delivery_mode = 2}}), amqp_channel:wait_for_confirms(ACh), @@ -353,26 +353,50 @@ forget_promotes_offline_slave(Config) -> ok = rabbit_ct_broker_helpers:start_node(Config, D), DCh2 = rabbit_ct_client_helpers:open_channel(Config, D), - #'queue.declare_ok'{message_count = 1} = declare(DCh2, Q), + #'queue.declare_ok'{message_count = 1} = declare(DCh2, QName), ok. -set_ha_policy(Config, Q, Master, Slaves) -> +set_ha_policy(Config, QName, Master, Slaves) -> Nodes = [list_to_binary(atom_to_list(N)) || N <- [Master | Slaves]], - rabbit_ct_broker_helpers:set_ha_policy(Config, Master, Q, - {<<"nodes">>, Nodes}), - await_slaves(Q, Master, Slaves). - -await_slaves(Q, Master, Slaves) -> - {ok, #amqqueue{pid = MPid, - slave_pids = SPids}} = - rpc:call(Master, rabbit_amqqueue, lookup, - [rabbit_misc:r(<<"/">>, queue, Q)]), - ActMaster = node(MPid), + HaPolicy = {<<"nodes">>, Nodes}, + rabbit_ct_broker_helpers:set_ha_policy(Config, Master, QName, HaPolicy), + await_slaves(QName, Master, Slaves). + +await_slaves(QName, Master, Slaves) -> + await_slaves_0(QName, Master, Slaves, 10). + +await_slaves_0(QName, Master, Slaves0, Tries) -> + {ok, Queue} = await_slaves_lookup_queue(QName, Master), + SPids = amqqueue:get_slave_pids(Queue), + ActMaster = amqqueue:qnode(Queue), ActSlaves = lists:usort([node(P) || P <- SPids]), - case {Master, lists:usort(Slaves)} of - {ActMaster, ActSlaves} -> ok; - _ -> timer:sleep(100), - await_slaves(Q, Master, Slaves) + Slaves1 = lists:usort(Slaves0), + await_slaves_1(QName, ActMaster, ActSlaves, Master, Slaves1, Tries). + +await_slaves_1(QName, _ActMaster, _ActSlaves, _Master, _Slaves, 0) -> + error({timeout_waiting_for_slaves, QName}); +await_slaves_1(QName, ActMaster, ActSlaves, Master, Slaves, Tries) -> + case {Master, Slaves} of + {ActMaster, ActSlaves} -> + ok; + _ -> + timer:sleep(250), + await_slaves_0(QName, Master, Slaves, Tries - 1) + end. + +await_slaves_lookup_queue(QName, Master) -> + await_slaves_lookup_queue(QName, Master, 10). + +await_slaves_lookup_queue(QName, _Master, 0) -> + error({timeout_looking_up_queue, QName}); +await_slaves_lookup_queue(QName, Master, Tries) -> + RpcArgs = [rabbit_misc:r(<<"/">>, queue, QName)], + case rpc:call(Master, rabbit_amqqueue, lookup, RpcArgs) of + {error, not_found} -> + timer:sleep(250), + await_slaves_lookup_queue(QName, Master, Tries - 1); + {ok, Q} -> + {ok, Q} end. force_boot(Config) -> diff --git a/test/crashing_queues_SUITE.erl b/test/crashing_queues_SUITE.erl index 2d91083abc..7b8ec91346 100644 --- a/test/crashing_queues_SUITE.erl +++ b/test/crashing_queues_SUITE.erl @@ -217,9 +217,10 @@ kill_queue(Node, QName) -> await_new_pid(Node, QName, Pid1). queue_pid(Node, QName) -> - #amqqueue{pid = QPid, - state = State, - name = #resource{virtual_host = VHost}} = lookup(Node, QName), + Q = lookup(Node, QName), + QPid = amqqueue:get_pid(Q), + State = amqqueue:get_state(Q), + #resource{virtual_host = VHost} = amqqueue:get_name(Q), case State of crashed -> case rabbit_amqqueue_sup_sup:find_for_vhost(VHost, Node) of diff --git a/test/dynamic_ha_SUITE.erl b/test/dynamic_ha_SUITE.erl index e41c07a888..6ccf3a75c3 100644 --- a/test/dynamic_ha_SUITE.erl +++ b/test/dynamic_ha_SUITE.erl @@ -614,8 +614,8 @@ get_stacktrace() -> try throw(e) catch - _:e -> - erlang:get_stacktrace() + _:e:Stacktrace -> + Stacktrace end. %%---------------------------------------------------------------------------- diff --git a/test/dynamic_qq_SUITE.erl b/test/dynamic_qq_SUITE.erl index fbc1e81827..89344af30c 100644 --- a/test/dynamic_qq_SUITE.erl +++ b/test/dynamic_qq_SUITE.erl @@ -83,9 +83,26 @@ init_per_testcase(Testcase, Config) -> {queue_name, Q}, {queue_args, [{<<"x-queue-type">>, longstr, <<"quorum">>}]} ]), - rabbit_ct_helpers:run_steps(Config1, - rabbit_ct_broker_helpers:setup_steps() ++ - rabbit_ct_client_helpers:setup_steps()). + Config2 = rabbit_ct_helpers:run_steps( + Config1, + rabbit_ct_broker_helpers:setup_steps() ++ + rabbit_ct_client_helpers:setup_steps()), + Nodes = rabbit_ct_broker_helpers:get_node_configs( + Config2, nodename), + Ret = rabbit_ct_broker_helpers:rpc( + Config2, 0, + rabbit_feature_flags, + is_supported_remotely, + [Nodes, [quorum_queue], 60000]), + case Ret of + true -> + ok = rabbit_ct_broker_helpers:rpc( + Config2, 0, rabbit_feature_flags, enable, [quorum_queue]), + Config2; + false -> + end_per_testcase(Testcase, Config2), + {skip, "Quorum queues are unsupported"} + end. end_per_testcase(Testcase, Config) -> Config1 = rabbit_ct_helpers:run_steps(Config, diff --git a/test/feature_flags_SUITE.erl b/test/feature_flags_SUITE.erl new file mode 100644 index 0000000000..db87442105 --- /dev/null +++ b/test/feature_flags_SUITE.erl @@ -0,0 +1,372 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2018-2019 Pivotal Software, Inc. All rights reserved. +%% + +-module(feature_flags_SUITE). + +-include_lib("common_test/include/ct.hrl"). +-include_lib("eunit/include/eunit.hrl"). + +-export([suite/0, + all/0, + groups/0, + init_per_suite/1, + end_per_suite/1, + init_per_group/2, + end_per_group/2, + init_per_testcase/2, + end_per_testcase/2, + + enable_quorum_queue_in_a_healthy_situation/1, + enable_unsupported_feature_flag_in_a_healthy_situation/1, + enable_quorum_queue_when_ff_file_is_unwritable/1, + enable_quorum_queue_with_a_network_partition/1, + mark_quorum_queue_as_enabled_with_a_network_partition/1 + ]). + +suite() -> + [{timetrap, 5 * 60000}]. + +all() -> + [ + {group, unclustered}, + {group, clustered} + ]. + +groups() -> + [ + {unclustered, [], + [ + enable_quorum_queue_in_a_healthy_situation, + enable_unsupported_feature_flag_in_a_healthy_situation, + enable_quorum_queue_when_ff_file_is_unwritable + ]}, + {clustered, [], + [ + enable_quorum_queue_in_a_healthy_situation, + enable_unsupported_feature_flag_in_a_healthy_situation, + enable_quorum_queue_when_ff_file_is_unwritable, + enable_quorum_queue_with_a_network_partition, + mark_quorum_queue_as_enabled_with_a_network_partition + ]} + ]. + +%% ------------------------------------------------------------------- +%% Testsuite setup/teardown. +%% ------------------------------------------------------------------- + +init_per_suite(Config) -> + rabbit_ct_helpers:log_environment(), + rabbit_ct_helpers:run_setup_steps(Config, [ + fun rabbit_ct_broker_helpers:enable_dist_proxy_manager/1 + ]). + +end_per_suite(Config) -> + rabbit_ct_helpers:run_teardown_steps(Config). + +init_per_group(clustered, Config) -> + rabbit_ct_helpers:set_config(Config, [{rmq_nodes_count, 5}]); +init_per_group(unclustered, Config) -> + rabbit_ct_helpers:set_config(Config, [{rmq_nodes_count, 1}]); +init_per_group(_, Config) -> + Config. + +end_per_group(_, Config) -> + Config. + +init_per_testcase(Testcase, Config) -> + rabbit_ct_helpers:testcase_started(Config, Testcase), + TestNumber = rabbit_ct_helpers:testcase_number(Config, ?MODULE, Testcase), + ClusterSize = ?config(rmq_nodes_count, Config), + Config1 = rabbit_ct_helpers:set_config(Config, [ + {rmq_nodes_clustered, false}, + {rmq_nodename_suffix, Testcase}, + {tcp_ports_base, {skip_n_nodes, TestNumber * ClusterSize}}, + {net_ticktime, 5} + ]), + Config2 = rabbit_ct_helpers:merge_app_env( + Config1, + {rabbit, + [{forced_feature_flags_on_init, []}, + {log, [{file, [{level, debug}]}]}]}), + Config3 = rabbit_ct_helpers:run_steps( + Config2, + rabbit_ct_broker_helpers:setup_steps() ++ + rabbit_ct_client_helpers:setup_steps() ++ + [fun rabbit_ct_broker_helpers:enable_dist_proxy/1, + fun rabbit_ct_broker_helpers:cluster_nodes/1]), + Ret = rabbit_ct_broker_helpers:rpc( + Config3, 0, rabbit_feature_flags, is_supported, [quorum_queue]), + case Ret of + true -> + Config3; + false -> + end_per_testcase(Testcase, Config3), + {skip, "Quorum queues are unsupported"} + end. + +end_per_testcase(Testcase, Config) -> + Config1 = rabbit_ct_helpers:run_steps(Config, + rabbit_ct_client_helpers:teardown_steps() ++ + rabbit_ct_broker_helpers:teardown_steps()), + rabbit_ct_helpers:testcase_finished(Config1, Testcase). + +%% ------------------------------------------------------------------- +%% Testcases. +%% ------------------------------------------------------------------- + +enable_quorum_queue_in_a_healthy_situation(Config) -> + FeatureName = quorum_queue, + ClusterSize = ?config(rmq_nodes_count, Config), + Node = ClusterSize - 1, + True = lists:duplicate(ClusterSize, true), + False = lists:duplicate(ClusterSize, false), + + %% The feature flag is supported but disabled initially. + ?assertEqual( + True, + is_feature_flag_supported(Config, FeatureName)), + ?assertEqual( + False, + is_feature_flag_enabled(Config, FeatureName)), + + %% Enabling the feature flag works. + ?assertEqual( + ok, + enable_feature_flag_on(Config, Node, FeatureName)), + ?assertEqual( + True, + is_feature_flag_enabled(Config, FeatureName)), + + %% Re-enabling the feature flag also works. + ?assertEqual( + ok, + enable_feature_flag_on(Config, Node, FeatureName)), + ?assertEqual( + True, + is_feature_flag_enabled(Config, FeatureName)). + +enable_unsupported_feature_flag_in_a_healthy_situation(Config) -> + FeatureName = unsupported_feature_flag, + ClusterSize = ?config(rmq_nodes_count, Config), + Node = ClusterSize - 1, + False = lists:duplicate(ClusterSize, false), + + %% The feature flag is unsupported and thus disabled. + ?assertEqual( + False, + is_feature_flag_supported(Config, FeatureName)), + ?assertEqual( + False, + is_feature_flag_enabled(Config, FeatureName)), + + %% Enabling the feature flag works. + ?assertEqual( + {error, unsupported}, + enable_feature_flag_on(Config, Node, FeatureName)), + ?assertEqual( + False, + is_feature_flag_enabled(Config, FeatureName)). + +enable_quorum_queue_when_ff_file_is_unwritable(Config) -> + FeatureName = quorum_queue, + ClusterSize = ?config(rmq_nodes_count, Config), + Node = ClusterSize - 1, + True = lists:duplicate(ClusterSize, true), + False = lists:duplicate(ClusterSize, false), + Files = feature_flags_files(Config), + + %% The feature flag is supported but disabled initially. + ?assertEqual( + True, + is_feature_flag_supported(Config, FeatureName)), + ?assertEqual( + False, + is_feature_flag_enabled(Config, FeatureName)), + + %% Restrict permissions on the `feature_flags` files. + [?assertEqual(ok, file:change_mode(File, 8#0444)) || File <- Files], + + %% Enabling the feature flag works. + ?assertEqual( + ok, + enable_feature_flag_on(Config, Node, FeatureName)), + ?assertEqual( + True, + is_feature_flag_enabled(Config, FeatureName)), + + %% The `feature_flags` file were not updated. + ?assertEqual( + lists:duplicate(ClusterSize, {ok, [[]]}), + [file:consult(File) || File <- feature_flags_files(Config)]), + + %% Stop all nodes and restore permissions on the `feature_flags` files. + Nodes = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + [?assertEqual(ok, rabbit_ct_broker_helpers:stop_node(Config, N)) + || N <- Nodes], + [?assertEqual(ok, file:change_mode(File, 8#0644)) || File <- Files], + + %% Restart all nodes and assert the feature flag is still enabled and + %% the `feature_flags` files were correctly repaired. + [?assertEqual(ok, rabbit_ct_broker_helpers:start_node(Config, N)) + || N <- lists:reverse(Nodes)], + + ?assertEqual( + True, + is_feature_flag_enabled(Config, FeatureName)), + ?assertEqual( + lists:duplicate(ClusterSize, {ok, [[FeatureName]]}), + [file:consult(File) || File <- feature_flags_files(Config)]). + +enable_quorum_queue_with_a_network_partition(Config) -> + FeatureName = quorum_queue, + ClusterSize = ?config(rmq_nodes_count, Config), + [A, B, C, D, E] = rabbit_ct_broker_helpers:get_node_configs( + Config, nodename), + True = lists:duplicate(ClusterSize, true), + False = lists:duplicate(ClusterSize, false), + + %% The feature flag is supported but disabled initially. + ?assertEqual( + True, + is_feature_flag_supported(Config, FeatureName)), + ?assertEqual( + False, + is_feature_flag_enabled(Config, FeatureName)), + + %% Isolate nodes B and E from the rest of the cluster. + NodePairs = [{B, A}, + {B, C}, + {B, D}, + {E, A}, + {E, C}, + {E, D}], + block(NodePairs), + timer:sleep(1000), + + %% Enabling the feature flag should fail in the specific case of + %% `quorum_queue`, if the network is broken. + ?assertEqual( + {error, unsupported}, + enable_feature_flag_on(Config, B, FeatureName)), + ?assertEqual( + False, + is_feature_flag_enabled(Config, FeatureName)), + + %% Repair the network and try again to enable the feature flag. + unblock(NodePairs), + timer:sleep(1000), + [?assertEqual(ok, rabbit_ct_broker_helpers:stop_node(Config, N)) + || N <- [A, C, D]], + [?assertEqual(ok, rabbit_ct_broker_helpers:start_node(Config, N)) + || N <- [A, C, D]], + + %% Enabling the feature flag works. + ?assertEqual( + ok, + enable_feature_flag_on(Config, B, FeatureName)), + ?assertEqual( + True, + is_feature_flag_enabled(Config, FeatureName)). + +mark_quorum_queue_as_enabled_with_a_network_partition(Config) -> + FeatureName = quorum_queue, + ClusterSize = ?config(rmq_nodes_count, Config), + [A, B, C, D, E] = rabbit_ct_broker_helpers:get_node_configs( + Config, nodename), + True = lists:duplicate(ClusterSize, true), + False = lists:duplicate(ClusterSize, false), + + %% The feature flag is supported but disabled initially. + ?assertEqual( + True, + is_feature_flag_supported(Config, FeatureName)), + ?assertEqual( + False, + is_feature_flag_enabled(Config, FeatureName)), + + %% Isolate node B from the rest of the cluster. + NodePairs = [{B, A}, + {B, C}, + {B, D}, + {B, E}], + block(NodePairs), + timer:sleep(1000), + + %% Mark the feature flag as enabled on all nodes from node B. This + %% is expected to timeout. + RemoteNodes = [A, C, D, E], + ?assertEqual( + {failed_to_mark_feature_flag_as_enabled_on_remote_nodes, + FeatureName, + true, + RemoteNodes}, + rabbit_ct_broker_helpers:rpc( + Config, B, + rabbit_feature_flags, mark_as_enabled_remotely, + [RemoteNodes, FeatureName, true, 20000])), + + RepairFun = fun() -> + %% Wait a few seconds before we repair the network. + timer:sleep(5000), + + %% Repair the network and try again to enable + %% the feature flag. + unblock(NodePairs), + timer:sleep(1000) + end, + spawn(RepairFun), + + %% Mark the feature flag as enabled on all nodes from node B. This + %% is expected to work this time. + ct:pal(?LOW_IMPORTANCE, + "Marking the feature flag as enabled on remote nodes...", []), + ?assertEqual( + ok, + rabbit_ct_broker_helpers:rpc( + Config, B, + rabbit_feature_flags, mark_as_enabled_remotely, + [RemoteNodes, FeatureName, true, 120000])). + +%% FIXME: Finish the testcase above ^ + +%% ------------------------------------------------------------------- +%% Internal helpers. +%% ------------------------------------------------------------------- + +enable_feature_flag_on(Config, Node, FeatureName) -> + rabbit_ct_broker_helpers:rpc( + Config, Node, rabbit_feature_flags, enable, [FeatureName]). + +is_feature_flag_supported(Config, FeatureName) -> + rabbit_ct_broker_helpers:rpc_all( + Config, rabbit_feature_flags, is_supported, [FeatureName]). + +is_feature_flag_enabled(Config, FeatureName) -> + rabbit_ct_broker_helpers:rpc_all( + Config, rabbit_feature_flags, is_enabled, [FeatureName]). + +feature_flags_files(Config) -> + rabbit_ct_broker_helpers:rpc_all( + Config, rabbit_feature_flags, enabled_feature_flags_list_file, []). + +block(Pairs) -> [block(X, Y) || {X, Y} <- Pairs]. +unblock(Pairs) -> [allow(X, Y) || {X, Y} <- Pairs]. + +block(X, Y) -> + rabbit_ct_broker_helpers:block_traffic_between(X, Y). + +allow(X, Y) -> + rabbit_ct_broker_helpers:allow_traffic_between(X, Y). diff --git a/test/mirrored_supervisor_SUITE.erl b/test/mirrored_supervisor_SUITE.erl index d3cc080eeb..aa114e0a84 100644 --- a/test/mirrored_supervisor_SUITE.erl +++ b/test/mirrored_supervisor_SUITE.erl @@ -294,14 +294,17 @@ get_group(Group) -> call(Id, Msg) -> call(Id, Msg, 10*1000, 100). -call(Id, Msg, 0, _Decr) -> - exit({timeout_waiting_for_server, {Id, Msg}, erlang:get_stacktrace()}); - call(Id, Msg, MaxDelay, Decr) -> + call(Id, Msg, MaxDelay, Decr, undefined). + +call(Id, Msg, 0, _Decr, Stacktrace) -> + exit({timeout_waiting_for_server, {Id, Msg}, Stacktrace}); + +call(Id, Msg, MaxDelay, Decr, _) -> try gen_server:call(Id, Msg, infinity) - catch exit:_ -> timer:sleep(Decr), - call(Id, Msg, MaxDelay - Decr, Decr) + catch exit:_:Stacktrace -> timer:sleep(Decr), + call(Id, Msg, MaxDelay - Decr, Decr, Stacktrace) end. kill(Pid) -> kill(Pid, []). diff --git a/test/priority_queue_SUITE.erl b/test/priority_queue_SUITE.erl index 9db866e3c6..8a0ab98241 100644 --- a/test/priority_queue_SUITE.erl +++ b/test/priority_queue_SUITE.erl @@ -366,9 +366,9 @@ info_head_message_timestamp1(_Config) -> QName = rabbit_misc:r(<<"/">>, queue, <<"info_head_message_timestamp-queue">>), Q0 = rabbit_amqqueue:pseudo_queue(QName, self()), - Q = Q0#amqqueue{arguments = [{<<"x-max-priority">>, long, 2}]}, + Q1 = amqqueue:set_arguments(Q0, [{<<"x-max-priority">>, long, 2}]), PQ = rabbit_priority_queue, - BQS1 = PQ:init(Q, new, fun(_, _) -> ok end), + BQS1 = PQ:init(Q1, new, fun(_, _) -> ok end), %% The queue is empty: no timestamp. true = PQ:is_empty(BQS1), '' = PQ:info(head_message_timestamp, BQS1), @@ -415,9 +415,9 @@ info_head_message_timestamp1(_Config) -> ram_duration(_Config) -> QName = rabbit_misc:r(<<"/">>, queue, <<"ram_duration-queue">>), Q0 = rabbit_amqqueue:pseudo_queue(QName, self()), - Q = Q0#amqqueue{arguments = [{<<"x-max-priority">>, long, 5}]}, + Q1 = amqqueue:set_arguments(Q0, [{<<"x-max-priority">>, long, 5}]), PQ = rabbit_priority_queue, - BQS1 = PQ:init(Q, new, fun(_, _) -> ok end), + BQS1 = PQ:init(Q1, new, fun(_, _) -> ok end), {_Duration1, BQS2} = PQ:ram_duration(BQS1), BQS3 = PQ:set_ram_duration_target(infinity, BQS2), BQS4 = PQ:set_ram_duration_target(1, BQS3), diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl index b1f5cfff61..fd58756fda 100644 --- a/test/quorum_queue_SUITE.erl +++ b/test/quorum_queue_SUITE.erl @@ -29,6 +29,9 @@ -compile(export_all). +suite() -> + [{timetrap, 5 * 60000}]. + all() -> [ {group, single_node}, @@ -173,17 +176,32 @@ init_per_group(Group, Config) -> Config2 = rabbit_ct_helpers:run_steps(Config1b, [fun merge_app_env/1 ] ++ rabbit_ct_broker_helpers:setup_steps()), - ok = rabbit_ct_broker_helpers:rpc( - Config2, 0, application, set_env, - [rabbit, channel_queue_cleanup_interval, 100]), - %% HACK: the larger cluster sizes benefit for a bit more time - %% after clustering before running the tests. - case Group of - cluster_size_5 -> - timer:sleep(5000), - Config2; - _ -> - Config2 + Nodes = rabbit_ct_broker_helpers:get_node_configs( + Config2, nodename), + Ret = rabbit_ct_broker_helpers:rpc( + Config2, 0, + rabbit_feature_flags, + is_supported_remotely, + [Nodes, [quorum_queue], 60000]), + case Ret of + true -> + ok = rabbit_ct_broker_helpers:rpc( + Config2, 0, rabbit_feature_flags, enable, [quorum_queue]), + ok = rabbit_ct_broker_helpers:rpc( + Config2, 0, application, set_env, + [rabbit, channel_queue_cleanup_interval, 100]), + %% HACK: the larger cluster sizes benefit for a bit more time + %% after clustering before running the tests. + case Group of + cluster_size_5 -> + timer:sleep(5000), + Config2; + _ -> + Config2 + end; + false -> + end_per_group(Group, Config2), + {skip, "Quorum queues are unsupported"} end. end_per_group(clustered, Config) -> @@ -207,11 +225,28 @@ init_per_testcase(Testcase, Config) when Testcase == reconnect_consumer_and_publ {tcp_ports_base}, {queue_name, Q} ]), - rabbit_ct_helpers:run_steps(Config2, - rabbit_ct_broker_helpers:setup_steps() ++ - rabbit_ct_client_helpers:setup_steps() ++ - [fun rabbit_ct_broker_helpers:enable_dist_proxy/1, - fun rabbit_ct_broker_helpers:cluster_nodes/1]); + Config3 = rabbit_ct_helpers:run_steps( + Config2, + rabbit_ct_broker_helpers:setup_steps() ++ + rabbit_ct_client_helpers:setup_steps() ++ + [fun rabbit_ct_broker_helpers:enable_dist_proxy/1, + fun rabbit_ct_broker_helpers:cluster_nodes/1]), + Nodes = rabbit_ct_broker_helpers:get_node_configs( + Config3, nodename), + Ret = rabbit_ct_broker_helpers:rpc( + Config3, 0, + rabbit_feature_flags, + is_supported_remotely, + [Nodes, [quorum_queue], 60000]), + case Ret of + true -> + ok = rabbit_ct_broker_helpers:rpc( + Config3, 0, rabbit_feature_flags, enable, [quorum_queue]), + Config3; + false -> + end_per_testcase(Testcase, Config3), + {skip, "Quorum queues are unsupported"} + end; init_per_testcase(Testcase, Config) -> Config1 = rabbit_ct_helpers:testcase_started(Config, Testcase), rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_queues, []), @@ -2194,11 +2229,10 @@ assert_queue_type(Server, Q, Expected) -> Actual = get_queue_type(Server, Q), Expected = Actual. -get_queue_type(Server, Q) -> - QNameRes = rabbit_misc:r(<<"/">>, queue, Q), - {ok, AMQQueue} = - rpc:call(Server, rabbit_amqqueue, lookup, [QNameRes]), - AMQQueue#amqqueue.type. +get_queue_type(Server, Q0) -> + QNameRes = rabbit_misc:r(<<"/">>, queue, Q0), + {ok, Q1} = rpc:call(Server, rabbit_amqqueue, lookup, [QNameRes]), + amqqueue:get_type(Q1). publish_many(Ch, Queue, Count) -> [publish(Ch, Queue) || _ <- lists:seq(1, Count)]. diff --git a/test/rabbit_core_metrics_gc_SUITE.erl b/test/rabbit_core_metrics_gc_SUITE.erl index 7ae43aa7e1..42ac863ead 100644 --- a/test/rabbit_core_metrics_gc_SUITE.erl +++ b/test/rabbit_core_metrics_gc_SUITE.erl @@ -364,11 +364,9 @@ cluster_queue_metrics(Config) -> % Synchronize Name = rabbit_misc:r(VHost, queue, QueueName), - [#amqqueue{pid = QPid}] = rabbit_ct_broker_helpers:rpc(Config, Node0, - ets, lookup, - [rabbit_queue, Name]), - ok = rabbit_ct_broker_helpers:rpc(Config, Node0, rabbit_amqqueue, - sync_mirrors, [QPid]), + [Q] = rabbit_ct_broker_helpers:rpc(Config, Node0, ets, lookup, [rabbit_queue, Name]), + QPid = amqqueue:get_pid(Q), + ok = rabbit_ct_broker_helpers:rpc(Config, Node0, rabbit_amqqueue, sync_mirrors, [QPid]), % Check ETS table for data wait_for(fun () -> diff --git a/test/single_active_consumer_SUITE.erl b/test/single_active_consumer_SUITE.erl index 25261042b2..6071aeb5a5 100644 --- a/test/single_active_consumer_SUITE.erl +++ b/test/single_active_consumer_SUITE.erl @@ -66,13 +66,21 @@ init_per_group(classic_queue, Config) -> auto_delete = true} } | Config]; init_per_group(quorum_queue, Config) -> - [{single_active_consumer_queue_declare, - #'queue.declare'{arguments = [ - {<<"x-single-active-consumer">>, bool, true}, - {<<"x-queue-type">>, longstr, <<"quorum">>} - ], - durable = true, exclusive = false, auto_delete = false} - } | Config]. + Ret = rabbit_ct_broker_helpers:rpc( + Config, 0, rabbit_feature_flags, enable, [quorum_queue]), + case Ret of + ok -> + [{single_active_consumer_queue_declare, + #'queue.declare'{ + arguments = [ + {<<"x-single-active-consumer">>, bool, true}, + {<<"x-queue-type">>, longstr, <<"quorum">>} + ], + durable = true, exclusive = false, auto_delete = false} + } | Config]; + Error -> + {skip, {"Quorum queues are unsupported", Error}} + end. end_per_group(_, Config) -> Config. diff --git a/test/unit_inbroker_non_parallel_SUITE.erl b/test/unit_inbroker_non_parallel_SUITE.erl index e3e282f233..d2db382e30 100644 --- a/test/unit_inbroker_non_parallel_SUITE.erl +++ b/test/unit_inbroker_non_parallel_SUITE.erl @@ -568,7 +568,7 @@ head_message_timestamp1(_Config) -> QRes = rabbit_misc:r(<<"/">>, queue, QName), {ok, Q1} = rabbit_amqqueue:lookup(QRes), - QPid = Q1#amqqueue.pid, + QPid = amqqueue:get_pid(Q1), %% Set up event receiver for queue dummy_event_receiver:start(self(), [node()], [queue_stats]), diff --git a/test/unit_inbroker_parallel_SUITE.erl b/test/unit_inbroker_parallel_SUITE.erl index 581440d179..f4f4971517 100644 --- a/test/unit_inbroker_parallel_SUITE.erl +++ b/test/unit_inbroker_parallel_SUITE.erl @@ -99,10 +99,24 @@ init_per_group(max_length_classic, Config) -> [{queue_args, [{<<"x-queue-type">>, longstr, <<"classic">>}]}, {queue_durable, false}]); init_per_group(max_length_quorum, Config) -> - rabbit_ct_helpers:set_config( - Config, - [{queue_args, [{<<"x-queue-type">>, longstr, <<"quorum">>}]}, - {queue_durable, true}]); + Nodes = rabbit_ct_broker_helpers:get_node_configs( + Config, nodename), + Ret = rabbit_ct_broker_helpers:rpc( + Config, 0, + rabbit_feature_flags, + is_supported_remotely, + [Nodes, [quorum_queue], 60000]), + case Ret of + true -> + ok = rabbit_ct_broker_helpers:rpc( + Config, 0, rabbit_feature_flags, enable, [quorum_queue]), + rabbit_ct_helpers:set_config( + Config, + [{queue_args, [{<<"x-queue-type">>, longstr, <<"quorum">>}]}, + {queue_durable, true}]); + false -> + {skip, "Quorum queues are unsupported"} + end; init_per_group(max_length_mirrored, Config) -> rabbit_ct_broker_helpers:set_ha_policy(Config, 0, <<"^max_length.*queue">>, <<"all">>, [{<<"ha-sync-mode">>, <<"automatic">>}]), @@ -269,8 +283,7 @@ wait_for_confirms(Unconfirmed) -> end. test_amqqueue(Durable) -> - (rabbit_amqqueue:pseudo_queue(test_queue(), self())) - #amqqueue { durable = Durable }. + rabbit_amqqueue:pseudo_queue(test_queue(), self(), Durable). assert_prop(List, Prop, Value) -> case proplists:get_value(Prop, List)of @@ -695,7 +708,7 @@ head_message_timestamp1(_Config) -> QRes = rabbit_misc:r(<<"/">>, queue, QName), {ok, Q1} = rabbit_amqqueue:lookup(QRes), - QPid = Q1#amqqueue.pid, + QPid = amqqueue:get_pid(Q1), %% Set up event receiver for queue dummy_event_receiver:start(self(), [node()], [queue_stats]), @@ -944,7 +957,7 @@ confirms1(_Config) -> QName2 = DeclareBindDurableQueue(), %% Get the first one's pid (we'll crash it later) {ok, Q1} = rabbit_amqqueue:lookup(rabbit_misc:r(<<"/">>, queue, QName1)), - QPid1 = Q1#amqqueue.pid, + QPid1 = amqqueue:get_pid(Q1), %% Enable confirms rabbit_channel:do(Ch, #'confirm.select'{}), receive diff --git a/test/vhost_SUITE.erl b/test/vhost_SUITE.erl index 2de5819b54..5b6a6bd6ff 100644 --- a/test/vhost_SUITE.erl +++ b/test/vhost_SUITE.erl @@ -354,7 +354,7 @@ node_starts_with_dead_vhosts_and_ignore_slaves(Config) -> Node1 = rabbit_ct_broker_helpers:get_node_config(Config, 1, nodename), - #amqqueue{sync_slave_pids = [Pid]} = Q, + [Pid] = amqqueue:get_sync_slave_pids(Q), Node1 = node(Pid), |
