diff options
Diffstat (limited to 'test')
| -rw-r--r-- | test/quorum_queue_SUITE.erl | 153 |
1 files changed, 152 insertions, 1 deletions
diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl index 406c02de83..bd54d59869 100644 --- a/test/quorum_queue_SUITE.erl +++ b/test/quorum_queue_SUITE.erl @@ -111,7 +111,10 @@ all_tests() -> consume_redelivery_count, subscribe_redelivery_count, message_bytes_metrics, - queue_length_limit_drop_head + queue_length_limit_drop_head, + subscribe_redelivery_limit, + subscribe_redelivery_policy, + subscribe_redelivery_limit_with_dead_letter ]. memory_tests() -> @@ -1519,6 +1522,154 @@ subscribe_redelivery_count(Config) -> wait_for_messages_pending_ack(Servers, RaName, 0) end. +subscribe_redelivery_limit(Config) -> + [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + QQ = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}, + {<<"x-delivery-limit">>, long, 1}])), + + RaName = ra_name(QQ), + publish(Ch, QQ), + wait_for_messages_ready(Servers, RaName, 1), + wait_for_messages_pending_ack(Servers, RaName, 0), + subscribe(Ch, QQ, false), + + DTag = <<"x-delivery-count">>, + receive + {#'basic.deliver'{delivery_tag = DeliveryTag, + redelivered = false}, + #amqp_msg{props = #'P_basic'{headers = H0}}} -> + ?assertMatch(undefined, rabbit_basic:header(DTag, H0)), + amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag, + multiple = false, + requeue = true}) + end, + + wait_for_messages_ready(Servers, RaName, 0), + wait_for_messages_pending_ack(Servers, RaName, 1), + receive + {#'basic.deliver'{delivery_tag = DeliveryTag1, + redelivered = true}, + #amqp_msg{props = #'P_basic'{headers = H1}}} -> + ?assertMatch({DTag, _, 1}, rabbit_basic:header(DTag, H1)), + amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag1, + multiple = false, + requeue = true}) + end, + + wait_for_messages_ready(Servers, RaName, 0), + wait_for_messages_pending_ack(Servers, RaName, 0), + receive + {#'basic.deliver'{redelivered = true}, #amqp_msg{}} -> + throw(unexpected_redelivery) + after 2000 -> + ok + end. + +subscribe_redelivery_policy(Config) -> + [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + QQ = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + + ok = rabbit_ct_broker_helpers:set_policy( + Config, 0, <<"delivery-limit">>, <<".*">>, <<"queues">>, + [{<<"delivery-limit">>, 1}]), + + RaName = ra_name(QQ), + publish(Ch, QQ), + wait_for_messages_ready(Servers, RaName, 1), + wait_for_messages_pending_ack(Servers, RaName, 0), + subscribe(Ch, QQ, false), + + DTag = <<"x-delivery-count">>, + receive + {#'basic.deliver'{delivery_tag = DeliveryTag, + redelivered = false}, + #amqp_msg{props = #'P_basic'{headers = H0}}} -> + ?assertMatch(undefined, rabbit_basic:header(DTag, H0)), + amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag, + multiple = false, + requeue = true}) + end, + + wait_for_messages_ready(Servers, RaName, 0), + wait_for_messages_pending_ack(Servers, RaName, 1), + receive + {#'basic.deliver'{delivery_tag = DeliveryTag1, + redelivered = true}, + #amqp_msg{props = #'P_basic'{headers = H1}}} -> + ?assertMatch({DTag, _, 1}, rabbit_basic:header(DTag, H1)), + amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag1, + multiple = false, + requeue = true}) + end, + + wait_for_messages_ready(Servers, RaName, 0), + wait_for_messages_pending_ack(Servers, RaName, 0), + receive + {#'basic.deliver'{redelivered = true}, #amqp_msg{}} -> + throw(unexpected_redelivery) + after 2000 -> + ok + end, + ok = rabbit_ct_broker_helpers:clear_policy(Config, 0, <<"delivery-limit">>). + +subscribe_redelivery_limit_with_dead_letter(Config) -> + [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + QQ = ?config(queue_name, Config), + DLX = <<"subcribe_redelivery_limit_with_dead_letter_dlx">>, + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}, + {<<"x-delivery-limit">>, long, 1}, + {<<"x-dead-letter-exchange">>, longstr, <<>>}, + {<<"x-dead-letter-routing-key">>, longstr, DLX} + ])), + ?assertEqual({'queue.declare_ok', DLX, 0, 0}, + declare(Ch, DLX, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + + RaName = ra_name(QQ), + RaDlxName = ra_name(DLX), + publish(Ch, QQ), + wait_for_messages_ready(Servers, RaName, 1), + wait_for_messages_pending_ack(Servers, RaName, 0), + subscribe(Ch, QQ, false), + + DTag = <<"x-delivery-count">>, + receive + {#'basic.deliver'{delivery_tag = DeliveryTag, + redelivered = false}, + #amqp_msg{props = #'P_basic'{headers = H0}}} -> + ?assertMatch(undefined, rabbit_basic:header(DTag, H0)), + amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag, + multiple = false, + requeue = true}) + end, + + wait_for_messages_ready(Servers, RaName, 0), + wait_for_messages_pending_ack(Servers, RaName, 1), + receive + {#'basic.deliver'{delivery_tag = DeliveryTag1, + redelivered = true}, + #amqp_msg{props = #'P_basic'{headers = H1}}} -> + ?assertMatch({DTag, _, 1}, rabbit_basic:header(DTag, H1)), + amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag1, + multiple = false, + requeue = true}) + end, + + wait_for_messages_ready(Servers, RaName, 0), + wait_for_messages_pending_ack(Servers, RaName, 0), + wait_for_messages_ready(Servers, RaDlxName, 1), + wait_for_messages_pending_ack(Servers, RaDlxName, 0). + consume_redelivery_count(Config) -> [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), |
