summaryrefslogtreecommitdiff
path: root/test
diff options
context:
space:
mode:
authorDiana Corbacho <diana@rabbitmq.com>2019-02-18 22:21:23 +0000
committerkjnilsson <knilsson@pivotal.io>2019-02-22 14:14:31 +0000
commit8a3d3e654b66a3fa53b45bd949b92accf19974da (patch)
tree79bf17f6507cba06898daaea3b718984f25012c8 /test
parentd256c78560a9d73fb39d7c159eedc00768a35dc2 (diff)
downloadrabbitmq-server-git-8a3d3e654b66a3fa53b45bd949b92accf19974da.tar.gz
Poison handling in quorum queues
Drop messages that exceed the configured number of delivery attemps. If a DLX is configured, dead letter them. [#163513253]
Diffstat (limited to 'test')
-rw-r--r--test/quorum_queue_SUITE.erl153
1 files changed, 152 insertions, 1 deletions
diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl
index 406c02de83..bd54d59869 100644
--- a/test/quorum_queue_SUITE.erl
+++ b/test/quorum_queue_SUITE.erl
@@ -111,7 +111,10 @@ all_tests() ->
consume_redelivery_count,
subscribe_redelivery_count,
message_bytes_metrics,
- queue_length_limit_drop_head
+ queue_length_limit_drop_head,
+ subscribe_redelivery_limit,
+ subscribe_redelivery_policy,
+ subscribe_redelivery_limit_with_dead_letter
].
memory_tests() ->
@@ -1519,6 +1522,154 @@ subscribe_redelivery_count(Config) ->
wait_for_messages_pending_ack(Servers, RaName, 0)
end.
+subscribe_redelivery_limit(Config) ->
+ [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>},
+ {<<"x-delivery-limit">>, long, 1}])),
+
+ RaName = ra_name(QQ),
+ publish(Ch, QQ),
+ wait_for_messages_ready(Servers, RaName, 1),
+ wait_for_messages_pending_ack(Servers, RaName, 0),
+ subscribe(Ch, QQ, false),
+
+ DTag = <<"x-delivery-count">>,
+ receive
+ {#'basic.deliver'{delivery_tag = DeliveryTag,
+ redelivered = false},
+ #amqp_msg{props = #'P_basic'{headers = H0}}} ->
+ ?assertMatch(undefined, rabbit_basic:header(DTag, H0)),
+ amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag,
+ multiple = false,
+ requeue = true})
+ end,
+
+ wait_for_messages_ready(Servers, RaName, 0),
+ wait_for_messages_pending_ack(Servers, RaName, 1),
+ receive
+ {#'basic.deliver'{delivery_tag = DeliveryTag1,
+ redelivered = true},
+ #amqp_msg{props = #'P_basic'{headers = H1}}} ->
+ ?assertMatch({DTag, _, 1}, rabbit_basic:header(DTag, H1)),
+ amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag1,
+ multiple = false,
+ requeue = true})
+ end,
+
+ wait_for_messages_ready(Servers, RaName, 0),
+ wait_for_messages_pending_ack(Servers, RaName, 0),
+ receive
+ {#'basic.deliver'{redelivered = true}, #amqp_msg{}} ->
+ throw(unexpected_redelivery)
+ after 2000 ->
+ ok
+ end.
+
+subscribe_redelivery_policy(Config) ->
+ [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+
+ ok = rabbit_ct_broker_helpers:set_policy(
+ Config, 0, <<"delivery-limit">>, <<".*">>, <<"queues">>,
+ [{<<"delivery-limit">>, 1}]),
+
+ RaName = ra_name(QQ),
+ publish(Ch, QQ),
+ wait_for_messages_ready(Servers, RaName, 1),
+ wait_for_messages_pending_ack(Servers, RaName, 0),
+ subscribe(Ch, QQ, false),
+
+ DTag = <<"x-delivery-count">>,
+ receive
+ {#'basic.deliver'{delivery_tag = DeliveryTag,
+ redelivered = false},
+ #amqp_msg{props = #'P_basic'{headers = H0}}} ->
+ ?assertMatch(undefined, rabbit_basic:header(DTag, H0)),
+ amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag,
+ multiple = false,
+ requeue = true})
+ end,
+
+ wait_for_messages_ready(Servers, RaName, 0),
+ wait_for_messages_pending_ack(Servers, RaName, 1),
+ receive
+ {#'basic.deliver'{delivery_tag = DeliveryTag1,
+ redelivered = true},
+ #amqp_msg{props = #'P_basic'{headers = H1}}} ->
+ ?assertMatch({DTag, _, 1}, rabbit_basic:header(DTag, H1)),
+ amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag1,
+ multiple = false,
+ requeue = true})
+ end,
+
+ wait_for_messages_ready(Servers, RaName, 0),
+ wait_for_messages_pending_ack(Servers, RaName, 0),
+ receive
+ {#'basic.deliver'{redelivered = true}, #amqp_msg{}} ->
+ throw(unexpected_redelivery)
+ after 2000 ->
+ ok
+ end,
+ ok = rabbit_ct_broker_helpers:clear_policy(Config, 0, <<"delivery-limit">>).
+
+subscribe_redelivery_limit_with_dead_letter(Config) ->
+ [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ DLX = <<"subcribe_redelivery_limit_with_dead_letter_dlx">>,
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>},
+ {<<"x-delivery-limit">>, long, 1},
+ {<<"x-dead-letter-exchange">>, longstr, <<>>},
+ {<<"x-dead-letter-routing-key">>, longstr, DLX}
+ ])),
+ ?assertEqual({'queue.declare_ok', DLX, 0, 0},
+ declare(Ch, DLX, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+
+ RaName = ra_name(QQ),
+ RaDlxName = ra_name(DLX),
+ publish(Ch, QQ),
+ wait_for_messages_ready(Servers, RaName, 1),
+ wait_for_messages_pending_ack(Servers, RaName, 0),
+ subscribe(Ch, QQ, false),
+
+ DTag = <<"x-delivery-count">>,
+ receive
+ {#'basic.deliver'{delivery_tag = DeliveryTag,
+ redelivered = false},
+ #amqp_msg{props = #'P_basic'{headers = H0}}} ->
+ ?assertMatch(undefined, rabbit_basic:header(DTag, H0)),
+ amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag,
+ multiple = false,
+ requeue = true})
+ end,
+
+ wait_for_messages_ready(Servers, RaName, 0),
+ wait_for_messages_pending_ack(Servers, RaName, 1),
+ receive
+ {#'basic.deliver'{delivery_tag = DeliveryTag1,
+ redelivered = true},
+ #amqp_msg{props = #'P_basic'{headers = H1}}} ->
+ ?assertMatch({DTag, _, 1}, rabbit_basic:header(DTag, H1)),
+ amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag1,
+ multiple = false,
+ requeue = true})
+ end,
+
+ wait_for_messages_ready(Servers, RaName, 0),
+ wait_for_messages_pending_ack(Servers, RaName, 0),
+ wait_for_messages_ready(Servers, RaDlxName, 1),
+ wait_for_messages_pending_ack(Servers, RaDlxName, 0).
+
consume_redelivery_count(Config) ->
[Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),