diff options
Diffstat (limited to 'test')
| -rw-r--r-- | test/amqqueue_backward_compatibility_SUITE.erl | 302 | ||||
| -rw-r--r-- | test/backing_queue_SUITE.erl | 31 | ||||
| -rw-r--r-- | test/channel_operation_timeout_SUITE.erl | 14 | ||||
| -rw-r--r-- | test/cluster_SUITE.erl | 15 | ||||
| -rw-r--r-- | test/clustering_management_SUITE.erl | 66 | ||||
| -rw-r--r-- | test/crashing_queues_SUITE.erl | 7 | ||||
| -rw-r--r-- | test/dynamic_ha_SUITE.erl | 4 | ||||
| -rw-r--r-- | test/dynamic_qq_SUITE.erl | 23 | ||||
| -rw-r--r-- | test/feature_flags_SUITE.erl | 372 | ||||
| -rw-r--r-- | test/mirrored_supervisor_SUITE.erl | 13 | ||||
| -rw-r--r-- | test/priority_queue_SUITE.erl | 8 | ||||
| -rw-r--r-- | test/quorum_queue_SUITE.erl | 106 | ||||
| -rw-r--r-- | test/quorum_queue_utils.erl | 29 | ||||
| -rw-r--r-- | test/rabbit_core_metrics_gc_SUITE.erl | 8 | ||||
| -rw-r--r-- | test/single_active_consumer_SUITE.erl | 22 | ||||
| -rw-r--r-- | test/unit_inbroker_dead_letter_SUITE.erl | 1149 | ||||
| -rw-r--r-- | test/unit_inbroker_non_parallel_SUITE.erl | 2 | ||||
| -rw-r--r-- | test/unit_inbroker_parallel_SUITE.erl | 29 | ||||
| -rw-r--r-- | test/vhost_SUITE.erl | 2 |
19 files changed, 2068 insertions, 134 deletions
diff --git a/test/amqqueue_backward_compatibility_SUITE.erl b/test/amqqueue_backward_compatibility_SUITE.erl new file mode 100644 index 0000000000..05a049c9bb --- /dev/null +++ b/test/amqqueue_backward_compatibility_SUITE.erl @@ -0,0 +1,302 @@ +-module(amqqueue_backward_compatibility_SUITE). + +-include_lib("common_test/include/ct.hrl"). +-include_lib("eunit/include/eunit.hrl"). + +-include("amqqueue.hrl"). + +-export([all/0, + groups/0, + init_per_suite/2, + end_per_suite/2, + init_per_group/2, + end_per_group/2, + init_per_testcase/2, + end_per_testcase/2, + + new_amqqueue_v1_is_amqqueue/1, + new_amqqueue_v2_is_amqqueue/1, + random_term_is_not_amqqueue/1, + + amqqueue_v1_is_durable/1, + amqqueue_v2_is_durable/1, + random_term_is_not_durable/1, + + amqqueue_v1_state_matching/1, + amqqueue_v2_state_matching/1, + random_term_state_matching/1, + + amqqueue_v1_type_matching/1, + amqqueue_v2_type_matching/1, + random_term_type_matching/1, + + upgrade_v1_to_v2/1 + ]). + +-define(long_tuple, {random_tuple, a, b, c, d, e, f, g, h, i, j, k, l, m, + n, o, p, q, r, s, t, u, v, w, x, y, z}). + +all() -> + [ + {group, parallel_tests} + ]. + +groups() -> + [ + {parallel_tests, [parallel], [new_amqqueue_v1_is_amqqueue, + new_amqqueue_v2_is_amqqueue, + random_term_is_not_amqqueue, + amqqueue_v1_is_durable, + amqqueue_v2_is_durable, + random_term_is_not_durable, + amqqueue_v1_state_matching, + amqqueue_v2_state_matching, + random_term_state_matching, + amqqueue_v1_type_matching, + amqqueue_v2_type_matching, + random_term_type_matching]} + ]. + +init_per_suite(_, Config) -> Config. +end_per_suite(_, Config) -> Config. + +init_per_group(_, Config) -> Config. +end_per_group(_, Config) -> Config. + +init_per_testcase(_, Config) -> Config. +end_per_testcase(_, Config) -> Config. + +new_amqqueue_v1_is_amqqueue(_) -> + VHost = <<"/">>, + Name = rabbit_misc:r(VHost, queue, my_amqqueue_v1), + Queue = amqqueue:new_with_version(amqqueue_v1, + Name, + self(), + false, + false, + none, + [], + VHost, + #{}, + ?amqqueue_v1_type), + ?assert(?is_amqqueue(Queue)), + ?assert(?is_amqqueue_v1(Queue)), + ?assert(not ?is_amqqueue_v2(Queue)), + ?assert(?amqqueue_is_classic(Queue)), + ?assert(amqqueue:is_classic(Queue)), + ?assert(not ?amqqueue_is_quorum(Queue)), + ?assert(not ?amqqueue_vhost_equals(Queue, <<"frazzle">>)), + ?assert(?amqqueue_has_valid_pid(Queue)), + ?assert(?amqqueue_pid_equals(Queue, self())), + ?assert(?amqqueue_pids_are_equal(Queue, Queue)), + ?assert(?amqqueue_pid_runs_on_local_node(Queue)), + ?assert(amqqueue:qnode(Queue) == node()). + +new_amqqueue_v2_is_amqqueue(_) -> + VHost = <<"/">>, + Name = rabbit_misc:r(VHost, queue, my_amqqueue_v2), + Queue = amqqueue:new_with_version(amqqueue_v2, + Name, + self(), + false, + false, + none, + [], + VHost, + #{}, + classic), + ?assert(?is_amqqueue(Queue)), + ?assert(?is_amqqueue_v2(Queue)), + ?assert(not ?is_amqqueue_v1(Queue)), + ?assert(?amqqueue_is_classic(Queue)), + ?assert(amqqueue:is_classic(Queue)), + ?assert(not ?amqqueue_is_quorum(Queue)), + ?assert(not ?amqqueue_vhost_equals(Queue, <<"frazzle">>)), + ?assert(?amqqueue_has_valid_pid(Queue)), + ?assert(?amqqueue_pid_equals(Queue, self())), + ?assert(?amqqueue_pids_are_equal(Queue, Queue)), + ?assert(?amqqueue_pid_runs_on_local_node(Queue)), + ?assert(amqqueue:qnode(Queue) == node()). + +random_term_is_not_amqqueue(_) -> + Term = ?long_tuple, + ?assert(not ?is_amqqueue(Term)), + ?assert(not ?is_amqqueue_v2(Term)), + ?assert(not ?is_amqqueue_v1(Term)). + +%% ------------------------------------------------------------------- + +amqqueue_v1_is_durable(_) -> + VHost = <<"/">>, + Name = rabbit_misc:r(VHost, queue, my_amqqueue_v1), + TransientQueue = amqqueue:new_with_version(amqqueue_v1, + Name, + self(), + false, + false, + none, + [], + VHost, + #{}, + ?amqqueue_v1_type), + DurableQueue = amqqueue:new_with_version(amqqueue_v1, + Name, + self(), + true, + false, + none, + [], + VHost, + #{}, + ?amqqueue_v1_type), + ?assert(not ?amqqueue_is_durable(TransientQueue)), + ?assert(?amqqueue_is_durable(DurableQueue)). + +amqqueue_v2_is_durable(_) -> + VHost = <<"/">>, + Name = rabbit_misc:r(VHost, queue, my_amqqueue_v1), + TransientQueue = amqqueue:new_with_version(amqqueue_v2, + Name, + self(), + false, + false, + none, + [], + VHost, + #{}, + classic), + DurableQueue = amqqueue:new_with_version(amqqueue_v2, + Name, + self(), + true, + false, + none, + [], + VHost, + #{}, + classic), + ?assert(not ?amqqueue_is_durable(TransientQueue)), + ?assert(?amqqueue_is_durable(DurableQueue)). + +random_term_is_not_durable(_) -> + Term = ?long_tuple, + ?assert(not ?amqqueue_is_durable(Term)). + +%% ------------------------------------------------------------------- + +amqqueue_v1_state_matching(_) -> + VHost = <<"/">>, + Name = rabbit_misc:r(VHost, queue, my_amqqueue_v1), + Queue1 = amqqueue:new_with_version(amqqueue_v1, + Name, + self(), + true, + false, + none, + [], + VHost, + #{}, + ?amqqueue_v1_type), + ?assert(?amqqueue_state_is(Queue1, live)), + Queue2 = amqqueue:set_state(Queue1, stopped), + ?assert(?amqqueue_state_is(Queue2, stopped)). + +amqqueue_v2_state_matching(_) -> + VHost = <<"/">>, + Name = rabbit_misc:r(VHost, queue, my_amqqueue_v1), + Queue1 = amqqueue:new_with_version(amqqueue_v2, + Name, + self(), + true, + false, + none, + [], + VHost, + #{}, + classic), + ?assert(?amqqueue_state_is(Queue1, live)), + Queue2 = amqqueue:set_state(Queue1, stopped), + ?assert(?amqqueue_state_is(Queue2, stopped)). + +random_term_state_matching(_) -> + Term = ?long_tuple, + ?assert(not ?amqqueue_state_is(Term, live)). + +%% ------------------------------------------------------------------- + +amqqueue_v1_type_matching(_) -> + VHost = <<"/">>, + Name = rabbit_misc:r(VHost, queue, my_amqqueue_v1), + Queue = amqqueue:new_with_version(amqqueue_v1, + Name, + self(), + true, + false, + none, + [], + VHost, + #{}, + ?amqqueue_v1_type), + ?assert(?amqqueue_is_classic(Queue)), + ?assert(amqqueue:is_classic(Queue)), + ?assert(not ?amqqueue_is_quorum(Queue)). + +amqqueue_v2_type_matching(_) -> + VHost = <<"/">>, + Name = rabbit_misc:r(VHost, queue, my_amqqueue_v1), + ClassicQueue = amqqueue:new_with_version(amqqueue_v2, + Name, + self(), + true, + false, + none, + [], + VHost, + #{}, + classic), + ?assert(?amqqueue_is_classic(ClassicQueue)), + ?assert(amqqueue:is_classic(ClassicQueue)), + ?assert(not ?amqqueue_is_quorum(ClassicQueue)), + ?assert(not amqqueue:is_quorum(ClassicQueue)), + QuorumQueue = amqqueue:new_with_version(amqqueue_v2, + Name, + self(), + true, + false, + none, + [], + VHost, + #{}, + quorum), + ?assert(not ?amqqueue_is_classic(QuorumQueue)), + ?assert(not amqqueue:is_classic(QuorumQueue)), + ?assert(?amqqueue_is_quorum(QuorumQueue)), + ?assert(amqqueue:is_quorum(QuorumQueue)). + +random_term_type_matching(_) -> + Term = ?long_tuple, + ?assert(not ?amqqueue_is_classic(Term)), + ?assert(not ?amqqueue_is_quorum(Term)), + ?assertException(error, function_clause, amqqueue:is_classic(Term)), + ?assertException(error, function_clause, amqqueue:is_quorum(Term)). + +%% ------------------------------------------------------------------- + +upgrade_v1_to_v2(_) -> + VHost = <<"/">>, + Name = rabbit_misc:r(VHost, queue, my_amqqueue_v1), + OldQueue = amqqueue:new_with_version(amqqueue_v1, + Name, + self(), + true, + false, + none, + [], + VHost, + #{}, + ?amqqueue_v1_type), + ?assert(?is_amqqueue_v1(OldQueue)), + ?assert(not ?is_amqqueue_v2(OldQueue)), + NewQueue = amqqueue:upgrade_to(amqqueue_v2, OldQueue), + ?assert(not ?is_amqqueue_v1(NewQueue)), + ?assert(?is_amqqueue_v2(NewQueue)). diff --git a/test/backing_queue_SUITE.erl b/test/backing_queue_SUITE.erl index 433bc66bff..c3f87cce59 100644 --- a/test/backing_queue_SUITE.erl +++ b/test/backing_queue_SUITE.erl @@ -18,6 +18,7 @@ -include_lib("common_test/include/ct.hrl"). -include_lib("amqp_client/include/amqp_client.hrl"). +-include("amqqueue.hrl"). -compile(export_all). @@ -686,11 +687,10 @@ bq_variable_queue_delete_msg_store_files_callback(Config) -> bq_variable_queue_delete_msg_store_files_callback1(Config) -> ok = restart_msg_store_empty(), - {new, #amqqueue { pid = QPid, name = QName } = Q} = - rabbit_amqqueue:declare( - queue_name(Config, - <<"bq_variable_queue_delete_msg_store_files_callback-q">>), - true, false, [], none, <<"acting-user">>), + QName0 = queue_name(Config, <<"bq_variable_queue_delete_msg_store_files_callback-q">>), + {new, Q} = rabbit_amqqueue:declare(QName0, true, false, [], none, <<"acting-user">>), + QName = amqqueue:get_name(Q), + QPid = amqqueue:get_pid(Q), Payload = <<0:8388608>>, %% 1MB Count = 30, publish_and_confirm(Q, Payload, Count), @@ -718,9 +718,10 @@ bq_queue_recover(Config) -> bq_queue_recover1(Config) -> Count = 2 * rabbit_queue_index:next_segment_boundary(0), - {new, #amqqueue { pid = QPid, name = QName } = Q} = - rabbit_amqqueue:declare(queue_name(Config, <<"bq_queue_recover-q">>), - true, false, [], none, <<"acting-user">>), + QName0 = queue_name(Config, <<"bq_queue_recover-q">>), + {new, Q} = rabbit_amqqueue:declare(QName0, true, false, [], none, <<"acting-user">>), + QName = amqqueue:get_name(Q), + QPid = amqqueue:get_pid(Q), publish_and_confirm(Q, <<>>, Count), SupPid = get_queue_sup_pid(Q), @@ -736,7 +737,8 @@ bq_queue_recover1(Config) -> {ok, Limiter} = rabbit_limiter:start_link(no_id), rabbit_amqqueue:with_or_die( QName, - fun (Q1 = #amqqueue { pid = QPid1 }) -> + fun (Q1) when ?is_amqqueue(Q1) -> + QPid1 = amqqueue:get_pid(Q1), CountMinusOne = Count - 1, {ok, CountMinusOne, {QName, QPid1, _AckTag, true, _Msg}} = rabbit_amqqueue:basic_get(Q1, self(), false, Limiter, @@ -752,7 +754,9 @@ bq_queue_recover1(Config) -> passed. %% Return the PID of the given queue's supervisor. -get_queue_sup_pid(#amqqueue { pid = QPid, name = QName }) -> +get_queue_sup_pid(Q) when ?is_amqqueue(Q) -> + QName = amqqueue:get_name(Q), + QPid = amqqueue:get_pid(Q), VHost = QName#resource.virtual_host, {ok, AmqSup} = rabbit_amqqueue_sup_sup:find_for_vhost(VHost, node(QPid)), Sups = supervisor:which_children(AmqSup), @@ -1413,8 +1417,8 @@ with_fresh_variable_queue(Fun, Mode) -> shutdown, Fun(VQ1, QName)), Me ! Ref catch - Type:Error -> - Me ! {Ref, Type, Error, erlang:get_stacktrace()} + Type:Error:Stacktrace -> + Me ! {Ref, Type, Error, Stacktrace} end end), receive @@ -1498,8 +1502,7 @@ variable_queue_fetch(Count, IsPersistent, IsDelivered, Len, VQ) -> end, {VQ, []}, lists:seq(1, Count)). test_amqqueue(QName, Durable) -> - (rabbit_amqqueue:pseudo_queue(QName, self())) - #amqqueue { durable = Durable }. + rabbit_amqqueue:pseudo_queue(QName, self(), Durable). assert_prop(List, Prop, Value) -> case proplists:get_value(Prop, List)of diff --git a/test/channel_operation_timeout_SUITE.erl b/test/channel_operation_timeout_SUITE.erl index 9bfa0ae07a..77da6133d8 100644 --- a/test/channel_operation_timeout_SUITE.erl +++ b/test/channel_operation_timeout_SUITE.erl @@ -18,6 +18,7 @@ -include_lib("common_test/include/ct.hrl"). -include_lib("amqp_client/include/amqp_client.hrl"). +-include("amqqueue.hrl"). -compile([export_all]). @@ -169,9 +170,16 @@ get_consumers(Config, Node, VHost) when is_atom(Node), rabbit_ct_broker_helpers:rpc(Config, Node, rabbit_amqqueue, consumers_all, [VHost]). -get_amqqueue(Q, []) -> throw({not_found, Q}); -get_amqqueue(Q, [AMQQ = #amqqueue{name = Q} | _]) -> AMQQ; -get_amqqueue(Q, [_| Rem]) -> get_amqqueue(Q, Rem). +get_amqqueue(QName0, []) -> + throw({not_found, QName0}); +get_amqqueue(QName0, [Q | Rem]) when ?is_amqqueue(Q) -> + QName1 = amqqueue:get_name(Q), + compare_amqqueue(QName0, QName1, Q, Rem). + +compare_amqqueue(QName, QName, Q, _Rem) -> + Q; +compare_amqqueue(QName, _, _, Rem) -> + get_amqqueue(QName, Rem). qconfig(Ch, Name, Ex, Consume, Deliver) -> [{ch, Ch}, {name, Name}, {ex,Ex}, {consume, Consume}, {deliver, Deliver}]. diff --git a/test/cluster_SUITE.erl b/test/cluster_SUITE.erl index c52dc9ef64..dc30825f8c 100644 --- a/test/cluster_SUITE.erl +++ b/test/cluster_SUITE.erl @@ -18,6 +18,7 @@ -include_lib("common_test/include/ct.hrl"). -include_lib("amqp_client/include/amqp_client.hrl"). +-include("include/amqqueue.hrl"). -compile(export_all). @@ -225,9 +226,9 @@ declare_on_dead_queue1(_Config, SecondaryNode) -> Self = self(), Pid = spawn(SecondaryNode, fun () -> - {new, #amqqueue{name = QueueName, pid = QPid}} = - rabbit_amqqueue:declare(QueueName, false, false, [], - none, <<"acting-user">>), + {new, Q} = rabbit_amqqueue:declare(QueueName, false, false, [], none, <<"acting-user">>), + QueueName = ?amqqueue_field_name(Q), + QPid = ?amqqueue_field_pid(Q), exit(QPid, kill), Self ! {self(), killed, QPid} end), @@ -269,12 +270,12 @@ must_exit(Fun) -> end. dead_queue_loop(QueueName, OldPid) -> - {existing, Q} = rabbit_amqqueue:declare(QueueName, false, false, [], none, - <<"acting-user">>), - case Q#amqqueue.pid of + {existing, Q} = rabbit_amqqueue:declare(QueueName, false, false, [], none, <<"acting-user">>), + QPid = ?amqqueue_field_pid(Q), + case QPid of OldPid -> timer:sleep(25), dead_queue_loop(QueueName, OldPid); - _ -> true = rabbit_misc:is_process_alive(Q#amqqueue.pid), + _ -> true = rabbit_misc:is_process_alive(QPid), Q end. diff --git a/test/clustering_management_SUITE.erl b/test/clustering_management_SUITE.erl index 120257feb9..5ae2fb687c 100644 --- a/test/clustering_management_SUITE.erl +++ b/test/clustering_management_SUITE.erl @@ -315,14 +315,14 @@ forget_offline_removes_things(Config) -> forget_promotes_offline_slave(Config) -> [A, B, C, D] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), ACh = rabbit_ct_client_helpers:open_channel(Config, A), - Q = <<"mirrored-queue">>, - declare(ACh, Q), - set_ha_policy(Config, Q, A, [B, C]), - set_ha_policy(Config, Q, A, [C, D]), %% Test add and remove from recoverable_slaves + QName = <<"mirrored-queue">>, + declare(ACh, QName), + set_ha_policy(Config, QName, A, [B, C]), + set_ha_policy(Config, QName, A, [C, D]), %% Test add and remove from recoverable_slaves %% Publish and confirm amqp_channel:call(ACh, #'confirm.select'{}), - amqp_channel:cast(ACh, #'basic.publish'{routing_key = Q}, + amqp_channel:cast(ACh, #'basic.publish'{routing_key = QName}, #amqp_msg{props = #'P_basic'{delivery_mode = 2}}), amqp_channel:wait_for_confirms(ACh), @@ -353,26 +353,50 @@ forget_promotes_offline_slave(Config) -> ok = rabbit_ct_broker_helpers:start_node(Config, D), DCh2 = rabbit_ct_client_helpers:open_channel(Config, D), - #'queue.declare_ok'{message_count = 1} = declare(DCh2, Q), + #'queue.declare_ok'{message_count = 1} = declare(DCh2, QName), ok. -set_ha_policy(Config, Q, Master, Slaves) -> +set_ha_policy(Config, QName, Master, Slaves) -> Nodes = [list_to_binary(atom_to_list(N)) || N <- [Master | Slaves]], - rabbit_ct_broker_helpers:set_ha_policy(Config, Master, Q, - {<<"nodes">>, Nodes}), - await_slaves(Q, Master, Slaves). - -await_slaves(Q, Master, Slaves) -> - {ok, #amqqueue{pid = MPid, - slave_pids = SPids}} = - rpc:call(Master, rabbit_amqqueue, lookup, - [rabbit_misc:r(<<"/">>, queue, Q)]), - ActMaster = node(MPid), + HaPolicy = {<<"nodes">>, Nodes}, + rabbit_ct_broker_helpers:set_ha_policy(Config, Master, QName, HaPolicy), + await_slaves(QName, Master, Slaves). + +await_slaves(QName, Master, Slaves) -> + await_slaves_0(QName, Master, Slaves, 10). + +await_slaves_0(QName, Master, Slaves0, Tries) -> + {ok, Queue} = await_slaves_lookup_queue(QName, Master), + SPids = amqqueue:get_slave_pids(Queue), + ActMaster = amqqueue:qnode(Queue), ActSlaves = lists:usort([node(P) || P <- SPids]), - case {Master, lists:usort(Slaves)} of - {ActMaster, ActSlaves} -> ok; - _ -> timer:sleep(100), - await_slaves(Q, Master, Slaves) + Slaves1 = lists:usort(Slaves0), + await_slaves_1(QName, ActMaster, ActSlaves, Master, Slaves1, Tries). + +await_slaves_1(QName, _ActMaster, _ActSlaves, _Master, _Slaves, 0) -> + error({timeout_waiting_for_slaves, QName}); +await_slaves_1(QName, ActMaster, ActSlaves, Master, Slaves, Tries) -> + case {Master, Slaves} of + {ActMaster, ActSlaves} -> + ok; + _ -> + timer:sleep(250), + await_slaves_0(QName, Master, Slaves, Tries - 1) + end. + +await_slaves_lookup_queue(QName, Master) -> + await_slaves_lookup_queue(QName, Master, 10). + +await_slaves_lookup_queue(QName, _Master, 0) -> + error({timeout_looking_up_queue, QName}); +await_slaves_lookup_queue(QName, Master, Tries) -> + RpcArgs = [rabbit_misc:r(<<"/">>, queue, QName)], + case rpc:call(Master, rabbit_amqqueue, lookup, RpcArgs) of + {error, not_found} -> + timer:sleep(250), + await_slaves_lookup_queue(QName, Master, Tries - 1); + {ok, Q} -> + {ok, Q} end. force_boot(Config) -> diff --git a/test/crashing_queues_SUITE.erl b/test/crashing_queues_SUITE.erl index 2d91083abc..7b8ec91346 100644 --- a/test/crashing_queues_SUITE.erl +++ b/test/crashing_queues_SUITE.erl @@ -217,9 +217,10 @@ kill_queue(Node, QName) -> await_new_pid(Node, QName, Pid1). queue_pid(Node, QName) -> - #amqqueue{pid = QPid, - state = State, - name = #resource{virtual_host = VHost}} = lookup(Node, QName), + Q = lookup(Node, QName), + QPid = amqqueue:get_pid(Q), + State = amqqueue:get_state(Q), + #resource{virtual_host = VHost} = amqqueue:get_name(Q), case State of crashed -> case rabbit_amqqueue_sup_sup:find_for_vhost(VHost, Node) of diff --git a/test/dynamic_ha_SUITE.erl b/test/dynamic_ha_SUITE.erl index e41c07a888..6ccf3a75c3 100644 --- a/test/dynamic_ha_SUITE.erl +++ b/test/dynamic_ha_SUITE.erl @@ -614,8 +614,8 @@ get_stacktrace() -> try throw(e) catch - _:e -> - erlang:get_stacktrace() + _:e:Stacktrace -> + Stacktrace end. %%---------------------------------------------------------------------------- diff --git a/test/dynamic_qq_SUITE.erl b/test/dynamic_qq_SUITE.erl index fbc1e81827..89344af30c 100644 --- a/test/dynamic_qq_SUITE.erl +++ b/test/dynamic_qq_SUITE.erl @@ -83,9 +83,26 @@ init_per_testcase(Testcase, Config) -> {queue_name, Q}, {queue_args, [{<<"x-queue-type">>, longstr, <<"quorum">>}]} ]), - rabbit_ct_helpers:run_steps(Config1, - rabbit_ct_broker_helpers:setup_steps() ++ - rabbit_ct_client_helpers:setup_steps()). + Config2 = rabbit_ct_helpers:run_steps( + Config1, + rabbit_ct_broker_helpers:setup_steps() ++ + rabbit_ct_client_helpers:setup_steps()), + Nodes = rabbit_ct_broker_helpers:get_node_configs( + Config2, nodename), + Ret = rabbit_ct_broker_helpers:rpc( + Config2, 0, + rabbit_feature_flags, + is_supported_remotely, + [Nodes, [quorum_queue], 60000]), + case Ret of + true -> + ok = rabbit_ct_broker_helpers:rpc( + Config2, 0, rabbit_feature_flags, enable, [quorum_queue]), + Config2; + false -> + end_per_testcase(Testcase, Config2), + {skip, "Quorum queues are unsupported"} + end. end_per_testcase(Testcase, Config) -> Config1 = rabbit_ct_helpers:run_steps(Config, diff --git a/test/feature_flags_SUITE.erl b/test/feature_flags_SUITE.erl new file mode 100644 index 0000000000..db87442105 --- /dev/null +++ b/test/feature_flags_SUITE.erl @@ -0,0 +1,372 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2018-2019 Pivotal Software, Inc. All rights reserved. +%% + +-module(feature_flags_SUITE). + +-include_lib("common_test/include/ct.hrl"). +-include_lib("eunit/include/eunit.hrl"). + +-export([suite/0, + all/0, + groups/0, + init_per_suite/1, + end_per_suite/1, + init_per_group/2, + end_per_group/2, + init_per_testcase/2, + end_per_testcase/2, + + enable_quorum_queue_in_a_healthy_situation/1, + enable_unsupported_feature_flag_in_a_healthy_situation/1, + enable_quorum_queue_when_ff_file_is_unwritable/1, + enable_quorum_queue_with_a_network_partition/1, + mark_quorum_queue_as_enabled_with_a_network_partition/1 + ]). + +suite() -> + [{timetrap, 5 * 60000}]. + +all() -> + [ + {group, unclustered}, + {group, clustered} + ]. + +groups() -> + [ + {unclustered, [], + [ + enable_quorum_queue_in_a_healthy_situation, + enable_unsupported_feature_flag_in_a_healthy_situation, + enable_quorum_queue_when_ff_file_is_unwritable + ]}, + {clustered, [], + [ + enable_quorum_queue_in_a_healthy_situation, + enable_unsupported_feature_flag_in_a_healthy_situation, + enable_quorum_queue_when_ff_file_is_unwritable, + enable_quorum_queue_with_a_network_partition, + mark_quorum_queue_as_enabled_with_a_network_partition + ]} + ]. + +%% ------------------------------------------------------------------- +%% Testsuite setup/teardown. +%% ------------------------------------------------------------------- + +init_per_suite(Config) -> + rabbit_ct_helpers:log_environment(), + rabbit_ct_helpers:run_setup_steps(Config, [ + fun rabbit_ct_broker_helpers:enable_dist_proxy_manager/1 + ]). + +end_per_suite(Config) -> + rabbit_ct_helpers:run_teardown_steps(Config). + +init_per_group(clustered, Config) -> + rabbit_ct_helpers:set_config(Config, [{rmq_nodes_count, 5}]); +init_per_group(unclustered, Config) -> + rabbit_ct_helpers:set_config(Config, [{rmq_nodes_count, 1}]); +init_per_group(_, Config) -> + Config. + +end_per_group(_, Config) -> + Config. + +init_per_testcase(Testcase, Config) -> + rabbit_ct_helpers:testcase_started(Config, Testcase), + TestNumber = rabbit_ct_helpers:testcase_number(Config, ?MODULE, Testcase), + ClusterSize = ?config(rmq_nodes_count, Config), + Config1 = rabbit_ct_helpers:set_config(Config, [ + {rmq_nodes_clustered, false}, + {rmq_nodename_suffix, Testcase}, + {tcp_ports_base, {skip_n_nodes, TestNumber * ClusterSize}}, + {net_ticktime, 5} + ]), + Config2 = rabbit_ct_helpers:merge_app_env( + Config1, + {rabbit, + [{forced_feature_flags_on_init, []}, + {log, [{file, [{level, debug}]}]}]}), + Config3 = rabbit_ct_helpers:run_steps( + Config2, + rabbit_ct_broker_helpers:setup_steps() ++ + rabbit_ct_client_helpers:setup_steps() ++ + [fun rabbit_ct_broker_helpers:enable_dist_proxy/1, + fun rabbit_ct_broker_helpers:cluster_nodes/1]), + Ret = rabbit_ct_broker_helpers:rpc( + Config3, 0, rabbit_feature_flags, is_supported, [quorum_queue]), + case Ret of + true -> + Config3; + false -> + end_per_testcase(Testcase, Config3), + {skip, "Quorum queues are unsupported"} + end. + +end_per_testcase(Testcase, Config) -> + Config1 = rabbit_ct_helpers:run_steps(Config, + rabbit_ct_client_helpers:teardown_steps() ++ + rabbit_ct_broker_helpers:teardown_steps()), + rabbit_ct_helpers:testcase_finished(Config1, Testcase). + +%% ------------------------------------------------------------------- +%% Testcases. +%% ------------------------------------------------------------------- + +enable_quorum_queue_in_a_healthy_situation(Config) -> + FeatureName = quorum_queue, + ClusterSize = ?config(rmq_nodes_count, Config), + Node = ClusterSize - 1, + True = lists:duplicate(ClusterSize, true), + False = lists:duplicate(ClusterSize, false), + + %% The feature flag is supported but disabled initially. + ?assertEqual( + True, + is_feature_flag_supported(Config, FeatureName)), + ?assertEqual( + False, + is_feature_flag_enabled(Config, FeatureName)), + + %% Enabling the feature flag works. + ?assertEqual( + ok, + enable_feature_flag_on(Config, Node, FeatureName)), + ?assertEqual( + True, + is_feature_flag_enabled(Config, FeatureName)), + + %% Re-enabling the feature flag also works. + ?assertEqual( + ok, + enable_feature_flag_on(Config, Node, FeatureName)), + ?assertEqual( + True, + is_feature_flag_enabled(Config, FeatureName)). + +enable_unsupported_feature_flag_in_a_healthy_situation(Config) -> + FeatureName = unsupported_feature_flag, + ClusterSize = ?config(rmq_nodes_count, Config), + Node = ClusterSize - 1, + False = lists:duplicate(ClusterSize, false), + + %% The feature flag is unsupported and thus disabled. + ?assertEqual( + False, + is_feature_flag_supported(Config, FeatureName)), + ?assertEqual( + False, + is_feature_flag_enabled(Config, FeatureName)), + + %% Enabling the feature flag works. + ?assertEqual( + {error, unsupported}, + enable_feature_flag_on(Config, Node, FeatureName)), + ?assertEqual( + False, + is_feature_flag_enabled(Config, FeatureName)). + +enable_quorum_queue_when_ff_file_is_unwritable(Config) -> + FeatureName = quorum_queue, + ClusterSize = ?config(rmq_nodes_count, Config), + Node = ClusterSize - 1, + True = lists:duplicate(ClusterSize, true), + False = lists:duplicate(ClusterSize, false), + Files = feature_flags_files(Config), + + %% The feature flag is supported but disabled initially. + ?assertEqual( + True, + is_feature_flag_supported(Config, FeatureName)), + ?assertEqual( + False, + is_feature_flag_enabled(Config, FeatureName)), + + %% Restrict permissions on the `feature_flags` files. + [?assertEqual(ok, file:change_mode(File, 8#0444)) || File <- Files], + + %% Enabling the feature flag works. + ?assertEqual( + ok, + enable_feature_flag_on(Config, Node, FeatureName)), + ?assertEqual( + True, + is_feature_flag_enabled(Config, FeatureName)), + + %% The `feature_flags` file were not updated. + ?assertEqual( + lists:duplicate(ClusterSize, {ok, [[]]}), + [file:consult(File) || File <- feature_flags_files(Config)]), + + %% Stop all nodes and restore permissions on the `feature_flags` files. + Nodes = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + [?assertEqual(ok, rabbit_ct_broker_helpers:stop_node(Config, N)) + || N <- Nodes], + [?assertEqual(ok, file:change_mode(File, 8#0644)) || File <- Files], + + %% Restart all nodes and assert the feature flag is still enabled and + %% the `feature_flags` files were correctly repaired. + [?assertEqual(ok, rabbit_ct_broker_helpers:start_node(Config, N)) + || N <- lists:reverse(Nodes)], + + ?assertEqual( + True, + is_feature_flag_enabled(Config, FeatureName)), + ?assertEqual( + lists:duplicate(ClusterSize, {ok, [[FeatureName]]}), + [file:consult(File) || File <- feature_flags_files(Config)]). + +enable_quorum_queue_with_a_network_partition(Config) -> + FeatureName = quorum_queue, + ClusterSize = ?config(rmq_nodes_count, Config), + [A, B, C, D, E] = rabbit_ct_broker_helpers:get_node_configs( + Config, nodename), + True = lists:duplicate(ClusterSize, true), + False = lists:duplicate(ClusterSize, false), + + %% The feature flag is supported but disabled initially. + ?assertEqual( + True, + is_feature_flag_supported(Config, FeatureName)), + ?assertEqual( + False, + is_feature_flag_enabled(Config, FeatureName)), + + %% Isolate nodes B and E from the rest of the cluster. + NodePairs = [{B, A}, + {B, C}, + {B, D}, + {E, A}, + {E, C}, + {E, D}], + block(NodePairs), + timer:sleep(1000), + + %% Enabling the feature flag should fail in the specific case of + %% `quorum_queue`, if the network is broken. + ?assertEqual( + {error, unsupported}, + enable_feature_flag_on(Config, B, FeatureName)), + ?assertEqual( + False, + is_feature_flag_enabled(Config, FeatureName)), + + %% Repair the network and try again to enable the feature flag. + unblock(NodePairs), + timer:sleep(1000), + [?assertEqual(ok, rabbit_ct_broker_helpers:stop_node(Config, N)) + || N <- [A, C, D]], + [?assertEqual(ok, rabbit_ct_broker_helpers:start_node(Config, N)) + || N <- [A, C, D]], + + %% Enabling the feature flag works. + ?assertEqual( + ok, + enable_feature_flag_on(Config, B, FeatureName)), + ?assertEqual( + True, + is_feature_flag_enabled(Config, FeatureName)). + +mark_quorum_queue_as_enabled_with_a_network_partition(Config) -> + FeatureName = quorum_queue, + ClusterSize = ?config(rmq_nodes_count, Config), + [A, B, C, D, E] = rabbit_ct_broker_helpers:get_node_configs( + Config, nodename), + True = lists:duplicate(ClusterSize, true), + False = lists:duplicate(ClusterSize, false), + + %% The feature flag is supported but disabled initially. + ?assertEqual( + True, + is_feature_flag_supported(Config, FeatureName)), + ?assertEqual( + False, + is_feature_flag_enabled(Config, FeatureName)), + + %% Isolate node B from the rest of the cluster. + NodePairs = [{B, A}, + {B, C}, + {B, D}, + {B, E}], + block(NodePairs), + timer:sleep(1000), + + %% Mark the feature flag as enabled on all nodes from node B. This + %% is expected to timeout. + RemoteNodes = [A, C, D, E], + ?assertEqual( + {failed_to_mark_feature_flag_as_enabled_on_remote_nodes, + FeatureName, + true, + RemoteNodes}, + rabbit_ct_broker_helpers:rpc( + Config, B, + rabbit_feature_flags, mark_as_enabled_remotely, + [RemoteNodes, FeatureName, true, 20000])), + + RepairFun = fun() -> + %% Wait a few seconds before we repair the network. + timer:sleep(5000), + + %% Repair the network and try again to enable + %% the feature flag. + unblock(NodePairs), + timer:sleep(1000) + end, + spawn(RepairFun), + + %% Mark the feature flag as enabled on all nodes from node B. This + %% is expected to work this time. + ct:pal(?LOW_IMPORTANCE, + "Marking the feature flag as enabled on remote nodes...", []), + ?assertEqual( + ok, + rabbit_ct_broker_helpers:rpc( + Config, B, + rabbit_feature_flags, mark_as_enabled_remotely, + [RemoteNodes, FeatureName, true, 120000])). + +%% FIXME: Finish the testcase above ^ + +%% ------------------------------------------------------------------- +%% Internal helpers. +%% ------------------------------------------------------------------- + +enable_feature_flag_on(Config, Node, FeatureName) -> + rabbit_ct_broker_helpers:rpc( + Config, Node, rabbit_feature_flags, enable, [FeatureName]). + +is_feature_flag_supported(Config, FeatureName) -> + rabbit_ct_broker_helpers:rpc_all( + Config, rabbit_feature_flags, is_supported, [FeatureName]). + +is_feature_flag_enabled(Config, FeatureName) -> + rabbit_ct_broker_helpers:rpc_all( + Config, rabbit_feature_flags, is_enabled, [FeatureName]). + +feature_flags_files(Config) -> + rabbit_ct_broker_helpers:rpc_all( + Config, rabbit_feature_flags, enabled_feature_flags_list_file, []). + +block(Pairs) -> [block(X, Y) || {X, Y} <- Pairs]. +unblock(Pairs) -> [allow(X, Y) || {X, Y} <- Pairs]. + +block(X, Y) -> + rabbit_ct_broker_helpers:block_traffic_between(X, Y). + +allow(X, Y) -> + rabbit_ct_broker_helpers:allow_traffic_between(X, Y). diff --git a/test/mirrored_supervisor_SUITE.erl b/test/mirrored_supervisor_SUITE.erl index d3cc080eeb..aa114e0a84 100644 --- a/test/mirrored_supervisor_SUITE.erl +++ b/test/mirrored_supervisor_SUITE.erl @@ -294,14 +294,17 @@ get_group(Group) -> call(Id, Msg) -> call(Id, Msg, 10*1000, 100). -call(Id, Msg, 0, _Decr) -> - exit({timeout_waiting_for_server, {Id, Msg}, erlang:get_stacktrace()}); - call(Id, Msg, MaxDelay, Decr) -> + call(Id, Msg, MaxDelay, Decr, undefined). + +call(Id, Msg, 0, _Decr, Stacktrace) -> + exit({timeout_waiting_for_server, {Id, Msg}, Stacktrace}); + +call(Id, Msg, MaxDelay, Decr, _) -> try gen_server:call(Id, Msg, infinity) - catch exit:_ -> timer:sleep(Decr), - call(Id, Msg, MaxDelay - Decr, Decr) + catch exit:_:Stacktrace -> timer:sleep(Decr), + call(Id, Msg, MaxDelay - Decr, Decr, Stacktrace) end. kill(Pid) -> kill(Pid, []). diff --git a/test/priority_queue_SUITE.erl b/test/priority_queue_SUITE.erl index 9db866e3c6..8a0ab98241 100644 --- a/test/priority_queue_SUITE.erl +++ b/test/priority_queue_SUITE.erl @@ -366,9 +366,9 @@ info_head_message_timestamp1(_Config) -> QName = rabbit_misc:r(<<"/">>, queue, <<"info_head_message_timestamp-queue">>), Q0 = rabbit_amqqueue:pseudo_queue(QName, self()), - Q = Q0#amqqueue{arguments = [{<<"x-max-priority">>, long, 2}]}, + Q1 = amqqueue:set_arguments(Q0, [{<<"x-max-priority">>, long, 2}]), PQ = rabbit_priority_queue, - BQS1 = PQ:init(Q, new, fun(_, _) -> ok end), + BQS1 = PQ:init(Q1, new, fun(_, _) -> ok end), %% The queue is empty: no timestamp. true = PQ:is_empty(BQS1), '' = PQ:info(head_message_timestamp, BQS1), @@ -415,9 +415,9 @@ info_head_message_timestamp1(_Config) -> ram_duration(_Config) -> QName = rabbit_misc:r(<<"/">>, queue, <<"ram_duration-queue">>), Q0 = rabbit_amqqueue:pseudo_queue(QName, self()), - Q = Q0#amqqueue{arguments = [{<<"x-max-priority">>, long, 5}]}, + Q1 = amqqueue:set_arguments(Q0, [{<<"x-max-priority">>, long, 5}]), PQ = rabbit_priority_queue, - BQS1 = PQ:init(Q, new, fun(_, _) -> ok end), + BQS1 = PQ:init(Q1, new, fun(_, _) -> ok end), {_Duration1, BQS2} = PQ:ram_duration(BQS1), BQS3 = PQ:set_ram_duration_target(infinity, BQS2), BQS4 = PQ:set_ram_duration_target(1, BQS3), diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl index 48dac3ca57..fd58756fda 100644 --- a/test/quorum_queue_SUITE.erl +++ b/test/quorum_queue_SUITE.erl @@ -23,11 +23,15 @@ -import(quorum_queue_utils, [wait_for_messages_ready/3, wait_for_messages_pending_ack/3, wait_for_messages_total/3, + wait_for_messages/2, dirty_query/3, ra_name/1]). -compile(export_all). +suite() -> + [{timetrap, 5 * 60000}]. + all() -> [ {group, single_node}, @@ -172,17 +176,32 @@ init_per_group(Group, Config) -> Config2 = rabbit_ct_helpers:run_steps(Config1b, [fun merge_app_env/1 ] ++ rabbit_ct_broker_helpers:setup_steps()), - ok = rabbit_ct_broker_helpers:rpc( - Config2, 0, application, set_env, - [rabbit, channel_queue_cleanup_interval, 100]), - %% HACK: the larger cluster sizes benefit for a bit more time - %% after clustering before running the tests. - case Group of - cluster_size_5 -> - timer:sleep(5000), - Config2; - _ -> - Config2 + Nodes = rabbit_ct_broker_helpers:get_node_configs( + Config2, nodename), + Ret = rabbit_ct_broker_helpers:rpc( + Config2, 0, + rabbit_feature_flags, + is_supported_remotely, + [Nodes, [quorum_queue], 60000]), + case Ret of + true -> + ok = rabbit_ct_broker_helpers:rpc( + Config2, 0, rabbit_feature_flags, enable, [quorum_queue]), + ok = rabbit_ct_broker_helpers:rpc( + Config2, 0, application, set_env, + [rabbit, channel_queue_cleanup_interval, 100]), + %% HACK: the larger cluster sizes benefit for a bit more time + %% after clustering before running the tests. + case Group of + cluster_size_5 -> + timer:sleep(5000), + Config2; + _ -> + Config2 + end; + false -> + end_per_group(Group, Config2), + {skip, "Quorum queues are unsupported"} end. end_per_group(clustered, Config) -> @@ -206,11 +225,28 @@ init_per_testcase(Testcase, Config) when Testcase == reconnect_consumer_and_publ {tcp_ports_base}, {queue_name, Q} ]), - rabbit_ct_helpers:run_steps(Config2, - rabbit_ct_broker_helpers:setup_steps() ++ - rabbit_ct_client_helpers:setup_steps() ++ - [fun rabbit_ct_broker_helpers:enable_dist_proxy/1, - fun rabbit_ct_broker_helpers:cluster_nodes/1]); + Config3 = rabbit_ct_helpers:run_steps( + Config2, + rabbit_ct_broker_helpers:setup_steps() ++ + rabbit_ct_client_helpers:setup_steps() ++ + [fun rabbit_ct_broker_helpers:enable_dist_proxy/1, + fun rabbit_ct_broker_helpers:cluster_nodes/1]), + Nodes = rabbit_ct_broker_helpers:get_node_configs( + Config3, nodename), + Ret = rabbit_ct_broker_helpers:rpc( + Config3, 0, + rabbit_feature_flags, + is_supported_remotely, + [Nodes, [quorum_queue], 60000]), + case Ret of + true -> + ok = rabbit_ct_broker_helpers:rpc( + Config3, 0, rabbit_feature_flags, enable, [quorum_queue]), + Config3; + false -> + end_per_testcase(Testcase, Config3), + {skip, "Quorum queues are unsupported"} + end; init_per_testcase(Testcase, Config) -> Config1 = rabbit_ct_helpers:testcase_started(Config, Testcase), rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_queues, []), @@ -2193,40 +2229,10 @@ assert_queue_type(Server, Q, Expected) -> Actual = get_queue_type(Server, Q), Expected = Actual. -get_queue_type(Server, Q) -> - QNameRes = rabbit_misc:r(<<"/">>, queue, Q), - {ok, AMQQueue} = - rpc:call(Server, rabbit_amqqueue, lookup, [QNameRes]), - AMQQueue#amqqueue.type. - -wait_for_messages(Config, Stats) -> - wait_for_messages(Config, lists:sort(Stats), 60). - -wait_for_messages(Config, Stats, 0) -> - ?assertEqual(Stats, - lists:sort( - filter_queues(Stats, - rabbit_ct_broker_helpers:rabbitmqctl_list( - Config, 0, ["list_queues", "name", "messages", "messages_ready", - "messages_unacknowledged"])))); -wait_for_messages(Config, Stats, N) -> - case lists:sort( - filter_queues(Stats, - rabbit_ct_broker_helpers:rabbitmqctl_list( - Config, 0, ["list_queues", "name", "messages", "messages_ready", - "messages_unacknowledged"]))) of - Stats0 when Stats0 == Stats -> - ok; - _ -> - timer:sleep(500), - wait_for_messages(Config, Stats, N - 1) - end. - -filter_queues(Expected, Got) -> - Keys = [K || [K, _, _, _] <- Expected], - lists:filter(fun([K, _, _, _]) -> - lists:member(K, Keys) - end, Got). +get_queue_type(Server, Q0) -> + QNameRes = rabbit_misc:r(<<"/">>, queue, Q0), + {ok, Q1} = rpc:call(Server, rabbit_amqqueue, lookup, [QNameRes]), + amqqueue:get_type(Q1). publish_many(Ch, Queue, Count) -> [publish(Ch, Queue) || _ <- lists:seq(1, Count)]. diff --git a/test/quorum_queue_utils.erl b/test/quorum_queue_utils.erl index 6b820c7b5c..9c988bc066 100644 --- a/test/quorum_queue_utils.erl +++ b/test/quorum_queue_utils.erl @@ -6,6 +6,7 @@ wait_for_messages_ready/3, wait_for_messages_pending_ack/3, wait_for_messages_total/3, + wait_for_messages/2, dirty_query/3, ra_name/1 ]). @@ -45,6 +46,29 @@ wait_for_messages(Servers, QName, Number, Fun, N) -> wait_for_messages(Servers, QName, Number, Fun, N - 1) end. +wait_for_messages(Config, Stats) -> + wait_for_messages(Config, lists:sort(Stats), 10). + +wait_for_messages(Config, Stats, 0) -> + ?assertEqual(Stats, + lists:sort( + filter_queues(Stats, + rabbit_ct_broker_helpers:rabbitmqctl_list( + Config, 0, ["list_queues", "name", "messages", "messages_ready", + "messages_unacknowledged"])))); +wait_for_messages(Config, Stats, N) -> + case lists:sort( + filter_queues(Stats, + rabbit_ct_broker_helpers:rabbitmqctl_list( + Config, 0, ["list_queues", "name", "messages", "messages_ready", + "messages_unacknowledged"]))) of + Stats0 when Stats0 == Stats -> + ok; + _ -> + timer:sleep(500), + wait_for_messages(Config, Stats, N - 1) + end. + dirty_query(Servers, QName, Fun) -> lists:map( fun(N) -> @@ -59,3 +83,8 @@ dirty_query(Servers, QName, Fun) -> ra_name(Q) -> binary_to_atom(<<"%2F_", Q/binary>>, utf8). +filter_queues(Expected, Got) -> + Keys = [K || [K, _, _, _] <- Expected], + lists:filter(fun([K, _, _, _]) -> + lists:member(K, Keys) + end, Got). diff --git a/test/rabbit_core_metrics_gc_SUITE.erl b/test/rabbit_core_metrics_gc_SUITE.erl index 7ae43aa7e1..42ac863ead 100644 --- a/test/rabbit_core_metrics_gc_SUITE.erl +++ b/test/rabbit_core_metrics_gc_SUITE.erl @@ -364,11 +364,9 @@ cluster_queue_metrics(Config) -> % Synchronize Name = rabbit_misc:r(VHost, queue, QueueName), - [#amqqueue{pid = QPid}] = rabbit_ct_broker_helpers:rpc(Config, Node0, - ets, lookup, - [rabbit_queue, Name]), - ok = rabbit_ct_broker_helpers:rpc(Config, Node0, rabbit_amqqueue, - sync_mirrors, [QPid]), + [Q] = rabbit_ct_broker_helpers:rpc(Config, Node0, ets, lookup, [rabbit_queue, Name]), + QPid = amqqueue:get_pid(Q), + ok = rabbit_ct_broker_helpers:rpc(Config, Node0, rabbit_amqqueue, sync_mirrors, [QPid]), % Check ETS table for data wait_for(fun () -> diff --git a/test/single_active_consumer_SUITE.erl b/test/single_active_consumer_SUITE.erl index 25261042b2..6071aeb5a5 100644 --- a/test/single_active_consumer_SUITE.erl +++ b/test/single_active_consumer_SUITE.erl @@ -66,13 +66,21 @@ init_per_group(classic_queue, Config) -> auto_delete = true} } | Config]; init_per_group(quorum_queue, Config) -> - [{single_active_consumer_queue_declare, - #'queue.declare'{arguments = [ - {<<"x-single-active-consumer">>, bool, true}, - {<<"x-queue-type">>, longstr, <<"quorum">>} - ], - durable = true, exclusive = false, auto_delete = false} - } | Config]. + Ret = rabbit_ct_broker_helpers:rpc( + Config, 0, rabbit_feature_flags, enable, [quorum_queue]), + case Ret of + ok -> + [{single_active_consumer_queue_declare, + #'queue.declare'{ + arguments = [ + {<<"x-single-active-consumer">>, bool, true}, + {<<"x-queue-type">>, longstr, <<"quorum">>} + ], + durable = true, exclusive = false, auto_delete = false} + } | Config]; + Error -> + {skip, {"Quorum queues are unsupported", Error}} + end. end_per_group(_, Config) -> Config. diff --git a/test/unit_inbroker_dead_letter_SUITE.erl b/test/unit_inbroker_dead_letter_SUITE.erl new file mode 100644 index 0000000000..900a129d3b --- /dev/null +++ b/test/unit_inbroker_dead_letter_SUITE.erl @@ -0,0 +1,1149 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2011-2019 Pivotal Software, Inc. All rights reserved. +%% +%% +%% For the full spec see: http://www.rabbitmq.com/dlx.html +%% +-module(unit_inbroker_dead_letter_SUITE). + +-include_lib("common_test/include/ct.hrl"). +-include_lib("kernel/include/file.hrl"). +-include_lib("amqp_client/include/amqp_client.hrl"). +-include_lib("eunit/include/eunit.hrl"). + +-compile(export_all). + +-import(quorum_queue_utils, [wait_for_messages/2]). + +all() -> + [ + {group, dead_letter_tests} + ]. + +groups() -> + DeadLetterTests = [dead_letter_nack, + dead_letter_multiple_nack, + dead_letter_nack_requeue, + dead_letter_nack_requeue_multiple, + dead_letter_reject, + dead_letter_reject_requeue, + dead_letter_max_length_drop_head, + dead_letter_missing_exchange, + dead_letter_routing_key, + dead_letter_routing_key_header_CC, + dead_letter_routing_key_header_BCC, + dead_letter_routing_key_cycle_max_length, + dead_letter_routing_key_cycle_with_reject, + dead_letter_policy, + dead_letter_override_policy, + dead_letter_ignore_policy, + dead_letter_headers, + dead_letter_headers_reason_maxlen, + dead_letter_headers_cycle, + dead_letter_headers_BCC, + dead_letter_headers_CC, + dead_letter_headers_CC_with_routing_key, + dead_letter_headers_first_death], + [ + {dead_letter_tests, [], + [ + {classic_queue, [parallel], DeadLetterTests ++ [dead_letter_ttl, + dead_letter_routing_key_cycle_ttl, + dead_letter_headers_reason_expired, + dead_letter_headers_reason_expired_per_message]}, + {mirrored_queue, [parallel], DeadLetterTests ++ [dead_letter_ttl, + dead_letter_routing_key_cycle_ttl, + dead_letter_headers_reason_expired, + dead_letter_headers_reason_expired_per_message]}, + {quorum_queue, [parallel], DeadLetterTests} + ]} + ]. + +suite() -> + [ + {timetrap, {minutes, 3}} + ]. + +%% ------------------------------------------------------------------- +%% Testsuite setup/teardown. +%% ------------------------------------------------------------------- + +init_per_suite(Config) -> + rabbit_ct_helpers:log_environment(), + rabbit_ct_helpers:run_setup_steps(Config). + +end_per_suite(Config) -> + rabbit_ct_helpers:run_teardown_steps(Config). + +init_per_group(classic_queue, Config) -> + rabbit_ct_helpers:set_config( + Config, + [{queue_args, [{<<"x-queue-type">>, longstr, <<"classic">>}]}, + {queue_durable, false}]); +init_per_group(quorum_queue, Config) -> + rabbit_ct_helpers:set_config( + Config, + [{queue_args, [{<<"x-queue-type">>, longstr, <<"quorum">>}]}, + {queue_durable, true}]); +init_per_group(mirrored_queue, Config) -> + rabbit_ct_broker_helpers:set_ha_policy(Config, 0, <<"^max_length.*queue">>, + <<"all">>, [{<<"ha-sync-mode">>, <<"automatic">>}]), + Config1 = rabbit_ct_helpers:set_config( + Config, [{is_mirrored, true}, + {queue_args, [{<<"x-queue-type">>, longstr, <<"classic">>}]}, + {queue_durable, false}]), + rabbit_ct_helpers:run_steps(Config1, []); +init_per_group(Group, Config) -> + case lists:member({group, Group}, all()) of + true -> + ClusterSize = 2, + Config1 = rabbit_ct_helpers:set_config(Config, [ + {rmq_nodename_suffix, Group}, + {rmq_nodes_count, ClusterSize} + ]), + rabbit_ct_helpers:run_steps(Config1, + rabbit_ct_broker_helpers:setup_steps() ++ + rabbit_ct_client_helpers:setup_steps()); + false -> + rabbit_ct_helpers:run_steps(Config, []) + end. + +end_per_group(Group, Config) -> + case lists:member({group, Group}, all()) of + true -> + rabbit_ct_helpers:run_steps(Config, + rabbit_ct_client_helpers:teardown_steps() ++ + rabbit_ct_broker_helpers:teardown_steps()); + false -> + Config + end. + +init_per_testcase(Testcase, Config) -> + Group = proplists:get_value(name, ?config(tc_group_properties, Config)), + Q = rabbit_data_coercion:to_binary(io_lib:format("~p_~p", [Group, Testcase])), + Q2 = rabbit_data_coercion:to_binary(io_lib:format("~p_~p_2", [Group, Testcase])), + Policy = rabbit_data_coercion:to_binary(io_lib:format("~p_~p_policy", [Group, Testcase])), + DLXExchange = rabbit_data_coercion:to_binary(io_lib:format("~p_~p_dlx_exchange", + [Group, Testcase])), + Config1 = rabbit_ct_helpers:set_config(Config, [{dlx_exchange, DLXExchange}, + {queue_name, Q}, + {queue_name_dlx, Q2}, + {policy, Policy}]), + rabbit_ct_helpers:testcase_started(Config1, Testcase). + +end_per_testcase(Testcase, Config) -> + {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + amqp_channel:call(Ch, #'queue.delete'{queue = ?config(queue_name, Config)}), + amqp_channel:call(Ch, #'queue.delete'{queue = ?config(queue_name_dlx, Config)}), + amqp_channel:call(Ch, #'exchange.delete'{exchange = ?config(dlx_exchange, Config)}), + _ = rabbit_ct_broker_helpers:clear_policy(Config, 0, ?config(policy, Config)), + rabbit_ct_helpers:testcase_finished(Config, Testcase). + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%% Dead letter exchanges +%% +%% Messages are dead-lettered when: +%% 1) message is rejected with basic.reject or basic.nack with requeue=false +%% 2) message ttl expires (not implemented in quorum queues) +%% 3) queue length limit is exceeded (only drop-head implemented in quorum queues) +%% +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +%% 1) message is rejected with basic.nack, requeue=false and multiple=false +dead_letter_nack(Config) -> + {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + QName = ?config(queue_name, Config), + DLXQName = ?config(queue_name_dlx, Config), + declare_dead_letter_queues(Ch, Config, QName, DLXQName), + + P1 = <<"msg1">>, + P2 = <<"msg2">>, + P3 = <<"msg3">>, + + %% Publish 3 messages + publish(Ch, QName, [P1, P2, P3]), + wait_for_messages(Config, [[QName, <<"3">>, <<"3">>, <<"0">>]]), + %% Consume them + [DTag1, DTag2, DTag3] = consume(Ch, QName, [P1, P2, P3]), + %% Nack the last one with multiple = false + amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DTag3, + multiple = false, + requeue = false}), + wait_for_messages(Config, [[DLXQName, <<"1">>, <<"1">>, <<"0">>]]), + %% Queue is empty + consume_empty(Ch, QName), + %% Consume the last message from the dead letter queue + consume(Ch, DLXQName, [P3]), + consume_empty(Ch, DLXQName), + %% Nack the other two + amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DTag1, + multiple = false, + requeue = false}), + amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DTag2, + multiple = false, + requeue = false}), + %% Queue is empty + consume_empty(Ch, QName), + %% Consume the first two messages from the dead letter queue + consume(Ch, DLXQName, [P1, P2]), + consume_empty(Ch, DLXQName). + +%% 1) message is rejected with basic.nack, requeue=false and multiple=true +dead_letter_multiple_nack(Config) -> + {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + QName = ?config(queue_name, Config), + DLXQName = ?config(queue_name_dlx, Config), + declare_dead_letter_queues(Ch, Config, QName, DLXQName), + + P1 = <<"msg1">>, + P2 = <<"msg2">>, + P3 = <<"msg3">>, + + %% Publish 3 messages + publish(Ch, QName, [P1, P2, P3]), + wait_for_messages(Config, [[QName, <<"3">>, <<"3">>, <<"0">>]]), + %% Consume them + [_, _, DTag3] = consume(Ch, QName, [P1, P2, P3]), + %% Nack the last one with multiple = true + amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DTag3, + multiple = true, + requeue = false}), + wait_for_messages(Config, [[DLXQName, <<"3">>, <<"3">>, <<"0">>]]), + %% Consume the 3 messages from the dead letter queue + consume(Ch, DLXQName, [P1, P2, P3]), + consume_empty(Ch, DLXQName), + %% Queue is empty + consume_empty(Ch, QName). + +%% 1) message is rejected with basic.nack, requeue=true and multiple=false. Dead-lettering does not take place +dead_letter_nack_requeue(Config) -> + {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + QName = ?config(queue_name, Config), + DLXQName = ?config(queue_name_dlx, Config), + declare_dead_letter_queues(Ch, Config, QName, DLXQName), + + P1 = <<"msg1">>, + P2 = <<"msg2">>, + P3 = <<"msg3">>, + + %% Publish 3 messages + publish(Ch, QName, [P1, P2, P3]), + %% Consume them + wait_for_messages(Config, [[QName, <<"3">>, <<"3">>, <<"0">>]]), + [_, _, DTag3] = consume(Ch, QName, [P1, P2, P3]), + %% Queue is empty + consume_empty(Ch, QName), + %% Nack the last one with multiple = false + amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DTag3, + multiple = false, + requeue = true}), + %% Consume the last message from the queue + wait_for_messages(Config, [[QName, <<"3">>, <<"1">>, <<"2">>]]), + consume(Ch, QName, [P3]), + consume_empty(Ch, QName), + %% Dead letter queue is empty + consume_empty(Ch, DLXQName). + +%% 1) message is rejected with basic.nack, requeue=true and multiple=true. Dead-lettering does not take place +dead_letter_nack_requeue_multiple(Config) -> + {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + QName = ?config(queue_name, Config), + DLXQName = ?config(queue_name_dlx, Config), + declare_dead_letter_queues(Ch, Config, QName, DLXQName), + + P1 = <<"msg1">>, + P2 = <<"msg2">>, + P3 = <<"msg3">>, + + %% Publish 3 messages + publish(Ch, QName, [P1, P2, P3]), + %% Consume them + wait_for_messages(Config, [[QName, <<"3">>, <<"3">>, <<"0">>]]), + [_, _, DTag3] = consume(Ch, QName, [P1, P2, P3]), + %% Queue is empty + consume_empty(Ch, QName), + %% Nack the last one with multiple = true + amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DTag3, + multiple = true, + requeue = true}), + %% Consume the three messages from the queue + wait_for_messages(Config, [[QName, <<"3">>, <<"3">>, <<"0">>]]), + consume(Ch, QName, [P1, P2, P3]), + consume_empty(Ch, QName), + %% Dead letter queue is empty + consume_empty(Ch, DLXQName). + +%% 1) message is rejected with basic.reject, requeue=false +dead_letter_reject(Config) -> + {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + QName = ?config(queue_name, Config), + DLXQName = ?config(queue_name_dlx, Config), + declare_dead_letter_queues(Ch, Config, QName, DLXQName), + + P1 = <<"msg1">>, + P2 = <<"msg2">>, + P3 = <<"msg3">>, + + %% Publish 3 messages + publish(Ch, QName, [P1, P2, P3]), + %% Consume the first message + wait_for_messages(Config, [[QName, <<"3">>, <<"3">>, <<"0">>]]), + [DTag] = consume(Ch, QName, [P1]), + %% Reject it + amqp_channel:cast(Ch, #'basic.reject'{delivery_tag = DTag, + requeue = false}), + %% Consume it from the dead letter queue + wait_for_messages(Config, [[DLXQName, <<"1">>, <<"1">>, <<"0">>]]), + _ = consume(Ch, DLXQName, [P1]), + consume_empty(Ch, DLXQName), + %% Consume the last two from the queue + _ = consume(Ch, QName, [P2, P3]), + consume_empty(Ch, QName). + +%% 1) Message is rejected with basic.reject, requeue=true. Dead-lettering does not take place. +dead_letter_reject_requeue(Config) -> + {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + QName = ?config(queue_name, Config), + DLXQName = ?config(queue_name_dlx, Config), + declare_dead_letter_queues(Ch, Config, QName, DLXQName), + + P1 = <<"msg1">>, + P2 = <<"msg2">>, + P3 = <<"msg3">>, + + %% Publish 3 messages + publish(Ch, QName, [P1, P2, P3]), + %% Consume the first one + wait_for_messages(Config, [[QName, <<"3">>, <<"3">>, <<"0">>]]), + [DTag] = consume(Ch, QName, [P1]), + %% Reject the first one + amqp_channel:cast(Ch, #'basic.reject'{delivery_tag = DTag, + requeue = true}), + %% Consume the three messages from the queue + wait_for_messages(Config, [[QName, <<"3">>, <<"3">>, <<"0">>]]), + _ = consume(Ch, QName, [P1, P2, P3]), + consume_empty(Ch, QName), + %% Dead letter is empty + consume_empty(Ch, DLXQName). + +%% 2) Message ttl expires +dead_letter_ttl(Config) -> + {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + QName = ?config(queue_name, Config), + DLXQName = ?config(queue_name_dlx, Config), + declare_dead_letter_queues(Ch, Config, QName, DLXQName, [{<<"x-message-ttl">>, long, 1}]), + + %% Publish message + P1 = <<"msg1">>, + publish(Ch, QName, [P1]), + wait_for_messages(Config, [[DLXQName, <<"1">>, <<"1">>, <<"0">>]]), + consume_empty(Ch, QName), + [_] = consume(Ch, DLXQName, [P1]). + +%% 3) The queue length limit is exceeded, message dropped is dead lettered. +%% Default strategy: drop-head +dead_letter_max_length_drop_head(Config) -> + {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + QName = ?config(queue_name, Config), + DLXQName = ?config(queue_name_dlx, Config), + + declare_dead_letter_queues(Ch, Config, QName, DLXQName, [{<<"x-max-length">>, long, 1}]), + + P1 = <<"msg1">>, + P2 = <<"msg2">>, + P3 = <<"msg3">>, + + %% Publish 3 messages + publish(Ch, QName, [P1, P2, P3]), + %% Consume the last one from the queue (max-length = 1) + wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), + _ = consume(Ch, QName, [P3]), + consume_empty(Ch, QName), + %% Consume the dropped ones from the dead letter queue + wait_for_messages(Config, [[DLXQName, <<"2">>, <<"2">>, <<"0">>]]), + _ = consume(Ch, DLXQName, [P1, P2]), + consume_empty(Ch, DLXQName). + +%% Dead letter exchange does not have to be declared when the queue is declared, but it should +%% exist by the time messages need to be dead-lettered; if it is missing then, the messages will +%% be silently dropped. +dead_letter_missing_exchange(Config) -> + {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + Args = ?config(queue_args, Config), + Durable = ?config(queue_durable, Config), + QName = ?config(queue_name, Config), + DLXQName = ?config(queue_name_dlx, Config), + DLXExchange = <<"dlx-exchange-2">>, + #'exchange.delete_ok'{} = amqp_channel:call(Ch, #'exchange.delete'{exchange = DLXExchange}), + + DeadLetterArgs = [{<<"x-max-length">>, long, 1}, + {<<"x-dead-letter-exchange">>, longstr, DLXExchange}, + {<<"x-dead-letter-routing-key">>, longstr, DLXQName}], + #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, arguments = DeadLetterArgs ++ Args, durable = Durable}), + #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = DLXQName, durable = Durable}), + + P1 = <<"msg1">>, + P2 = <<"msg2">>, + + %% Publish one message + publish(Ch, QName, [P1]), + %% Consume it + wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), + [DTag] = consume(Ch, QName, [P1]), + %% Reject it + amqp_channel:cast(Ch, #'basic.reject'{delivery_tag = DTag, + requeue = false}), + wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]), + %% Message is not in the dead letter queue (exchange does not exist) + consume_empty(Ch, DLXQName), + + %% Declare the dead-letter exchange + #'exchange.declare_ok'{} = amqp_channel:call(Ch, #'exchange.declare'{exchange = DLXExchange}), + #'queue.bind_ok'{} = amqp_channel:call(Ch, #'queue.bind'{queue = DLXQName, + exchange = DLXExchange, + routing_key = DLXQName}), + + %% Publish another message + publish(Ch, QName, [P2]), + %% Consume it + wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), + [DTag2] = consume(Ch, QName, [P2]), + %% Reject it + amqp_channel:cast(Ch, #'basic.reject'{delivery_tag = DTag2, + requeue = false}), + %% Consume the rejected message from the dead letter queue + wait_for_messages(Config, [[DLXQName, <<"1">>, <<"1">>, <<"0">>]]), + {#'basic.get_ok'{}, #amqp_msg{payload = P2}} = + amqp_channel:call(Ch, #'basic.get'{queue = DLXQName}), + consume_empty(Ch, DLXQName). + +%% +%% ROUTING +%% +%% Dead-lettered messages are routed to their dead letter exchange either: +%% with the routing key specified for the queue they were on; or, +%% if this was not set, (3) with the same routing keys they were originally published with. +%% (4) This includes routing keys added by the CC and BCC headers. +%% +%% 3) All previous tests used a specific key, test the original ones now. +dead_letter_routing_key(Config) -> + {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + QName = ?config(queue_name, Config), + DLXQName = ?config(queue_name_dlx, Config), + Args = ?config(queue_args, Config), + Durable = ?config(queue_durable, Config), + DLXExchange = ?config(dlx_exchange, Config), + + %% Do not use a specific key + DeadLetterArgs = [{<<"x-dead-letter-exchange">>, longstr, DLXExchange}], + #'exchange.declare_ok'{} = amqp_channel:call(Ch, #'exchange.declare'{exchange = DLXExchange}), + #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, arguments = DeadLetterArgs ++ Args, durable = Durable}), + #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = DLXQName, durable = Durable}), + + P1 = <<"msg1">>, + P2 = <<"msg2">>, + + %% Publish, consume and nack the first message + publish(Ch, QName, [P1]), + wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), + [DTag1] = consume(Ch, QName, [P1]), + amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DTag1, + multiple = false, + requeue = false}), + %% Both queues are empty as the message could not been routed in the dlx exchange + wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]), + consume_empty(Ch, QName), + consume_empty(Ch, DLXQName), + %% Bind the dlx queue with the original queue routing key + #'queue.bind_ok'{} = amqp_channel:call(Ch, #'queue.bind'{queue = DLXQName, + exchange = DLXExchange, + routing_key = QName}), + %% Publish, consume and nack the second message + publish(Ch, QName, [P2]), + wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), + [DTag2] = consume(Ch, QName, [P2]), + amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DTag2, + multiple = false, + requeue = false}), + %% Message can now be routed using the recently binded key + wait_for_messages(Config, [[DLXQName, <<"1">>, <<"1">>, <<"0">>]]), + consume(Ch, DLXQName, [P2]), + consume_empty(Ch, QName). + + +%% 4a) If a specific routing key was not set for the queue, use routing keys added by the +%% CC and BCC headers +dead_letter_routing_key_header_CC(Config) -> + {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + QName = ?config(queue_name, Config), + DLXQName = ?config(queue_name_dlx, Config), + Args = ?config(queue_args, Config), + Durable = ?config(queue_durable, Config), + DLXExchange = ?config(dlx_exchange, Config), + + %% Do not use a specific key + DeadLetterArgs = [{<<"x-dead-letter-exchange">>, longstr, DLXExchange}], + #'exchange.declare_ok'{} = amqp_channel:call(Ch, #'exchange.declare'{exchange = DLXExchange}), + #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, arguments = DeadLetterArgs ++ Args, durable = Durable}), + #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = DLXQName, durable = Durable}), + #'queue.bind_ok'{} = amqp_channel:call(Ch, #'queue.bind'{queue = DLXQName, + exchange = DLXExchange, + routing_key = DLXQName}), + + P1 = <<"msg1">>, + P2 = <<"msg2">>, + CCHeader = {<<"CC">>, array, [{longstr, DLXQName}]}, + + %% Publish, consume and nack two messages, one with CC header + publish(Ch, QName, [P1]), + publish(Ch, QName, [P2], [CCHeader]), + wait_for_messages(Config, [[QName, <<"2">>, <<"2">>, <<"0">>]]), + [_, DTag2] = consume(Ch, QName, [P1, P2]), + %% P2 is also published to the DLX queue because of the binding to the default exchange + [_] = consume(Ch, DLXQName, [P2]), + amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DTag2, + multiple = true, + requeue = false}), + %% The second message should have been routed using the CC header + wait_for_messages(Config, [[DLXQName, <<"2">>, <<"1">>, <<"1">>]]), + consume_empty(Ch, QName), + consume(Ch, DLXQName, [P2]), + consume_empty(Ch, DLXQName). + +%% 4b) If a specific routing key was not set for the queue, use routing keys added by the +%% CC and BCC headers +dead_letter_routing_key_header_BCC(Config) -> + {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + QName = ?config(queue_name, Config), + DLXQName = ?config(queue_name_dlx, Config), + Args = ?config(queue_args, Config), + Durable = ?config(queue_durable, Config), + DLXExchange = ?config(dlx_exchange, Config), + + %% Do not use a specific key + DeadLetterArgs = [{<<"x-dead-letter-exchange">>, longstr, DLXExchange}], + #'exchange.declare_ok'{} = amqp_channel:call(Ch, #'exchange.declare'{exchange = DLXExchange}), + #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, arguments = DeadLetterArgs ++ Args, durable = Durable}), + #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = DLXQName, durable = Durable}), + #'queue.bind_ok'{} = amqp_channel:call(Ch, #'queue.bind'{queue = DLXQName, + exchange = DLXExchange, + routing_key = DLXQName}), + + P1 = <<"msg1">>, + P2 = <<"msg2">>, + BCCHeader = {<<"BCC">>, array, [{longstr, DLXQName}]}, + + %% Publish, consume and nack two messages, one with BCC header + publish(Ch, QName, [P1]), + publish(Ch, QName, [P2], [BCCHeader]), + wait_for_messages(Config, [[QName, <<"2">>, <<"2">>, <<"0">>]]), + [_, DTag2] = consume(Ch, QName, [P1, P2]), + %% P2 is also published to the DLX queue because of the binding to the default exchange + [_] = consume(Ch, DLXQName, [P2]), + amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DTag2, + multiple = true, + requeue = false}), + %% The second message should have been routed using the BCC header + wait_for_messages(Config, [[DLXQName, <<"2">>, <<"1">>, <<"1">>]]), + consume_empty(Ch, QName), + consume(Ch, DLXQName, [P2]), + consume_empty(Ch, DLXQName). + +%% It is possible to form a cycle of message dead-lettering. For instance, +%% this can happen when a queue dead-letters messages to the default exchange without +%% specifiying a dead-letter routing key (5). Messages in such cycles (i.e. messages that +%% reach the same queue twice) will be dropped if there was no rejections in the entire cycle. +%% i.e. x-message-ttl (7), x-max-length (6) +%% +%% 6) Message is dead lettered due to queue length limit, and then dropped by the broker as it is +%% republished to the same queue. +dead_letter_routing_key_cycle_max_length(Config) -> + {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + Args = ?config(queue_args, Config), + Durable = ?config(queue_durable, Config), + QName = ?config(queue_name, Config), + + DeadLetterArgs = [{<<"x-max-length">>, long, 1}, + {<<"x-dead-letter-exchange">>, longstr, <<>>}], + #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, arguments = DeadLetterArgs ++ Args, durable = Durable}), + + P1 = <<"msg1">>, + P2 = <<"msg2">>, + + %% Publish messages, consume and acknowledge the second one (x-max-length = 1) + publish(Ch, QName, [P1, P2]), + wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), + [DTag] = consume(Ch, QName, [P2]), + consume_empty(Ch, QName), + amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DTag}), + %% Queue is empty, P1 has not been republished in a loop + wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]), + consume_empty(Ch, QName). + +%% 7) Message is dead lettered due to message ttl. Not yet implemented in quorum queues +dead_letter_routing_key_cycle_ttl(Config) -> + {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + Args = ?config(queue_args, Config), + Durable = ?config(queue_durable, Config), + QName = ?config(queue_name, Config), + + DeadLetterArgs = [{<<"x-message-ttl">>, long, 1}, + {<<"x-dead-letter-exchange">>, longstr, <<>>}], + #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, arguments = DeadLetterArgs ++ Args, durable = Durable}), + + P1 = <<"msg1">>, + P2 = <<"msg2">>, + + %% Publish messages + publish(Ch, QName, [P1, P2]), + wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]), + consume_empty(Ch, QName). + +%% 5) Messages continue to be republished as there are manual rejections +dead_letter_routing_key_cycle_with_reject(Config) -> + {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + Args = ?config(queue_args, Config), + Durable = ?config(queue_durable, Config), + QName = ?config(queue_name, Config), + + DeadLetterArgs = [{<<"x-dead-letter-exchange">>, longstr, <<>>}], + #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, arguments = DeadLetterArgs ++ Args, durable = Durable}), + + P = <<"msg1">>, + + %% Publish message + publish(Ch, QName, [P]), + wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), + [DTag] = consume(Ch, QName, [P]), + amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DTag, + multiple = false, + requeue = false}), + wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), + [DTag1] = consume(Ch, QName, [P]), + amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DTag1, + multiple = false, + requeue = false}), + %% Message its being republished + wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), + [_] = consume(Ch, QName, [P]). + +%% +%% For any given queue, a DLX can be defined by clients using the queue's arguments, +%% or in the server using policies (8). In the case where both policy and arguments specify a DLX, +%% the one specified in arguments overrules the one specified in policy (9). +%% +%% 8) Use server policies +dead_letter_policy(Config) -> + {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + QName = ?config(queue_name, Config), + DLXQName = ?config(queue_name_dlx, Config), + Args = ?config(queue_args, Config), + Durable = ?config(queue_durable, Config), + DLXExchange = ?config(dlx_exchange, Config), + + %% Do not use arguments + #'exchange.declare_ok'{} = amqp_channel:call(Ch, #'exchange.declare'{exchange = DLXExchange}), + #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, arguments = Args, + durable = Durable}), + #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = DLXQName, + durable = Durable}), + #'queue.bind_ok'{} = amqp_channel:call(Ch, #'queue.bind'{queue = DLXQName, + exchange = DLXExchange, + routing_key = DLXQName}), + + P1 = <<"msg1">>, + P2 = <<"msg2">>, + + %% Publish 2 messages + publish(Ch, QName, [P1, P2]), + %% Consume them + wait_for_messages(Config, [[QName, <<"2">>, <<"2">>, <<"0">>]]), + [DTag1, DTag2] = consume(Ch, QName, [P1, P2]), + %% Nack the first one with multiple = false + amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DTag1, + multiple = false, + requeue = false}), + %% Only one message unack left in the queue + wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]), + consume_empty(Ch, QName), + consume_empty(Ch, DLXQName), + + %% Set a policy + ok = rabbit_ct_broker_helpers:set_policy(Config, 0, ?config(policy, Config), QName, + <<"queues">>, + [{<<"dead-letter-exchange">>, DLXExchange}, + {<<"dead-letter-routing-key">>, DLXQName}]), + timer:sleep(1000), + %% Nack the second message + amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DTag2, + multiple = false, + requeue = false}), + %% Queue is empty + wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]), + consume_empty(Ch, QName), + %% Consume the message from the dead letter queue + consume(Ch, DLXQName, [P2]), + consume_empty(Ch, DLXQName). + +%% 9) Argument overrides server policy +dead_letter_override_policy(Config) -> + {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + QName = ?config(queue_name, Config), + DLXQName = ?config(queue_name_dlx, Config), + + %% Set a policy, it creates a cycle but message will be republished with the nack. + %% Good enough for this test. + ok = rabbit_ct_broker_helpers:set_policy(Config, 0, ?config(policy, Config), QName, + <<"queues">>, + [{<<"dead-letter-exchange">>, <<>>}, + {<<"dead-letter-routing-key">>, QName}]), + + %% Declare arguments override the policy and set routing queue + declare_dead_letter_queues(Ch, Config, QName, DLXQName), + + P1 = <<"msg1">>, + + publish(Ch, QName, [P1]), + wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), + [DTag1] = consume(Ch, QName, [P1]), + amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DTag1, + multiple = false, + requeue = false}), + %% Queue is empty + wait_for_messages(Config, [[DLXQName, <<"1">>, <<"1">>, <<"0">>]]), + consume_empty(Ch, QName), + [_] = consume(Ch, DLXQName, [P1]). + +%% 9) Policy is set after have declared a queue with dead letter arguments. Policy will be +%% overriden/ignored. +dead_letter_ignore_policy(Config) -> + {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + QName = ?config(queue_name, Config), + DLXQName = ?config(queue_name_dlx, Config), + + declare_dead_letter_queues(Ch, Config, QName, DLXQName), + + %% Set a policy + ok = rabbit_ct_broker_helpers:set_policy(Config, 0, ?config(policy, Config), QName, + <<"queues">>, + [{<<"dead-letter-exchange">>, <<>>}, + {<<"dead-letter-routing-key">>, QName}]), + + P1 = <<"msg1">>, + + publish(Ch, QName, [P1]), + wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), + [DTag1] = consume(Ch, QName, [P1]), + amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DTag1, + multiple = false, + requeue = false}), + %% Message is in the dead letter queue, original queue is empty + wait_for_messages(Config, [[DLXQName, <<"1">>, <<"1">>, <<"0">>]]), + [_] = consume(Ch, DLXQName, [P1]), + consume_empty(Ch, QName). + +%% +%% HEADERS +%% +%% The dead-lettering process adds an array to the header of each dead-lettered message named +%% x-death (10). This array contains an entry for each dead lettering event containing: +%% queue, reason, time, exchange, routing-keys, count +%% original-expiration (14) (if the message was dead-letterered due to per-message TTL) +%% New entries are prepended to the beginning of the x-death array. +%% Reason is one of the following: rejected (11), expired (12), maxlen (13) +%% +%% 10) and 11) Check all x-death headers, reason rejected +dead_letter_headers(Config) -> + {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + QName = ?config(queue_name, Config), + DLXQName = ?config(queue_name_dlx, Config), + declare_dead_letter_queues(Ch, Config, QName, DLXQName), + + %% Publish and nack a message + P1 = <<"msg1">>, + publish(Ch, QName, [P1]), + wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), + [DTag1] = consume(Ch, QName, [P1]), + amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DTag1, + multiple = false, + requeue = false}), + %% Consume and check headers + wait_for_messages(Config, [[DLXQName, <<"1">>, <<"1">>, <<"0">>]]), + {#'basic.get_ok'{}, #amqp_msg{payload = P1, + props = #'P_basic'{headers = Headers}}} = + amqp_channel:call(Ch, #'basic.get'{queue = DLXQName}), + {array, [{table, Death}]} = rabbit_misc:table_lookup(Headers, <<"x-death">>), + ?assertEqual({longstr, QName}, rabbit_misc:table_lookup(Death, <<"queue">>)), + ?assertEqual({longstr, <<"rejected">>}, rabbit_misc:table_lookup(Death, <<"reason">>)), + ?assertMatch({timestamp, _}, rabbit_misc:table_lookup(Death, <<"time">>)), + ?assertEqual({longstr, <<>>}, rabbit_misc:table_lookup(Death, <<"exchange">>)), + ?assertEqual({long, 1}, rabbit_misc:table_lookup(Death, <<"count">>)), + ?assertEqual({array, [{longstr, QName}]}, rabbit_misc:table_lookup(Death, <<"routing-keys">>)). + +%% 12) Per-queue message ttl has expired +dead_letter_headers_reason_expired(Config) -> + {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + QName = ?config(queue_name, Config), + DLXQName = ?config(queue_name_dlx, Config), + declare_dead_letter_queues(Ch, Config, QName, DLXQName, [{<<"x-message-ttl">>, long, 1}]), + + %% Publish a message + P1 = <<"msg1">>, + publish(Ch, QName, [P1]), + %% Consume and check headers + wait_for_messages(Config, [[DLXQName, <<"1">>, <<"1">>, <<"0">>]]), + {#'basic.get_ok'{}, #amqp_msg{payload = P1, + props = #'P_basic'{headers = Headers}}} = + amqp_channel:call(Ch, #'basic.get'{queue = DLXQName}), + {array, [{table, Death}]} = rabbit_misc:table_lookup(Headers, <<"x-death">>), + ?assertEqual({longstr, <<"expired">>}, rabbit_misc:table_lookup(Death, <<"reason">>)), + ?assertMatch(undefined, rabbit_misc:table_lookup(Death, <<"original-expiration">>)). + +%% 14) Per-message TTL has expired, original-expiration is added to x-death array +dead_letter_headers_reason_expired_per_message(Config) -> + {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + QName = ?config(queue_name, Config), + DLXQName = ?config(queue_name_dlx, Config), + declare_dead_letter_queues(Ch, Config, QName, DLXQName), + + %% Publish a message + P1 = <<"msg1">>, + amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, + #amqp_msg{payload = P1, + props = #'P_basic'{expiration = <<"1">>}}), + %% publish another message to ensure the queue performs message expirations + publish(Ch, QName, [<<"msg2">>]), + %% Consume and check headers + wait_for_messages(Config, [[DLXQName, <<"1">>, <<"1">>, <<"0">>]]), + {#'basic.get_ok'{}, #amqp_msg{payload = P1, + props = #'P_basic'{headers = Headers}}} = + amqp_channel:call(Ch, #'basic.get'{queue = DLXQName}), + {array, [{table, Death}]} = rabbit_misc:table_lookup(Headers, <<"x-death">>), + ?assertEqual({longstr, <<"expired">>}, rabbit_misc:table_lookup(Death, <<"reason">>)), + ?assertMatch({longstr, <<"1">>}, rabbit_misc:table_lookup(Death, <<"original-expiration">>)). + +%% 13) Message expired with maxlen reason +dead_letter_headers_reason_maxlen(Config) -> + {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + QName = ?config(queue_name, Config), + DLXQName = ?config(queue_name_dlx, Config), + declare_dead_letter_queues(Ch, Config, QName, DLXQName, [{<<"x-max-length">>, long, 1}]), + + P1 = <<"msg1">>, + P2 = <<"msg2">>, + publish(Ch, QName, [P1, P2]), + %% Consume and check reason header + wait_for_messages(Config, [[DLXQName, <<"1">>, <<"1">>, <<"0">>]]), + {#'basic.get_ok'{}, #amqp_msg{payload = P1, + props = #'P_basic'{headers = Headers}}} = + amqp_channel:call(Ch, #'basic.get'{queue = DLXQName}), + {array, [{table, Death}]} = rabbit_misc:table_lookup(Headers, <<"x-death">>), + ?assertEqual({longstr, <<"maxlen">>}, rabbit_misc:table_lookup(Death, <<"reason">>)). + +%% In case x-death already contains an entry with the same queue and dead lettering reason, +%% its count field will be incremented and it will be moved to the beginning of the array +dead_letter_headers_cycle(Config) -> + {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + Args = ?config(queue_args, Config), + Durable = ?config(queue_durable, Config), + QName = ?config(queue_name, Config), + + DeadLetterArgs = [{<<"x-dead-letter-exchange">>, longstr, <<>>}], + #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, arguments = DeadLetterArgs ++ Args, durable = Durable}), + + P = <<"msg1">>, + + %% Publish message + publish(Ch, QName, [P]), + wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), + [DTag] = consume(Ch, QName, [P]), + amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DTag, + multiple = false, + requeue = false}), + wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), + {#'basic.get_ok'{delivery_tag = DTag1}, #amqp_msg{payload = P, + props = #'P_basic'{headers = Headers1}}} = + amqp_channel:call(Ch, #'basic.get'{queue = QName}), + {array, [{table, Death1}]} = rabbit_misc:table_lookup(Headers1, <<"x-death">>), + ?assertEqual({long, 1}, rabbit_misc:table_lookup(Death1, <<"count">>)), + + amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DTag1, + multiple = false, + requeue = false}), + %% Message its being republished + wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), + {#'basic.get_ok'{}, #amqp_msg{payload = P, + props = #'P_basic'{headers = Headers2}}} = + amqp_channel:call(Ch, #'basic.get'{queue = QName}), + {array, [{table, Death2}]} = rabbit_misc:table_lookup(Headers2, <<"x-death">>), + ?assertEqual({long, 2}, rabbit_misc:table_lookup(Death2, <<"count">>)). + +%% Dead-lettering a message modifies its headers: +%% the exchange name is replaced with that of the latest dead-letter exchange, +%% the routing key may be replaced with that specified in a queue performing dead lettering, +%% if the above happens, the CC header will also be removed (15) and +%% the BCC header will be removed as per Sender-selected distribution (16) +%% +%% CC header is kept if no dead lettering routing key is provided +dead_letter_headers_CC(Config) -> + {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + QName = ?config(queue_name, Config), + DLXQName = ?config(queue_name_dlx, Config), + Args = ?config(queue_args, Config), + Durable = ?config(queue_durable, Config), + DLXExchange = ?config(dlx_exchange, Config), + + %% Do not use a specific key for dead lettering, the CC header is passed + DeadLetterArgs = [{<<"x-dead-letter-exchange">>, longstr, DLXExchange}], + #'exchange.declare_ok'{} = amqp_channel:call(Ch, #'exchange.declare'{exchange = DLXExchange}), + #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, arguments = DeadLetterArgs ++ Args, durable = Durable}), + #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = DLXQName, durable = Durable}), + #'queue.bind_ok'{} = amqp_channel:call(Ch, #'queue.bind'{queue = DLXQName, + exchange = DLXExchange, + routing_key = DLXQName}), + + P1 = <<"msg1">>, + CCHeader = {<<"CC">>, array, [{longstr, DLXQName}]}, + publish(Ch, QName, [P1], [CCHeader]), + wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), + %% Message is published to both queues because of CC header and DLX queue bound to both + %% exchanges + {#'basic.get_ok'{delivery_tag = DTag1}, #amqp_msg{payload = P1, + props = #'P_basic'{headers = Headers1}}} = + amqp_channel:call(Ch, #'basic.get'{queue = QName}), + {#'basic.get_ok'{}, #amqp_msg{payload = P1, + props = #'P_basic'{headers = Headers2}}} = + amqp_channel:call(Ch, #'basic.get'{queue = DLXQName}), + %% We check the headers to ensure no dead lettering has happened + ?assertEqual(undefined, rabbit_misc:table_lookup(Headers1, <<"x-death">>)), + ?assertEqual(undefined, rabbit_misc:table_lookup(Headers2, <<"x-death">>)), + + %% Nack the message so it now gets dead lettered + amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DTag1, + multiple = false, + requeue = false}), + wait_for_messages(Config, [[DLXQName, <<"2">>, <<"1">>, <<"1">>]]), + {#'basic.get_ok'{}, #amqp_msg{payload = P1, + props = #'P_basic'{headers = Headers3}}} = + amqp_channel:call(Ch, #'basic.get'{queue = DLXQName}), + consume_empty(Ch, QName), + ?assertEqual({array, [{longstr, DLXQName}]}, rabbit_misc:table_lookup(Headers3, <<"CC">>)), + ?assertMatch({array, _}, rabbit_misc:table_lookup(Headers3, <<"x-death">>)). + +%% 15) CC header is removed when routing key is specified +dead_letter_headers_CC_with_routing_key(Config) -> + {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + QName = ?config(queue_name, Config), + DLXQName = ?config(queue_name_dlx, Config), + Args = ?config(queue_args, Config), + Durable = ?config(queue_durable, Config), + DLXExchange = ?config(dlx_exchange, Config), + + %% Do not use a specific key for dead lettering, the CC header is passed + DeadLetterArgs = [{<<"x-dead-letter-routing-key">>, longstr, DLXQName}, + {<<"x-dead-letter-exchange">>, longstr, DLXExchange}], + #'exchange.declare_ok'{} = amqp_channel:call(Ch, #'exchange.declare'{exchange = DLXExchange}), + #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, arguments = DeadLetterArgs ++ Args, durable = Durable}), + #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = DLXQName, durable = Durable}), + #'queue.bind_ok'{} = amqp_channel:call(Ch, #'queue.bind'{queue = DLXQName, + exchange = DLXExchange, + routing_key = DLXQName}), + + P1 = <<"msg1">>, + CCHeader = {<<"CC">>, array, [{longstr, DLXQName}]}, + publish(Ch, QName, [P1], [CCHeader]), + wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), + %% Message is published to both queues because of CC header and DLX queue bound to both + %% exchanges + {#'basic.get_ok'{delivery_tag = DTag1}, #amqp_msg{payload = P1, + props = #'P_basic'{headers = Headers1}}} = + amqp_channel:call(Ch, #'basic.get'{queue = QName}), + {#'basic.get_ok'{}, #amqp_msg{payload = P1, + props = #'P_basic'{headers = Headers2}}} = + amqp_channel:call(Ch, #'basic.get'{queue = DLXQName}), + %% We check the headers to ensure no dead lettering has happened + ?assertEqual(undefined, rabbit_misc:table_lookup(Headers1, <<"x-death">>)), + ?assertEqual(undefined, rabbit_misc:table_lookup(Headers2, <<"x-death">>)), + + %% Nack the message so it now gets dead lettered + amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DTag1, + multiple = false, + requeue = false}), + wait_for_messages(Config, [[DLXQName, <<"2">>, <<"1">>, <<"1">>]]), + {#'basic.get_ok'{}, #amqp_msg{payload = P1, + props = #'P_basic'{headers = Headers3}}} = + amqp_channel:call(Ch, #'basic.get'{queue = DLXQName}), + consume_empty(Ch, QName), + ?assertEqual(undefined, rabbit_misc:table_lookup(Headers3, <<"CC">>)), + ?assertMatch({array, _}, rabbit_misc:table_lookup(Headers3, <<"x-death">>)). + +%% 16) the BCC header will always be removed +dead_letter_headers_BCC(Config) -> + {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + QName = ?config(queue_name, Config), + DLXQName = ?config(queue_name_dlx, Config), + Args = ?config(queue_args, Config), + Durable = ?config(queue_durable, Config), + DLXExchange = ?config(dlx_exchange, Config), + + %% Do not use a specific key for dead lettering + DeadLetterArgs = [{<<"x-dead-letter-exchange">>, longstr, DLXExchange}], + #'exchange.declare_ok'{} = amqp_channel:call(Ch, #'exchange.declare'{exchange = DLXExchange}), + #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, arguments = DeadLetterArgs ++ Args, durable = Durable}), + #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = DLXQName, durable = Durable}), + #'queue.bind_ok'{} = amqp_channel:call(Ch, #'queue.bind'{queue = DLXQName, + exchange = DLXExchange, + routing_key = DLXQName}), + + P1 = <<"msg1">>, + BCCHeader = {<<"BCC">>, array, [{longstr, DLXQName}]}, + publish(Ch, QName, [P1], [BCCHeader]), + %% Message is published to both queues because of BCC header and DLX queue bound to both + %% exchanges + wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), + {#'basic.get_ok'{delivery_tag = DTag1}, #amqp_msg{payload = P1, + props = #'P_basic'{headers = Headers1}}} = + amqp_channel:call(Ch, #'basic.get'{queue = QName}), + {#'basic.get_ok'{}, #amqp_msg{payload = P1, + props = #'P_basic'{headers = Headers2}}} = + amqp_channel:call(Ch, #'basic.get'{queue = DLXQName}), + %% We check the headers to ensure no dead lettering has happened + ?assertEqual(undefined, rabbit_misc:table_lookup(Headers1, <<"x-death">>)), + ?assertEqual(undefined, rabbit_misc:table_lookup(Headers2, <<"x-death">>)), + + %% Nack the message so it now gets dead lettered + amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DTag1, + multiple = false, + requeue = false}), + wait_for_messages(Config, [[DLXQName, <<"2">>, <<"1">>, <<"1">>]]), + {#'basic.get_ok'{}, #amqp_msg{payload = P1, + props = #'P_basic'{headers = Headers3}}} = + amqp_channel:call(Ch, #'basic.get'{queue = DLXQName}), + consume_empty(Ch, QName), + ?assertEqual(undefined, rabbit_misc:table_lookup(Headers3, <<"BCC">>)), + ?assertMatch({array, _}, rabbit_misc:table_lookup(Headers3, <<"x-death">>)). + + +%% Three top-level headers are added for the very first dead-lettering event. They are +%% x-first-death-reason, x-first-death-queue, x-first-death-exchange +%% They have the same values as the reason, queue, and exchange fields of the original +%% dead lettering event. Once added, these headers are never modified. +dead_letter_headers_first_death(Config) -> + {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + QName = ?config(queue_name, Config), + DLXQName = ?config(queue_name_dlx, Config), + Args = ?config(queue_args, Config), + Durable = ?config(queue_durable, Config), + DLXExchange = ?config(dlx_exchange, Config), + + %% Let's create a small dead-lettering loop QName -> DLXQName -> QName + DeadLetterArgs = [{<<"x-dead-letter-routing-key">>, longstr, DLXQName}, + {<<"x-dead-letter-exchange">>, longstr, DLXExchange}], + DLXDeadLetterArgs = [{<<"x-dead-letter-routing-key">>, longstr, QName}, + {<<"x-dead-letter-exchange">>, longstr, <<>>}], + #'exchange.declare_ok'{} = amqp_channel:call(Ch, #'exchange.declare'{exchange = DLXExchange}), + #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, arguments = DeadLetterArgs ++ Args, durable = Durable}), + #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = DLXQName, durable = Durable, arguments = DLXDeadLetterArgs}), + #'queue.bind_ok'{} = amqp_channel:call(Ch, #'queue.bind'{queue = DLXQName, + exchange = DLXExchange, + routing_key = DLXQName}), + + + %% Publish and nack a message + P1 = <<"msg1">>, + publish(Ch, QName, [P1]), + wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), + [DTag1] = consume(Ch, QName, [P1]), + amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DTag1, + multiple = false, + requeue = false}), + %% Consume and check headers + wait_for_messages(Config, [[DLXQName, <<"1">>, <<"1">>, <<"0">>]]), + {#'basic.get_ok'{delivery_tag = DTag2}, #amqp_msg{payload = P1, + props = #'P_basic'{headers = Headers}}} = + amqp_channel:call(Ch, #'basic.get'{queue = DLXQName}), + ?assertEqual({longstr, <<"rejected">>}, + rabbit_misc:table_lookup(Headers, <<"x-first-death-reason">>)), + ?assertEqual({longstr, QName}, + rabbit_misc:table_lookup(Headers, <<"x-first-death-queue">>)), + ?assertEqual({longstr, <<>>}, + rabbit_misc:table_lookup(Headers, <<"x-first-death-exchange">>)), + %% Nack the message again so it gets dead lettered to the initial queue. x-first-death + %% headers should not change + amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DTag2, + multiple = false, + requeue = false}), + wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), + {#'basic.get_ok'{}, #amqp_msg{payload = P1, + props = #'P_basic'{headers = Headers2}}} = + amqp_channel:call(Ch, #'basic.get'{queue = QName}), + ?assertEqual({longstr, <<"rejected">>}, + rabbit_misc:table_lookup(Headers2, <<"x-first-death-reason">>)), + ?assertEqual({longstr, QName}, + rabbit_misc:table_lookup(Headers2, <<"x-first-death-queue">>)), + ?assertEqual({longstr, <<>>}, + rabbit_misc:table_lookup(Headers2, <<"x-first-death-exchange">>)). + +%%%%%%%%%%%%%%%%%%%%%%%% +%% Test helpers +%%%%%%%%%%%%%%%%%%%%%%%% +declare_dead_letter_queues(Ch, Config, QName, DLXQName) -> + declare_dead_letter_queues(Ch, Config, QName, DLXQName, []). + +declare_dead_letter_queues(Ch, Config, QName, DLXQName, ExtraArgs) -> + Args = ?config(queue_args, Config), + Durable = ?config(queue_durable, Config), + DLXExchange = ?config(dlx_exchange, Config), + + %% Declare DLX exchange + #'exchange.declare_ok'{} = amqp_channel:call(Ch, #'exchange.declare'{exchange = DLXExchange}), + + %% Declare queue + DeadLetterArgs = [{<<"x-dead-letter-exchange">>, longstr, DLXExchange}, + {<<"x-dead-letter-routing-key">>, longstr, DLXQName}], + #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, arguments = DeadLetterArgs ++ Args ++ ExtraArgs, durable = Durable}), + + %% Declare and bind DLX queue + #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = DLXQName, durable = Durable}), + #'queue.bind_ok'{} = amqp_channel:call(Ch, #'queue.bind'{queue = DLXQName, + exchange = DLXExchange, + routing_key = DLXQName}). + +publish(Ch, QName, Payloads) -> + [amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload}) + || Payload <- Payloads]. + +publish(Ch, QName, Payloads, Headers) -> + [amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, + #amqp_msg{payload = Payload, + props = #'P_basic'{headers = Headers}}) + || Payload <- Payloads]. + +consume(Ch, QName, Payloads) -> + [begin + {#'basic.get_ok'{delivery_tag = DTag}, #amqp_msg{payload = Payload}} = + amqp_channel:call(Ch, #'basic.get'{queue = QName}), + DTag + end || Payload <- Payloads]. + +consume_empty(Ch, QName) -> + #'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}). + +sync_mirrors(QName, Config) -> + case ?config(is_mirrored, Config) of + true -> + rabbit_ct_broker_helpers:rabbitmqctl(Config, 0, [<<"sync_queue">>, QName]); + _ -> ok + end. diff --git a/test/unit_inbroker_non_parallel_SUITE.erl b/test/unit_inbroker_non_parallel_SUITE.erl index e3e282f233..d2db382e30 100644 --- a/test/unit_inbroker_non_parallel_SUITE.erl +++ b/test/unit_inbroker_non_parallel_SUITE.erl @@ -568,7 +568,7 @@ head_message_timestamp1(_Config) -> QRes = rabbit_misc:r(<<"/">>, queue, QName), {ok, Q1} = rabbit_amqqueue:lookup(QRes), - QPid = Q1#amqqueue.pid, + QPid = amqqueue:get_pid(Q1), %% Set up event receiver for queue dummy_event_receiver:start(self(), [node()], [queue_stats]), diff --git a/test/unit_inbroker_parallel_SUITE.erl b/test/unit_inbroker_parallel_SUITE.erl index 581440d179..f4f4971517 100644 --- a/test/unit_inbroker_parallel_SUITE.erl +++ b/test/unit_inbroker_parallel_SUITE.erl @@ -99,10 +99,24 @@ init_per_group(max_length_classic, Config) -> [{queue_args, [{<<"x-queue-type">>, longstr, <<"classic">>}]}, {queue_durable, false}]); init_per_group(max_length_quorum, Config) -> - rabbit_ct_helpers:set_config( - Config, - [{queue_args, [{<<"x-queue-type">>, longstr, <<"quorum">>}]}, - {queue_durable, true}]); + Nodes = rabbit_ct_broker_helpers:get_node_configs( + Config, nodename), + Ret = rabbit_ct_broker_helpers:rpc( + Config, 0, + rabbit_feature_flags, + is_supported_remotely, + [Nodes, [quorum_queue], 60000]), + case Ret of + true -> + ok = rabbit_ct_broker_helpers:rpc( + Config, 0, rabbit_feature_flags, enable, [quorum_queue]), + rabbit_ct_helpers:set_config( + Config, + [{queue_args, [{<<"x-queue-type">>, longstr, <<"quorum">>}]}, + {queue_durable, true}]); + false -> + {skip, "Quorum queues are unsupported"} + end; init_per_group(max_length_mirrored, Config) -> rabbit_ct_broker_helpers:set_ha_policy(Config, 0, <<"^max_length.*queue">>, <<"all">>, [{<<"ha-sync-mode">>, <<"automatic">>}]), @@ -269,8 +283,7 @@ wait_for_confirms(Unconfirmed) -> end. test_amqqueue(Durable) -> - (rabbit_amqqueue:pseudo_queue(test_queue(), self())) - #amqqueue { durable = Durable }. + rabbit_amqqueue:pseudo_queue(test_queue(), self(), Durable). assert_prop(List, Prop, Value) -> case proplists:get_value(Prop, List)of @@ -695,7 +708,7 @@ head_message_timestamp1(_Config) -> QRes = rabbit_misc:r(<<"/">>, queue, QName), {ok, Q1} = rabbit_amqqueue:lookup(QRes), - QPid = Q1#amqqueue.pid, + QPid = amqqueue:get_pid(Q1), %% Set up event receiver for queue dummy_event_receiver:start(self(), [node()], [queue_stats]), @@ -944,7 +957,7 @@ confirms1(_Config) -> QName2 = DeclareBindDurableQueue(), %% Get the first one's pid (we'll crash it later) {ok, Q1} = rabbit_amqqueue:lookup(rabbit_misc:r(<<"/">>, queue, QName1)), - QPid1 = Q1#amqqueue.pid, + QPid1 = amqqueue:get_pid(Q1), %% Enable confirms rabbit_channel:do(Ch, #'confirm.select'{}), receive diff --git a/test/vhost_SUITE.erl b/test/vhost_SUITE.erl index 2de5819b54..5b6a6bd6ff 100644 --- a/test/vhost_SUITE.erl +++ b/test/vhost_SUITE.erl @@ -354,7 +354,7 @@ node_starts_with_dead_vhosts_and_ignore_slaves(Config) -> Node1 = rabbit_ct_broker_helpers:get_node_config(Config, 1, nodename), - #amqqueue{sync_slave_pids = [Pid]} = Q, + [Pid] = amqqueue:get_sync_slave_pids(Q), Node1 = node(Pid), |
