summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDiana Corbacho <diana@rabbitmq.com>2019-01-31 16:20:56 +0000
committerDiana Corbacho <diana@rabbitmq.com>2019-02-01 11:55:04 +0000
commit6139d4257e7563db6373293e1a43aa2a8f3ea203 (patch)
tree7b5e53ca8770381e789e271aa10265e67f37aa61
parenta8bc2fc210130a8d989b7d7d877892466f2b802b (diff)
downloadrabbitmq-server-git-6139d4257e7563db6373293e1a43aa2a8f3ea203.tar.gz
Test suite for dead letter: classic, mirrored and quorum queues
All queue types must exhibit the same behaviour, apart from non implemented features in quorum queues
-rw-r--r--test/quorum_queue_SUITE.erl30
-rw-r--r--test/quorum_queue_utils.erl29
-rw-r--r--test/unit_inbroker_dead_letter_SUITE.erl1151
3 files changed, 1181 insertions, 29 deletions
diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl
index 48dac3ca57..b1f5cfff61 100644
--- a/test/quorum_queue_SUITE.erl
+++ b/test/quorum_queue_SUITE.erl
@@ -23,6 +23,7 @@
-import(quorum_queue_utils, [wait_for_messages_ready/3,
wait_for_messages_pending_ack/3,
wait_for_messages_total/3,
+ wait_for_messages/2,
dirty_query/3,
ra_name/1]).
@@ -2199,35 +2200,6 @@ get_queue_type(Server, Q) ->
rpc:call(Server, rabbit_amqqueue, lookup, [QNameRes]),
AMQQueue#amqqueue.type.
-wait_for_messages(Config, Stats) ->
- wait_for_messages(Config, lists:sort(Stats), 60).
-
-wait_for_messages(Config, Stats, 0) ->
- ?assertEqual(Stats,
- lists:sort(
- filter_queues(Stats,
- rabbit_ct_broker_helpers:rabbitmqctl_list(
- Config, 0, ["list_queues", "name", "messages", "messages_ready",
- "messages_unacknowledged"]))));
-wait_for_messages(Config, Stats, N) ->
- case lists:sort(
- filter_queues(Stats,
- rabbit_ct_broker_helpers:rabbitmqctl_list(
- Config, 0, ["list_queues", "name", "messages", "messages_ready",
- "messages_unacknowledged"]))) of
- Stats0 when Stats0 == Stats ->
- ok;
- _ ->
- timer:sleep(500),
- wait_for_messages(Config, Stats, N - 1)
- end.
-
-filter_queues(Expected, Got) ->
- Keys = [K || [K, _, _, _] <- Expected],
- lists:filter(fun([K, _, _, _]) ->
- lists:member(K, Keys)
- end, Got).
-
publish_many(Ch, Queue, Count) ->
[publish(Ch, Queue) || _ <- lists:seq(1, Count)].
diff --git a/test/quorum_queue_utils.erl b/test/quorum_queue_utils.erl
index 6b820c7b5c..9c988bc066 100644
--- a/test/quorum_queue_utils.erl
+++ b/test/quorum_queue_utils.erl
@@ -6,6 +6,7 @@
wait_for_messages_ready/3,
wait_for_messages_pending_ack/3,
wait_for_messages_total/3,
+ wait_for_messages/2,
dirty_query/3,
ra_name/1
]).
@@ -45,6 +46,29 @@ wait_for_messages(Servers, QName, Number, Fun, N) ->
wait_for_messages(Servers, QName, Number, Fun, N - 1)
end.
+wait_for_messages(Config, Stats) ->
+ wait_for_messages(Config, lists:sort(Stats), 10).
+
+wait_for_messages(Config, Stats, 0) ->
+ ?assertEqual(Stats,
+ lists:sort(
+ filter_queues(Stats,
+ rabbit_ct_broker_helpers:rabbitmqctl_list(
+ Config, 0, ["list_queues", "name", "messages", "messages_ready",
+ "messages_unacknowledged"]))));
+wait_for_messages(Config, Stats, N) ->
+ case lists:sort(
+ filter_queues(Stats,
+ rabbit_ct_broker_helpers:rabbitmqctl_list(
+ Config, 0, ["list_queues", "name", "messages", "messages_ready",
+ "messages_unacknowledged"]))) of
+ Stats0 when Stats0 == Stats ->
+ ok;
+ _ ->
+ timer:sleep(500),
+ wait_for_messages(Config, Stats, N - 1)
+ end.
+
dirty_query(Servers, QName, Fun) ->
lists:map(
fun(N) ->
@@ -59,3 +83,8 @@ dirty_query(Servers, QName, Fun) ->
ra_name(Q) ->
binary_to_atom(<<"%2F_", Q/binary>>, utf8).
+filter_queues(Expected, Got) ->
+ Keys = [K || [K, _, _, _] <- Expected],
+ lists:filter(fun([K, _, _, _]) ->
+ lists:member(K, Keys)
+ end, Got).
diff --git a/test/unit_inbroker_dead_letter_SUITE.erl b/test/unit_inbroker_dead_letter_SUITE.erl
new file mode 100644
index 0000000000..f47104e4b1
--- /dev/null
+++ b/test/unit_inbroker_dead_letter_SUITE.erl
@@ -0,0 +1,1151 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2011-2019 Pivotal Software, Inc. All rights reserved.
+%%
+%%
+%% For the full spec see: http://www.rabbitmq.com/dlx.html
+%%
+-module(unit_inbroker_dead_letter_SUITE).
+
+-include_lib("common_test/include/ct.hrl").
+-include_lib("kernel/include/file.hrl").
+-include_lib("amqp_client/include/amqp_client.hrl").
+-include_lib("eunit/include/eunit.hrl").
+
+-compile(export_all).
+
+-import(quorum_queue_utils, [wait_for_messages/2]).
+
+all() ->
+ [
+ {group, dead_letter_tests}
+ ].
+
+groups() ->
+ DeadLetterTests = [dead_letter_nack,
+ dead_letter_multiple_nack,
+ dead_letter_nack_requeue,
+ dead_letter_nack_requeue_multiple,
+ dead_letter_reject,
+ dead_letter_reject_requeue,
+ dead_letter_max_length_drop_head,
+ dead_letter_missing_exchange,
+ dead_letter_routing_key,
+ dead_letter_routing_key_header_CC,
+ dead_letter_routing_key_header_BCC,
+ dead_letter_routing_key_cycle_max_length,
+ dead_letter_routing_key_cycle_with_reject,
+ dead_letter_policy,
+ dead_letter_override_policy,
+ dead_letter_ignore_policy,
+ dead_letter_headers,
+ dead_letter_headers_reason_maxlen,
+ dead_letter_headers_cycle,
+ dead_letter_headers_BCC,
+ dead_letter_headers_CC,
+ dead_letter_headers_CC_with_routing_key,
+ dead_letter_headers_first_death],
+ [
+ {dead_letter_tests, [],
+ [
+ {classic_queue, [parallel], DeadLetterTests ++ [dead_letter_ttl,
+ dead_letter_routing_key_cycle_ttl,
+ dead_letter_headers_reason_expired,
+ dead_letter_headers_reason_expired_per_message]},
+ {mirrored_queue, [parallel], DeadLetterTests ++ [dead_letter_ttl,
+ dead_letter_routing_key_cycle_ttl,
+ dead_letter_headers_reason_expired,
+ dead_letter_headers_reason_expired_per_message]},
+ {quorum_queue, [parallel], DeadLetterTests}
+ ]}
+ ].
+
+suite() ->
+ [
+ {timetrap, {minutes, 3}}
+ ].
+
+%% -------------------------------------------------------------------
+%% Testsuite setup/teardown.
+%% -------------------------------------------------------------------
+
+init_per_suite(Config) ->
+ rabbit_ct_helpers:log_environment(),
+ rabbit_ct_helpers:run_setup_steps(Config).
+
+end_per_suite(Config) ->
+ rabbit_ct_helpers:run_teardown_steps(Config).
+
+init_per_group(classic_queue, Config) ->
+ rabbit_ct_helpers:set_config(
+ Config,
+ [{queue_args, [{<<"x-queue-type">>, longstr, <<"classic">>}]},
+ {queue_durable, false}]);
+init_per_group(quorum_queue, Config) ->
+ rabbit_ct_helpers:set_config(
+ Config,
+ [{queue_args, [{<<"x-queue-type">>, longstr, <<"quorum">>}]},
+ {queue_durable, true}]);
+init_per_group(mirrored_queue, Config) ->
+ rabbit_ct_broker_helpers:set_ha_policy(Config, 0, <<"^max_length.*queue">>,
+ <<"all">>, [{<<"ha-sync-mode">>, <<"automatic">>}]),
+ Config1 = rabbit_ct_helpers:set_config(
+ Config, [{is_mirrored, true},
+ {queue_args, [{<<"x-queue-type">>, longstr, <<"classic">>}]},
+ {queue_durable, false}]),
+ rabbit_ct_helpers:run_steps(Config1, []);
+init_per_group(Group, Config) ->
+ case lists:member({group, Group}, all()) of
+ true ->
+ ClusterSize = 2,
+ Config1 = rabbit_ct_helpers:set_config(Config, [
+ {rmq_nodename_suffix, Group},
+ {rmq_nodes_count, ClusterSize}
+ ]),
+ rabbit_ct_helpers:run_steps(Config1,
+ rabbit_ct_broker_helpers:setup_steps() ++
+ rabbit_ct_client_helpers:setup_steps());
+ false ->
+ rabbit_ct_helpers:run_steps(Config, [])
+ end.
+
+end_per_group(Group, Config) ->
+ case lists:member({group, Group}, all()) of
+ true ->
+ rabbit_ct_helpers:run_steps(Config,
+ rabbit_ct_client_helpers:teardown_steps() ++
+ rabbit_ct_broker_helpers:teardown_steps());
+ false ->
+ Config
+ end.
+
+init_per_testcase(Testcase, Config) ->
+ Group = proplists:get_value(name, ?config(tc_group_properties, Config)),
+ Q = rabbit_data_coercion:to_binary(io_lib:format("~p_~p", [Group, Testcase])),
+ Q2 = rabbit_data_coercion:to_binary(io_lib:format("~p_~p_2", [Group, Testcase])),
+ Policy = rabbit_data_coercion:to_binary(io_lib:format("~p_~p_policy", [Group, Testcase])),
+ DLXExchange = rabbit_data_coercion:to_binary(io_lib:format("~p_~p_dlx_exchange",
+ [Group, Testcase])),
+ Config1 = rabbit_ct_helpers:set_config(Config, [{dlx_exchange, DLXExchange},
+ {queue_name, Q},
+ {queue_name_dlx, Q2},
+ {policy, Policy}]),
+ rabbit_ct_helpers:testcase_started(Config1, Testcase).
+
+end_per_testcase(Testcase, Config) ->
+ {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ amqp_channel:call(Ch, #'queue.delete'{queue = ?config(queue_name, Config)}),
+ amqp_channel:call(Ch, #'queue.delete'{queue = ?config(queue_name_dlx, Config)}),
+ amqp_channel:call(Ch, #'exchange.delete'{exchange = ?config(dlx_exchange, Config)}),
+ _ = rabbit_ct_broker_helpers:clear_policy(Config, 0, ?config(policy, Config)),
+ rabbit_ct_helpers:testcase_finished(Config, Testcase).
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%% Dead letter exchanges
+%%
+%% Messages are dead-lettered when:
+%% 1) message is rejected with basic.reject or basic.nack with requeue=false
+%% 2) message ttl expires (not implemented in quorum queues)
+%% 3) queue length limit is exceeded (only drop-head implemented in quorum queues)
+%%
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+
+%% 1) message is rejected with basic.nack, requeue=false and multiple=false
+dead_letter_nack(Config) ->
+ {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ QName = ?config(queue_name, Config),
+ DLXQName = ?config(queue_name_dlx, Config),
+ declare_dead_letter_queues(Ch, Config, QName, DLXQName),
+
+ P1 = <<"msg1">>,
+ P2 = <<"msg2">>,
+ P3 = <<"msg3">>,
+
+ %% Publish 3 messages
+ publish(Ch, QName, [P1, P2, P3]),
+ wait_for_messages(Config, [[QName, <<"3">>, <<"3">>, <<"0">>]]),
+ %% Consume them
+ [DTag1, DTag2, DTag3] = consume(Ch, QName, [P1, P2, P3]),
+ %% Nack the last one with multiple = false
+ amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DTag3,
+ multiple = false,
+ requeue = false}),
+ wait_for_messages(Config, [[DLXQName, <<"1">>, <<"1">>, <<"0">>]]),
+ %% Queue is empty
+ consume_empty(Ch, QName),
+ %% Consume the last message from the dead letter queue
+ consume(Ch, DLXQName, [P3]),
+ consume_empty(Ch, DLXQName),
+ %% Nack the other two
+ amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DTag1,
+ multiple = false,
+ requeue = false}),
+ amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DTag2,
+ multiple = false,
+ requeue = false}),
+ %% Queue is empty
+ consume_empty(Ch, QName),
+ %% Consume the first two messages from the dead letter queue
+ consume(Ch, DLXQName, [P1, P2]),
+ consume_empty(Ch, DLXQName).
+
+%% 1) message is rejected with basic.nack, requeue=false and multiple=true
+dead_letter_multiple_nack(Config) ->
+ {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ QName = ?config(queue_name, Config),
+ DLXQName = ?config(queue_name_dlx, Config),
+ declare_dead_letter_queues(Ch, Config, QName, DLXQName),
+
+ P1 = <<"msg1">>,
+ P2 = <<"msg2">>,
+ P3 = <<"msg3">>,
+
+ %% Publish 3 messages
+ publish(Ch, QName, [P1, P2, P3]),
+ wait_for_messages(Config, [[QName, <<"3">>, <<"3">>, <<"0">>]]),
+ %% Consume them
+ [_, _, DTag3] = consume(Ch, QName, [P1, P2, P3]),
+ %% Nack the last one with multiple = true
+ amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DTag3,
+ multiple = true,
+ requeue = false}),
+ wait_for_messages(Config, [[DLXQName, <<"3">>, <<"3">>, <<"0">>]]),
+ %% Consume the 3 messages from the dead letter queue
+ consume(Ch, DLXQName, [P1, P2, P3]),
+ consume_empty(Ch, DLXQName),
+ %% Queue is empty
+ consume_empty(Ch, QName).
+
+%% 1) message is rejected with basic.nack, requeue=true and multiple=false. Dead-lettering does not take place
+dead_letter_nack_requeue(Config) ->
+ {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ QName = ?config(queue_name, Config),
+ DLXQName = ?config(queue_name_dlx, Config),
+ declare_dead_letter_queues(Ch, Config, QName, DLXQName),
+
+ P1 = <<"msg1">>,
+ P2 = <<"msg2">>,
+ P3 = <<"msg3">>,
+
+ %% Publish 3 messages
+ publish(Ch, QName, [P1, P2, P3]),
+ %% Consume them
+ wait_for_messages(Config, [[QName, <<"3">>, <<"3">>, <<"0">>]]),
+ [_, _, DTag3] = consume(Ch, QName, [P1, P2, P3]),
+ %% Queue is empty
+ consume_empty(Ch, QName),
+ %% Nack the last one with multiple = false
+ amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DTag3,
+ multiple = false,
+ requeue = true}),
+ %% Consume the last message from the queue
+ wait_for_messages(Config, [[QName, <<"3">>, <<"1">>, <<"2">>]]),
+ consume(Ch, QName, [P3]),
+ consume_empty(Ch, QName),
+ %% Dead letter queue is empty
+ consume_empty(Ch, DLXQName).
+
+%% 1) message is rejected with basic.nack, requeue=true and multiple=true. Dead-lettering does not take place
+dead_letter_nack_requeue_multiple(Config) ->
+ {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ QName = ?config(queue_name, Config),
+ DLXQName = ?config(queue_name_dlx, Config),
+ declare_dead_letter_queues(Ch, Config, QName, DLXQName),
+
+ P1 = <<"msg1">>,
+ P2 = <<"msg2">>,
+ P3 = <<"msg3">>,
+
+ %% Publish 3 messages
+ publish(Ch, QName, [P1, P2, P3]),
+ %% Consume them
+ wait_for_messages(Config, [[QName, <<"3">>, <<"3">>, <<"0">>]]),
+ [_, _, DTag3] = consume(Ch, QName, [P1, P2, P3]),
+ %% Queue is empty
+ consume_empty(Ch, QName),
+ %% Nack the last one with multiple = true
+ amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DTag3,
+ multiple = true,
+ requeue = true}),
+ %% Consume the three messages from the queue
+ wait_for_messages(Config, [[QName, <<"3">>, <<"3">>, <<"0">>]]),
+ consume(Ch, QName, [P1, P2, P3]),
+ consume_empty(Ch, QName),
+ %% Dead letter queue is empty
+ consume_empty(Ch, DLXQName).
+
+%% 1) message is rejected with basic.reject, requeue=false
+dead_letter_reject(Config) ->
+ {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ QName = ?config(queue_name, Config),
+ DLXQName = ?config(queue_name_dlx, Config),
+ declare_dead_letter_queues(Ch, Config, QName, DLXQName),
+
+ P1 = <<"msg1">>,
+ P2 = <<"msg2">>,
+ P3 = <<"msg3">>,
+
+ %% Publish 3 messages
+ publish(Ch, QName, [P1, P2, P3]),
+ %% Consume the first message
+ wait_for_messages(Config, [[QName, <<"3">>, <<"3">>, <<"0">>]]),
+ [DTag] = consume(Ch, QName, [P1]),
+ %% Reject it
+ amqp_channel:cast(Ch, #'basic.reject'{delivery_tag = DTag,
+ requeue = false}),
+ %% Consume it from the dead letter queue
+ wait_for_messages(Config, [[DLXQName, <<"1">>, <<"1">>, <<"0">>]]),
+ _ = consume(Ch, DLXQName, [P1]),
+ consume_empty(Ch, DLXQName),
+ %% Consume the last two from the queue
+ _ = consume(Ch, QName, [P2, P3]),
+ consume_empty(Ch, QName).
+
+%% 1) Message is rejected with basic.reject, requeue=true. Dead-lettering does not take place.
+dead_letter_reject_requeue(Config) ->
+ {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ QName = ?config(queue_name, Config),
+ DLXQName = ?config(queue_name_dlx, Config),
+ declare_dead_letter_queues(Ch, Config, QName, DLXQName),
+
+ P1 = <<"msg1">>,
+ P2 = <<"msg2">>,
+ P3 = <<"msg3">>,
+
+ %% Publish 3 messages
+ publish(Ch, QName, [P1, P2, P3]),
+ %% Consume the first one
+ wait_for_messages(Config, [[QName, <<"3">>, <<"3">>, <<"0">>]]),
+ [DTag] = consume(Ch, QName, [P1]),
+ %% Reject the first one
+ amqp_channel:cast(Ch, #'basic.reject'{delivery_tag = DTag,
+ requeue = true}),
+ %% Consume the three messages from the queue
+ wait_for_messages(Config, [[QName, <<"3">>, <<"3">>, <<"0">>]]),
+ _ = consume(Ch, QName, [P1, P2, P3]),
+ consume_empty(Ch, QName),
+ %% Dead letter is empty
+ consume_empty(Ch, DLXQName).
+
+%% 2) Message ttl expires
+dead_letter_ttl(Config) ->
+ {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ QName = ?config(queue_name, Config),
+ DLXQName = ?config(queue_name_dlx, Config),
+ declare_dead_letter_queues(Ch, Config, QName, DLXQName, [{<<"x-message-ttl">>, long, 1}]),
+
+ %% Publish message
+ P1 = <<"msg1">>,
+ publish(Ch, QName, [P1]),
+ wait_for_messages(Config, [[DLXQName, <<"1">>, <<"1">>, <<"0">>]]),
+ consume_empty(Ch, QName),
+ [_] = consume(Ch, DLXQName, [P1]).
+
+%% 3) The queue length limit is exceeded, message dropped is dead lettered.
+%% Default strategy: drop-head
+dead_letter_max_length_drop_head(Config) ->
+ {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ QName = ?config(queue_name, Config),
+ DLXQName = ?config(queue_name_dlx, Config),
+
+ declare_dead_letter_queues(Ch, Config, QName, DLXQName, [{<<"x-max-length">>, long, 1}]),
+
+ P1 = <<"msg1">>,
+ P2 = <<"msg2">>,
+ P3 = <<"msg3">>,
+
+ %% Publish 3 messages
+ publish(Ch, QName, [P1, P2, P3]),
+ %% Consume the last one from the queue (max-length = 1)
+ wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]),
+ _ = consume(Ch, QName, [P3]),
+ consume_empty(Ch, QName),
+ %% Consume the dropped ones from the dead letter queue
+ wait_for_messages(Config, [[DLXQName, <<"2">>, <<"2">>, <<"0">>]]),
+ _ = consume(Ch, DLXQName, [P1, P2]),
+ consume_empty(Ch, DLXQName).
+
+%% Dead letter exchange does not have to be declared when the queue is declared, but it should
+%% exist by the time messages need to be dead-lettered; if it is missing then, the messages will
+%% be silently dropped.
+dead_letter_missing_exchange(Config) ->
+ {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ Args = ?config(queue_args, Config),
+ Durable = ?config(queue_durable, Config),
+ QName = ?config(queue_name, Config),
+ DLXQName = ?config(queue_name_dlx, Config),
+ DLXExchange = <<"dlx-exchange-2">>,
+ #'exchange.delete_ok'{} = amqp_channel:call(Ch, #'exchange.delete'{exchange = DLXExchange}),
+
+ DeadLetterArgs = [{<<"x-max-length">>, long, 1},
+ {<<"x-dead-letter-exchange">>, longstr, DLXExchange},
+ {<<"x-dead-letter-routing-key">>, longstr, DLXQName}],
+ #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, arguments = DeadLetterArgs ++ Args, durable = Durable}),
+ #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = DLXQName, durable = Durable}),
+
+ P1 = <<"msg1">>,
+ P2 = <<"msg2">>,
+
+ %% Publish one message
+ publish(Ch, QName, [P1]),
+ %% Consume it
+ wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]),
+ [DTag] = consume(Ch, QName, [P1]),
+ %% Reject it
+ amqp_channel:cast(Ch, #'basic.reject'{delivery_tag = DTag,
+ requeue = false}),
+ wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]),
+ %% Message is not in the dead letter queue (exchange does not exist)
+ consume_empty(Ch, DLXQName),
+
+ %% Declare the dead-letter exchange
+ #'exchange.declare_ok'{} = amqp_channel:call(Ch, #'exchange.declare'{exchange = DLXExchange}),
+ #'queue.bind_ok'{} = amqp_channel:call(Ch, #'queue.bind'{queue = DLXQName,
+ exchange = DLXExchange,
+ routing_key = DLXQName}),
+
+ %% Publish another message
+ publish(Ch, QName, [P2]),
+ %% Consume it
+ wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]),
+ [DTag2] = consume(Ch, QName, [P2]),
+ %% Reject it
+ amqp_channel:cast(Ch, #'basic.reject'{delivery_tag = DTag2,
+ requeue = false}),
+ %% Consume the rejected message from the dead letter queue
+ wait_for_messages(Config, [[DLXQName, <<"1">>, <<"1">>, <<"0">>]]),
+ {#'basic.get_ok'{}, #amqp_msg{payload = P2}} =
+ amqp_channel:call(Ch, #'basic.get'{queue = DLXQName}),
+ consume_empty(Ch, DLXQName).
+
+%%
+%% ROUTING
+%%
+%% Dead-lettered messages are routed to their dead letter exchange either:
+%% with the routing key specified for the queue they were on; or,
+%% if this was not set, (3) with the same routing keys they were originally published with.
+%% (4) This includes routing keys added by the CC and BCC headers.
+%%
+%% 3) All previous tests used an specific key, test the original ones now.
+dead_letter_routing_key(Config) ->
+ {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ QName = ?config(queue_name, Config),
+ DLXQName = ?config(queue_name_dlx, Config),
+ Args = ?config(queue_args, Config),
+ Durable = ?config(queue_durable, Config),
+ DLXExchange = ?config(dlx_exchange, Config),
+
+ %% Do not use an specific key
+ DeadLetterArgs = [{<<"x-dead-letter-exchange">>, longstr, DLXExchange}],
+ #'exchange.declare_ok'{} = amqp_channel:call(Ch, #'exchange.declare'{exchange = DLXExchange}),
+ #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, arguments = DeadLetterArgs ++ Args, durable = Durable}),
+ #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = DLXQName, durable = Durable}),
+
+ P1 = <<"msg1">>,
+ P2 = <<"msg2">>,
+
+ %% Publish, consume and nack a first message
+ publish(Ch, QName, [P1]),
+ wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]),
+ [DTag1] = consume(Ch, QName, [P1]),
+ amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DTag1,
+ multiple = false,
+ requeue = false}),
+ %% Both queues are empty as the message could not been routed in the dlx exchange
+ wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]),
+ consume_empty(Ch, QName),
+ consume_empty(Ch, DLXQName),
+ %% Bind the dlx queue with the original queue routing key
+ #'queue.bind_ok'{} = amqp_channel:call(Ch, #'queue.bind'{queue = DLXQName,
+ exchange = DLXExchange,
+ routing_key = QName}),
+ %% Publish, consume and nack a second message
+ publish(Ch, QName, [P2]),
+ wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]),
+ [DTag2] = consume(Ch, QName, [P2]),
+ amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DTag2,
+ multiple = false,
+ requeue = false}),
+ %% Message can now be routed using the recently binded key
+ wait_for_messages(Config, [[DLXQName, <<"1">>, <<"1">>, <<"0">>]]),
+ consume(Ch, DLXQName, [P2]),
+ consume_empty(Ch, QName).
+
+
+%% 4a) If a specific routing key was not set for the queue, use routing keys added by the
+%% CC and BCC headers
+dead_letter_routing_key_header_CC(Config) ->
+ {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ QName = ?config(queue_name, Config),
+ DLXQName = ?config(queue_name_dlx, Config),
+ Args = ?config(queue_args, Config),
+ Durable = ?config(queue_durable, Config),
+ DLXExchange = ?config(dlx_exchange, Config),
+
+ %% Do not use an specific key
+ DeadLetterArgs = [{<<"x-dead-letter-exchange">>, longstr, DLXExchange}],
+ #'exchange.declare_ok'{} = amqp_channel:call(Ch, #'exchange.declare'{exchange = DLXExchange}),
+ #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, arguments = DeadLetterArgs ++ Args, durable = Durable}),
+ #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = DLXQName, durable = Durable}),
+ #'queue.bind_ok'{} = amqp_channel:call(Ch, #'queue.bind'{queue = DLXQName,
+ exchange = DLXExchange,
+ routing_key = DLXQName}),
+
+ P1 = <<"msg1">>,
+ P2 = <<"msg2">>,
+ CCHeader = {<<"CC">>, array, [{longstr, DLXQName}]},
+
+ %% Publish, consume and nack two messages, one with CC header
+ publish(Ch, QName, [P1]),
+ publish(Ch, QName, [P2], [CCHeader]),
+ wait_for_messages(Config, [[QName, <<"2">>, <<"2">>, <<"0">>]]),
+ [_, DTag2] = consume(Ch, QName, [P1, P2]),
+ %% P2 is also published to the DLX queue because of the binding to the default exchange
+ [_] = consume(Ch, DLXQName, [P2]),
+ amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DTag2,
+ multiple = true,
+ requeue = false}),
+ %% The second message should have been routed using the CC header
+ wait_for_messages(Config, [[DLXQName, <<"2">>, <<"1">>, <<"1">>]]),
+ consume_empty(Ch, QName),
+ consume(Ch, DLXQName, [P2]),
+ consume_empty(Ch, DLXQName).
+
+%% 4b) If a specific routing key was not set for the queue, use routing keys added by the
+%% CC and BCC headers
+dead_letter_routing_key_header_BCC(Config) ->
+ {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ QName = ?config(queue_name, Config),
+ DLXQName = ?config(queue_name_dlx, Config),
+ Args = ?config(queue_args, Config),
+ Durable = ?config(queue_durable, Config),
+ DLXExchange = ?config(dlx_exchange, Config),
+
+ %% Do not use an specific key
+ DeadLetterArgs = [{<<"x-dead-letter-exchange">>, longstr, DLXExchange}],
+ #'exchange.declare_ok'{} = amqp_channel:call(Ch, #'exchange.declare'{exchange = DLXExchange}),
+ #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, arguments = DeadLetterArgs ++ Args, durable = Durable}),
+ #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = DLXQName, durable = Durable}),
+ #'queue.bind_ok'{} = amqp_channel:call(Ch, #'queue.bind'{queue = DLXQName,
+ exchange = DLXExchange,
+ routing_key = DLXQName}),
+
+ P1 = <<"msg1">>,
+ P2 = <<"msg2">>,
+ BCCHeader = {<<"BCC">>, array, [{longstr, DLXQName}]},
+
+ %% Publish, consume and nack two messages, one with BCC header
+ publish(Ch, QName, [P1]),
+ publish(Ch, QName, [P2], [BCCHeader]),
+ wait_for_messages(Config, [[QName, <<"2">>, <<"2">>, <<"0">>]]),
+ [_, DTag2] = consume(Ch, QName, [P1, P2]),
+ %% P2 is also published to the DLX queue because of the binding to the default exchange
+ [_] = consume(Ch, DLXQName, [P2]),
+ amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DTag2,
+ multiple = true,
+ requeue = false}),
+ %% The second message should have been routed using the BCC header
+ wait_for_messages(Config, [[DLXQName, <<"2">>, <<"1">>, <<"1">>]]),
+ consume_empty(Ch, QName),
+ consume(Ch, DLXQName, [P2]),
+ consume_empty(Ch, DLXQName).
+
+%% It is possible to form a cycle of message dead-lettering. For instance,
+%% this can happen when a queue dead-letters messages to the default exchange without
+%% specifiying a dead-letter routing key (5). Messages in such cycles (i.e. messages that
+%% reach the same queue twice) will be dropped if there was no rejections in the entire cycle.
+%% i.e. x-message-ttl (7), x-max-length (6)
+%%
+%% 6) Message is dead lettered due to queue length limit, and then dropped by the broker as it is
+%% republished to the same queue.
+dead_letter_routing_key_cycle_max_length(Config) ->
+ {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ Args = ?config(queue_args, Config),
+ Durable = ?config(queue_durable, Config),
+ QName = ?config(queue_name, Config),
+
+ DeadLetterArgs = [{<<"x-max-length">>, long, 1},
+ {<<"x-dead-letter-exchange">>, longstr, <<>>}],
+ #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, arguments = DeadLetterArgs ++ Args, durable = Durable}),
+
+ P1 = <<"msg1">>,
+ P2 = <<"msg2">>,
+
+ %% Publish messages, consume and acknowledge the second one (x-max-length = 1)
+ publish(Ch, QName, [P1, P2]),
+ wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]),
+ [DTag] = consume(Ch, QName, [P2]),
+ consume_empty(Ch, QName),
+ amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DTag}),
+ %% Queue is empty, P1 has not been republished in a loop
+ wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]),
+ consume_empty(Ch, QName).
+
+%% 7) Message is dead lettered due to message ttl. Not yet implemented in quorum queues
+dead_letter_routing_key_cycle_ttl(Config) ->
+ {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ Args = ?config(queue_args, Config),
+ Durable = ?config(queue_durable, Config),
+ QName = ?config(queue_name, Config),
+
+ DeadLetterArgs = [{<<"x-message-ttl">>, long, 1},
+ {<<"x-dead-letter-exchange">>, longstr, <<>>}],
+ #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, arguments = DeadLetterArgs ++ Args, durable = Durable}),
+
+ P1 = <<"msg1">>,
+ P2 = <<"msg2">>,
+
+ %% Publish messages
+ publish(Ch, QName, [P1, P2]),
+ wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]),
+ consume_empty(Ch, QName).
+
+%% 5) Messages continue to be republished as there are manual rejections
+dead_letter_routing_key_cycle_with_reject(Config) ->
+ {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ Args = ?config(queue_args, Config),
+ Durable = ?config(queue_durable, Config),
+ QName = ?config(queue_name, Config),
+
+ DeadLetterArgs = [{<<"x-dead-letter-exchange">>, longstr, <<>>}],
+ #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, arguments = DeadLetterArgs ++ Args, durable = Durable}),
+
+ P = <<"msg1">>,
+
+ %% Publish message
+ publish(Ch, QName, [P]),
+ wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]),
+ [DTag] = consume(Ch, QName, [P]),
+ amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DTag,
+ multiple = false,
+ requeue = false}),
+ wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]),
+ [DTag1] = consume(Ch, QName, [P]),
+ amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DTag1,
+ multiple = false,
+ requeue = false}),
+ %% Message its being republished
+ wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]),
+ [_] = consume(Ch, QName, [P]).
+
+%%
+%% For any given queue, a DLX can be defined by clients using the queue's arguments,
+%% or in the server using policies (8). In the case where both policy and arguments specify a DLX,
+%% the one specified in arguments overrules the one specified in policy (9).
+%%
+%% 8) Use server policies
+dead_letter_policy(Config) ->
+ {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ QName = ?config(queue_name, Config),
+ DLXQName = ?config(queue_name_dlx, Config),
+ Args = ?config(queue_args, Config),
+ Durable = ?config(queue_durable, Config),
+ DLXExchange = ?config(dlx_exchange, Config),
+
+ %% Do not use arguments
+ #'exchange.declare_ok'{} = amqp_channel:call(Ch, #'exchange.declare'{exchange = DLXExchange}),
+ #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, arguments = Args,
+ durable = Durable}),
+ #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = DLXQName,
+ durable = Durable}),
+ #'queue.bind_ok'{} = amqp_channel:call(Ch, #'queue.bind'{queue = DLXQName,
+ exchange = DLXExchange,
+ routing_key = DLXQName}),
+
+ P1 = <<"msg1">>,
+ P2 = <<"msg2">>,
+
+ %% Publish 2 messages
+ publish(Ch, QName, [P1, P2]),
+ %% Consume them
+ wait_for_messages(Config, [[QName, <<"2">>, <<"2">>, <<"0">>]]),
+ [DTag1, DTag2] = consume(Ch, QName, [P1, P2]),
+ %% Nack the first one with multiple = false
+ amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DTag1,
+ multiple = false,
+ requeue = false}),
+ %% Only one message unack left in the queue
+ wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]),
+ consume_empty(Ch, QName),
+ consume_empty(Ch, DLXQName),
+
+ %% Set a policy
+ ok = rabbit_ct_broker_helpers:set_policy(Config, 0, ?config(policy, Config), QName,
+ <<"queues">>,
+ [{<<"dead-letter-exchange">>, DLXExchange},
+ {<<"dead-letter-routing-key">>, DLXQName}]),
+ timer:sleep(1000),
+ %% Nack the second message
+ amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DTag2,
+ multiple = false,
+ requeue = false}),
+ %% Queue is empty
+ wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]),
+ consume_empty(Ch, QName),
+ %% Consume the message from the dead letter queue
+ consume(Ch, DLXQName, [P2]),
+ consume_empty(Ch, DLXQName).
+
+%% 9) Argument overrides server policy
+dead_letter_override_policy(Config) ->
+ {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ QName = ?config(queue_name, Config),
+ DLXQName = ?config(queue_name_dlx, Config),
+
+ %% Set a policy, it creates a cycle but message will be republished with the nack.
+ %% Good enough for this test.
+ ok = rabbit_ct_broker_helpers:set_policy(Config, 0, ?config(policy, Config), QName,
+ <<"queues">>,
+ [{<<"dead-letter-exchange">>, <<>>},
+ {<<"dead-letter-routing-key">>, QName}]),
+
+ %% Declare arguments override the policy and set routing queue
+ declare_dead_letter_queues(Ch, Config, QName, DLXQName),
+
+ P1 = <<"msg1">>,
+
+ publish(Ch, QName, [P1]),
+ wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]),
+ [DTag1] = consume(Ch, QName, [P1]),
+ amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DTag1,
+ multiple = false,
+ requeue = false}),
+ %% Queue is empty
+ wait_for_messages(Config, [[DLXQName, <<"1">>, <<"1">>, <<"0">>]]),
+ consume_empty(Ch, QName),
+ [_] = consume(Ch, DLXQName, [P1]).
+
+%% 9) Policy is set after have declared a queue with dead letter arguments. Policy will be
+%% overriden/ignored.
+dead_letter_ignore_policy(Config) ->
+ {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ QName = ?config(queue_name, Config),
+ DLXQName = ?config(queue_name_dlx, Config),
+
+ declare_dead_letter_queues(Ch, Config, QName, DLXQName),
+
+ %% Set a policy
+ ok = rabbit_ct_broker_helpers:set_policy(Config, 0, ?config(policy, Config), QName,
+ <<"queues">>,
+ [{<<"dead-letter-exchange">>, <<>>},
+ {<<"dead-letter-routing-key">>, QName}]),
+
+ P1 = <<"msg1">>,
+
+ publish(Ch, QName, [P1]),
+ wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]),
+ [DTag1] = consume(Ch, QName, [P1]),
+ amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DTag1,
+ multiple = false,
+ requeue = false}),
+ %% Message is in the dead letter queue, original queue is empty
+ wait_for_messages(Config, [[DLXQName, <<"1">>, <<"1">>, <<"0">>]]),
+ [_] = consume(Ch, DLXQName, [P1]),
+ consume_empty(Ch, QName).
+
+%%
+%% HEADERS
+%%
+%% The dead-lettering process adds an array to the header of each dead-lettered message named
+%% x-death (10). This array contains an entry for each dead lettering event containing:
+%% queue, reason, time, exchange, routing-keys, count
+%% original-expiration (14) (if the message was dead-letterered due to per-message TTL)
+%% New entries are prepended to the beginning of the x-death array.
+%% Reason is one of the following: rejected (11), expired (12), maxlen (13)
+%%
+%% 10) and 11) Check all x-death headers, reason rejected
+dead_letter_headers(Config) ->
+ {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ QName = ?config(queue_name, Config),
+ DLXQName = ?config(queue_name_dlx, Config),
+ declare_dead_letter_queues(Ch, Config, QName, DLXQName),
+
+ %% Publish and nack a message
+ P1 = <<"msg1">>,
+ publish(Ch, QName, [P1]),
+ wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]),
+ [DTag1] = consume(Ch, QName, [P1]),
+ amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DTag1,
+ multiple = false,
+ requeue = false}),
+ %% Consume and check headers
+ wait_for_messages(Config, [[DLXQName, <<"1">>, <<"1">>, <<"0">>]]),
+ {#'basic.get_ok'{}, #amqp_msg{payload = P1,
+ props = #'P_basic'{headers = Headers}}} =
+ amqp_channel:call(Ch, #'basic.get'{queue = DLXQName}),
+ {array, [{table, Death}]} = rabbit_misc:table_lookup(Headers, <<"x-death">>),
+ ?assertEqual({longstr, QName}, rabbit_misc:table_lookup(Death, <<"queue">>)),
+ ?assertEqual({longstr, <<"rejected">>}, rabbit_misc:table_lookup(Death, <<"reason">>)),
+ ?assertMatch({timestamp, _}, rabbit_misc:table_lookup(Death, <<"time">>)),
+ ?assertEqual({longstr, <<>>}, rabbit_misc:table_lookup(Death, <<"exchange">>)),
+ ?assertEqual({long, 1}, rabbit_misc:table_lookup(Death, <<"count">>)),
+ ?assertEqual({array, [{longstr, QName}]}, rabbit_misc:table_lookup(Death, <<"routing-keys">>)).
+
+%% 12) Per-queue message ttl has expired
+dead_letter_headers_reason_expired(Config) ->
+ {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ QName = ?config(queue_name, Config),
+ DLXQName = ?config(queue_name_dlx, Config),
+ declare_dead_letter_queues(Ch, Config, QName, DLXQName, [{<<"x-message-ttl">>, long, 1}]),
+
+ %% Publish a message
+ P1 = <<"msg1">>,
+ publish(Ch, QName, [P1]),
+ %% Consume and check headers
+ wait_for_messages(Config, [[DLXQName, <<"1">>, <<"1">>, <<"0">>]]),
+ {#'basic.get_ok'{}, #amqp_msg{payload = P1,
+ props = #'P_basic'{headers = Headers}}} =
+ amqp_channel:call(Ch, #'basic.get'{queue = DLXQName}),
+ {array, [{table, Death}]} = rabbit_misc:table_lookup(Headers, <<"x-death">>),
+ ?assertEqual({longstr, <<"expired">>}, rabbit_misc:table_lookup(Death, <<"reason">>)),
+ ?assertMatch(undefined, rabbit_misc:table_lookup(Death, <<"original-expiration">>)).
+
+%% 14) Per-message TTL has expired, original-expiration is added to x-death array
+dead_letter_headers_reason_expired_per_message(Config) ->
+ {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ QName = ?config(queue_name, Config),
+ DLXQName = ?config(queue_name_dlx, Config),
+ declare_dead_letter_queues(Ch, Config, QName, DLXQName),
+
+ %% Publish a message
+ P1 = <<"msg1">>,
+ amqp_channel:call(Ch, #'basic.publish'{routing_key = QName},
+ #amqp_msg{payload = P1,
+ props = #'P_basic'{expiration = <<"1">>}}),
+ %% publish another message to ensure the queue performs message expirations
+ publish(Ch, QName, [<<"msg2">>]),
+ %% Consume and check headers
+ wait_for_messages(Config, [[DLXQName, <<"1">>, <<"1">>, <<"0">>]]),
+ {#'basic.get_ok'{}, #amqp_msg{payload = P1,
+ props = #'P_basic'{headers = Headers}}} =
+ amqp_channel:call(Ch, #'basic.get'{queue = DLXQName}),
+ {array, [{table, Death}]} = rabbit_misc:table_lookup(Headers, <<"x-death">>),
+ ?assertEqual({longstr, <<"expired">>}, rabbit_misc:table_lookup(Death, <<"reason">>)),
+ ?assertMatch({longstr, <<"1">>}, rabbit_misc:table_lookup(Death, <<"original-expiration">>)).
+
+%% 13) Message expired with maxlen reason
+dead_letter_headers_reason_maxlen(Config) ->
+ {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ QName = ?config(queue_name, Config),
+ DLXQName = ?config(queue_name_dlx, Config),
+ declare_dead_letter_queues(Ch, Config, QName, DLXQName, [{<<"x-max-length">>, long, 1}]),
+
+ P1 = <<"msg1">>,
+ P2 = <<"msg2">>,
+ publish(Ch, QName, [P1, P2]),
+ %% Consume and check reason header
+ wait_for_messages(Config, [[DLXQName, <<"1">>, <<"1">>, <<"0">>]]),
+ {#'basic.get_ok'{}, #amqp_msg{payload = P1,
+ props = #'P_basic'{headers = Headers}}} =
+ amqp_channel:call(Ch, #'basic.get'{queue = DLXQName}),
+ {array, [{table, Death}]} = rabbit_misc:table_lookup(Headers, <<"x-death">>),
+ ?assertEqual({longstr, <<"maxlen">>}, rabbit_misc:table_lookup(Death, <<"reason">>)).
+
+%% In case x-death already contains an entry with the same queue and dead lettering reason,
+%% its count field will be incremented and it will be moved to the beginning of the array
+dead_letter_headers_cycle(Config) ->
+ {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ Args = ?config(queue_args, Config),
+ Durable = ?config(queue_durable, Config),
+ QName = ?config(queue_name, Config),
+
+ DeadLetterArgs = [{<<"x-dead-letter-exchange">>, longstr, <<>>}],
+ #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, arguments = DeadLetterArgs ++ Args, durable = Durable}),
+
+ P = <<"msg1">>,
+
+ %% Publish message
+ publish(Ch, QName, [P]),
+ wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]),
+ [DTag] = consume(Ch, QName, [P]),
+ amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DTag,
+ multiple = false,
+ requeue = false}),
+ wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]),
+ {#'basic.get_ok'{delivery_tag = DTag1}, #amqp_msg{payload = P,
+ props = #'P_basic'{headers = Headers1}}} =
+ amqp_channel:call(Ch, #'basic.get'{queue = QName}),
+ {array, [{table, Death1}]} = rabbit_misc:table_lookup(Headers1, <<"x-death">>),
+ ?assertEqual({long, 1}, rabbit_misc:table_lookup(Death1, <<"count">>)),
+
+ amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DTag1,
+ multiple = false,
+ requeue = false}),
+ %% Message its being republished
+ wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]),
+ {#'basic.get_ok'{}, #amqp_msg{payload = P,
+ props = #'P_basic'{headers = Headers2}}} =
+ amqp_channel:call(Ch, #'basic.get'{queue = QName}),
+ {array, [{table, Death2}]} = rabbit_misc:table_lookup(Headers2, <<"x-death">>),
+ ?assertEqual({long, 2}, rabbit_misc:table_lookup(Death2, <<"count">>)).
+
+%% Dead-lettering a message modifies its headers:
+%% the exchange name is replaced with that of the latest dead-letter exchange,
+%% the routing key may be replaced with that specified in a queue performing dead lettering,
+%% if the above happens, the CC header will also be removed (15) and
+%% the BCC header will be removed as per Sender-selected distribution (16)
+%%
+%% CC header is kept if no dead lettering routing key is provided
+dead_letter_headers_CC(Config) ->
+ {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ QName = ?config(queue_name, Config),
+ DLXQName = ?config(queue_name_dlx, Config),
+ Args = ?config(queue_args, Config),
+ Durable = ?config(queue_durable, Config),
+ DLXExchange = ?config(dlx_exchange, Config),
+
+ %% Do not use an specific key for dead lettering, the CC header is passed
+ DeadLetterArgs = [{<<"x-dead-letter-exchange">>, longstr, DLXExchange}],
+ #'exchange.declare_ok'{} = amqp_channel:call(Ch, #'exchange.declare'{exchange = DLXExchange}),
+ #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, arguments = DeadLetterArgs ++ Args, durable = Durable}),
+ #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = DLXQName, durable = Durable}),
+ #'queue.bind_ok'{} = amqp_channel:call(Ch, #'queue.bind'{queue = DLXQName,
+ exchange = DLXExchange,
+ routing_key = DLXQName}),
+
+ P1 = <<"msg1">>,
+ CCHeader = {<<"CC">>, array, [{longstr, DLXQName}]},
+ publish(Ch, QName, [P1], [CCHeader]),
+ wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]),
+ %% Message is published to both queues because of CC header and DLX queue bound to both
+ %% exchanges
+ {#'basic.get_ok'{delivery_tag = DTag1}, #amqp_msg{payload = P1,
+ props = #'P_basic'{headers = Headers1}}} =
+ amqp_channel:call(Ch, #'basic.get'{queue = QName}),
+ {#'basic.get_ok'{}, #amqp_msg{payload = P1,
+ props = #'P_basic'{headers = Headers2}}} =
+ amqp_channel:call(Ch, #'basic.get'{queue = DLXQName}),
+ %% We check the headers to ensure no dead lettering has happened
+ ?assertEqual(undefined, rabbit_misc:table_lookup(Headers1, <<"x-death">>)),
+ ?assertEqual(undefined, rabbit_misc:table_lookup(Headers2, <<"x-death">>)),
+
+ %% Nack the message so it now gets dead lettered
+ amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DTag1,
+ multiple = false,
+ requeue = false}),
+ wait_for_messages(Config, [[DLXQName, <<"2">>, <<"1">>, <<"1">>]]),
+ {#'basic.get_ok'{}, #amqp_msg{payload = P1,
+ props = #'P_basic'{headers = Headers3}}} =
+ amqp_channel:call(Ch, #'basic.get'{queue = DLXQName}),
+ consume_empty(Ch, QName),
+ ?assertEqual({array, [{longstr, DLXQName}]}, rabbit_misc:table_lookup(Headers3, <<"CC">>)),
+ ?assertMatch({array, _}, rabbit_misc:table_lookup(Headers3, <<"x-death">>)).
+
+%% 15) CC header is removed when routing key is specified
+dead_letter_headers_CC_with_routing_key(Config) ->
+ {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ QName = ?config(queue_name, Config),
+ DLXQName = ?config(queue_name_dlx, Config),
+ Args = ?config(queue_args, Config),
+ Durable = ?config(queue_durable, Config),
+ DLXExchange = ?config(dlx_exchange, Config),
+
+ %% Do not use an specific key for dead lettering, the CC header is passed
+ DeadLetterArgs = [{<<"x-dead-letter-routing-key">>, longstr, DLXQName},
+ {<<"x-dead-letter-exchange">>, longstr, DLXExchange}],
+ #'exchange.declare_ok'{} = amqp_channel:call(Ch, #'exchange.declare'{exchange = DLXExchange}),
+ #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, arguments = DeadLetterArgs ++ Args, durable = Durable}),
+ #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = DLXQName, durable = Durable}),
+ #'queue.bind_ok'{} = amqp_channel:call(Ch, #'queue.bind'{queue = DLXQName,
+ exchange = DLXExchange,
+ routing_key = DLXQName}),
+
+ P1 = <<"msg1">>,
+ CCHeader = {<<"CC">>, array, [{longstr, DLXQName}]},
+ publish(Ch, QName, [P1], [CCHeader]),
+ wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]),
+ %% Message is published to both queues because of CC header and DLX queue bound to both
+ %% exchanges
+ {#'basic.get_ok'{delivery_tag = DTag1}, #amqp_msg{payload = P1,
+ props = #'P_basic'{headers = Headers1}}} =
+ amqp_channel:call(Ch, #'basic.get'{queue = QName}),
+ {#'basic.get_ok'{}, #amqp_msg{payload = P1,
+ props = #'P_basic'{headers = Headers2}}} =
+ amqp_channel:call(Ch, #'basic.get'{queue = DLXQName}),
+ %% We check the headers to ensure no dead lettering has happened
+ ?assertEqual(undefined, rabbit_misc:table_lookup(Headers1, <<"x-death">>)),
+ ?assertEqual(undefined, rabbit_misc:table_lookup(Headers2, <<"x-death">>)),
+
+ %% Nack the message so it now gets dead lettered
+ amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DTag1,
+ multiple = false,
+ requeue = false}),
+ wait_for_messages(Config, [[DLXQName, <<"2">>, <<"1">>, <<"1">>]]),
+ {#'basic.get_ok'{}, #amqp_msg{payload = P1,
+ props = #'P_basic'{headers = Headers3}}} =
+ amqp_channel:call(Ch, #'basic.get'{queue = DLXQName}),
+ consume_empty(Ch, QName),
+ ?assertEqual(undefined, rabbit_misc:table_lookup(Headers3, <<"CC">>)),
+ ?assertMatch({array, _}, rabbit_misc:table_lookup(Headers3, <<"x-death">>)).
+
+%% 16) the BCC header will always be removed
+dead_letter_headers_BCC(Config) ->
+ {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ QName = ?config(queue_name, Config),
+ DLXQName = ?config(queue_name_dlx, Config),
+ Args = ?config(queue_args, Config),
+ Durable = ?config(queue_durable, Config),
+ DLXExchange = ?config(dlx_exchange, Config),
+
+ %% Do not use an specific key for dead lettering
+ DeadLetterArgs = [{<<"x-dead-letter-exchange">>, longstr, DLXExchange}],
+ #'exchange.declare_ok'{} = amqp_channel:call(Ch, #'exchange.declare'{exchange = DLXExchange}),
+ #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, arguments = DeadLetterArgs ++ Args, durable = Durable}),
+ #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = DLXQName, durable = Durable}),
+ #'queue.bind_ok'{} = amqp_channel:call(Ch, #'queue.bind'{queue = DLXQName,
+ exchange = DLXExchange,
+ routing_key = DLXQName}),
+
+ P1 = <<"msg1">>,
+ BCCHeader = {<<"BCC">>, array, [{longstr, DLXQName}]},
+ publish(Ch, QName, [P1], [BCCHeader]),
+ %% Message is published to both queues because of BCC header and DLX queue bound to both
+ %% exchanges
+ wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]),
+ {#'basic.get_ok'{delivery_tag = DTag1}, #amqp_msg{payload = P1,
+ props = #'P_basic'{headers = Headers1}}} =
+ amqp_channel:call(Ch, #'basic.get'{queue = QName}),
+ {#'basic.get_ok'{}, #amqp_msg{payload = P1,
+ props = #'P_basic'{headers = Headers2}}} =
+ amqp_channel:call(Ch, #'basic.get'{queue = DLXQName}),
+ %% We check the headers to ensure no dead lettering has happened
+ ?assertEqual(undefined, rabbit_misc:table_lookup(Headers1, <<"x-death">>)),
+ ?assertEqual(undefined, rabbit_misc:table_lookup(Headers2, <<"x-death">>)),
+
+ %% Nack the message so it now gets dead lettered
+ amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DTag1,
+ multiple = false,
+ requeue = false}),
+ wait_for_messages(Config, [[DLXQName, <<"2">>, <<"1">>, <<"1">>]]),
+ {#'basic.get_ok'{}, #amqp_msg{payload = P1,
+ props = #'P_basic'{headers = Headers3}}} =
+ amqp_channel:call(Ch, #'basic.get'{queue = DLXQName}),
+ consume_empty(Ch, QName),
+ ?assertEqual(undefined, rabbit_misc:table_lookup(Headers3, <<"BCC">>)),
+ ?assertMatch({array, _}, rabbit_misc:table_lookup(Headers3, <<"x-death">>)).
+
+
+%% Three top-level headers are added for the very first dead-lettering event. They are
+%% x-first-death-reason, x-first-death-queue, x-first-death-exchange
+%% They have the same values as the reason, queue, and exchange fields of the original
+%% dead lettering event. Once added, these headers are never modified.
+dead_letter_headers_first_death(Config) ->
+ {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ QName = ?config(queue_name, Config),
+ DLXQName = ?config(queue_name_dlx, Config),
+ Args = ?config(queue_args, Config),
+ Durable = ?config(queue_durable, Config),
+ DLXExchange = ?config(dlx_exchange, Config),
+
+ %% Let's create a small dead-lettering loop QName -> DLXQName -> QName
+ DeadLetterArgs = [{<<"x-dead-letter-routing-key">>, longstr, DLXQName},
+ {<<"x-dead-letter-exchange">>, longstr, DLXExchange}],
+ DLXDeadLetterArgs = [{<<"x-dead-letter-routing-key">>, longstr, QName},
+ {<<"x-dead-letter-exchange">>, longstr, <<>>}],
+ #'exchange.declare_ok'{} = amqp_channel:call(Ch, #'exchange.declare'{exchange = DLXExchange}),
+ #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, arguments = DeadLetterArgs ++ Args, durable = Durable}),
+ #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = DLXQName, durable = Durable, arguments = DLXDeadLetterArgs}),
+ #'queue.bind_ok'{} = amqp_channel:call(Ch, #'queue.bind'{queue = DLXQName,
+ exchange = DLXExchange,
+ routing_key = DLXQName}),
+
+
+ %% Publish and nack a message
+ P1 = <<"msg1">>,
+ publish(Ch, QName, [P1]),
+ wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]),
+ [DTag1] = consume(Ch, QName, [P1]),
+ amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DTag1,
+ multiple = false,
+ requeue = false}),
+ %% Consume and check headers
+ wait_for_messages(Config, [[DLXQName, <<"1">>, <<"1">>, <<"0">>]]),
+ {#'basic.get_ok'{delivery_tag = DTag2}, #amqp_msg{payload = P1,
+ props = #'P_basic'{headers = Headers}}} =
+ amqp_channel:call(Ch, #'basic.get'{queue = DLXQName}),
+ ?assertEqual({longstr, <<"rejected">>},
+ rabbit_misc:table_lookup(Headers, <<"x-first-death-reason">>)),
+ ?assertEqual({longstr, QName},
+ rabbit_misc:table_lookup(Headers, <<"x-first-death-queue">>)),
+ ?assertEqual({longstr, <<>>},
+ rabbit_misc:table_lookup(Headers, <<"x-first-death-exchange">>)),
+ %% Nack the message again so it gets dead lettered to the initial queue. x-first-death
+ %% headers should not change
+ amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DTag2,
+ multiple = false,
+ requeue = false}),
+ wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]),
+ {#'basic.get_ok'{}, #amqp_msg{payload = P1,
+ props = #'P_basic'{headers = Headers2}}} =
+ amqp_channel:call(Ch, #'basic.get'{queue = QName}),
+ ?assertEqual({longstr, <<"rejected">>},
+ rabbit_misc:table_lookup(Headers2, <<"x-first-death-reason">>)),
+ ?assertEqual({longstr, QName},
+ rabbit_misc:table_lookup(Headers2, <<"x-first-death-queue">>)),
+ ?assertEqual({longstr, <<>>},
+ rabbit_misc:table_lookup(Headers2, <<"x-first-death-exchange">>)).
+
+%%%%%%%%%%%%%%%%%%%%%%%%
+%% Test helpers
+%%%%%%%%%%%%%%%%%%%%%%%%
+declare_dead_letter_queues(Ch, Config, QName, DLXQName) ->
+ declare_dead_letter_queues(Ch, Config, QName, DLXQName, []).
+
+declare_dead_letter_queues(Ch, Config, QName, DLXQName, ExtraArgs) ->
+ Args = ?config(queue_args, Config),
+ Durable = ?config(queue_durable, Config),
+ DLXExchange = ?config(dlx_exchange, Config),
+
+ %% Declare DLX exchange
+ #'exchange.declare_ok'{} = amqp_channel:call(Ch, #'exchange.declare'{exchange = DLXExchange}),
+
+ %% Declare queue
+ DeadLetterArgs = [{<<"x-dead-letter-exchange">>, longstr, DLXExchange},
+ {<<"x-dead-letter-routing-key">>, longstr, DLXQName}],
+ #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, arguments = DeadLetterArgs ++ Args ++ ExtraArgs, durable = Durable}),
+
+ %% Declare and bind DLX queue
+ #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = DLXQName, durable = Durable}),
+ #'queue.bind_ok'{} = amqp_channel:call(Ch, #'queue.bind'{queue = DLXQName,
+ exchange = DLXExchange,
+ routing_key = DLXQName}).
+
+publish(Ch, QName, Payloads) ->
+ [amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload})
+ || Payload <- Payloads].
+
+publish(Ch, QName, Payloads, Headers) ->
+ [amqp_channel:call(Ch, #'basic.publish'{routing_key = QName},
+ #amqp_msg{payload = Payload,
+ props = #'P_basic'{headers = Headers}})
+ || Payload <- Payloads].
+
+consume(Ch, QName, Payloads) ->
+ [begin
+ {#'basic.get_ok'{delivery_tag = DTag}, #amqp_msg{payload = Payload}} =
+ amqp_channel:call(Ch, #'basic.get'{queue = QName}),
+ DTag
+ end || Payload <- Payloads].
+
+consume_empty(Ch, QName) ->
+ #'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = QName}).
+
+sync_mirrors(QName, Config) ->
+ case ?config(is_mirrored, Config) of
+ true ->
+ rabbit_ct_broker_helpers:rabbitmqctl(Config, 0, [<<"sync_queue">>, QName]);
+ _ -> ok
+ end.
+
+