diff options
Diffstat (limited to 'test')
| -rw-r--r-- | test/quorum_queue_SUITE.erl | 97 |
1 files changed, 96 insertions, 1 deletions
diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl index 26357c3fee..88c79acf79 100644 --- a/test/quorum_queue_SUITE.erl +++ b/test/quorum_queue_SUITE.erl @@ -113,7 +113,9 @@ all_tests() -> cancel_sync_queue, basic_recover, idempotent_recover, - vhost_with_quorum_queue_is_deleted + vhost_with_quorum_queue_is_deleted, + consume_redelivery_count, + subscribe_redelivery_count ]. %% ------------------------------------------------------------------- @@ -1820,6 +1822,99 @@ basic_recover(Config) -> amqp_channel:cast(Ch, #'basic.recover'{requeue = true}), wait_for_messages_ready(Servers, RaName, 1), wait_for_messages_pending_ack(Servers, RaName, 0). + +subscribe_redelivery_count(Config) -> + [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + QQ = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + + RaName = ra_name(QQ), + publish(Ch, QQ), + wait_for_messages_ready(Servers, RaName, 1), + wait_for_messages_pending_ack(Servers, RaName, 0), + subscribe(Ch, QQ, false), + + DTag = <<"x-delivery-count">>, + receive + {#'basic.deliver'{delivery_tag = DeliveryTag, + redelivered = false}, + #amqp_msg{props = #'P_basic'{headers = H0}}} -> + ?assertMatch({DTag, _, 0}, rabbit_basic:header(DTag, H0)), + amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag, + multiple = false, + requeue = true}) + end, + + receive + {#'basic.deliver'{delivery_tag = DeliveryTag1, + redelivered = true}, + #amqp_msg{props = #'P_basic'{headers = H1}}} -> + ?assertMatch({DTag, _, 1}, rabbit_basic:header(DTag, H1)), + amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag1, + multiple = false, + requeue = true}) + end, + + receive + {#'basic.deliver'{delivery_tag = DeliveryTag2, + redelivered = true}, + #amqp_msg{props = #'P_basic'{headers = H2}}} -> + ?assertMatch({DTag, _, 2}, rabbit_basic:header(DTag, H2)), + amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag2, + multiple = false}), + wait_for_messages_ready(Servers, RaName, 0), + wait_for_messages_pending_ack(Servers, RaName, 0) + end. + +consume_redelivery_count(Config) -> + [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + QQ = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + + RaName = ra_name(QQ), + publish(Ch, QQ), + wait_for_messages_ready(Servers, RaName, 1), + wait_for_messages_pending_ack(Servers, RaName, 0), + + DTag = <<"x-delivery-count">>, + + {#'basic.get_ok'{delivery_tag = DeliveryTag, + redelivered = false}, + #amqp_msg{props = #'P_basic'{headers = H0}}} = + amqp_channel:call(Ch, #'basic.get'{queue = QQ, + no_ack = false}), + ?assertMatch({DTag, _, 0}, rabbit_basic:header(DTag, H0)), + amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag, + multiple = false, + requeue = true}), + + {#'basic.get_ok'{delivery_tag = DeliveryTag1, + redelivered = true}, + #amqp_msg{props = #'P_basic'{headers = H1}}} = + amqp_channel:call(Ch, #'basic.get'{queue = QQ, + no_ack = false}), + ?assertMatch({DTag, _, 1}, rabbit_basic:header(DTag, H1)), + amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag1, + multiple = false, + requeue = true}), + + {#'basic.get_ok'{delivery_tag = DeliveryTag2, + redelivered = true}, + #amqp_msg{props = #'P_basic'{headers = H2}}} = + amqp_channel:call(Ch, #'basic.get'{queue = QQ, + no_ack = false}), + ?assertMatch({DTag, _, 2}, rabbit_basic:header(DTag, H2)), + amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag2, + multiple = false, + requeue = true}). + + %%---------------------------------------------------------------------------- declare(Ch, Q) -> |
