summaryrefslogtreecommitdiff
path: root/test
diff options
context:
space:
mode:
Diffstat (limited to 'test')
-rw-r--r--test/quorum_queue_SUITE.erl44
1 files changed, 41 insertions, 3 deletions
diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl
index 61fd7a1b2c..4190903409 100644
--- a/test/quorum_queue_SUITE.erl
+++ b/test/quorum_queue_SUITE.erl
@@ -116,6 +116,8 @@ all_tests() ->
basic_recover,
idempotent_recover,
vhost_with_quorum_queue_is_deleted,
+ delete_immediately,
+ delete_immediately_by_resource,
consume_redelivery_count,
subscribe_redelivery_count
].
@@ -1914,6 +1916,42 @@ basic_recover(Config) ->
wait_for_messages_ready(Servers, RaName, 1),
wait_for_messages_pending_ack(Servers, RaName, 0).
+delete_immediately(Config) ->
+ Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ Args = [{<<"x-queue-type">>, longstr, <<"quorum">>}],
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, Args)),
+
+ Cmd = ["eval", "{ok, Q} = rabbit_amqqueue:lookup(rabbit_misc:r(<<\"/\">>, queue, <<\"" ++ binary_to_list(QQ) ++ "\">>)), Pid = rabbit_amqqueue:pid_of(Q), rabbit_amqqueue:delete_immediately([Pid])."],
+ {ok, Msg} = rabbit_ct_broker_helpers:rabbitmqctl(Config, 0, Cmd),
+ ?assertEqual(match, re:run(Msg, ".*error.*", [{capture, none}])),
+
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ amqp_channel:call(Ch, #'queue.declare'{queue = QQ,
+ durable = true,
+ passive = true,
+ auto_delete = false,
+ arguments = Args})).
+
+delete_immediately_by_resource(Config) ->
+ Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+ Cmd2 = ["eval", "rabbit_amqqueue:delete_immediately_by_resource([rabbit_misc:r(<<\"/\">>, queue, <<\"" ++ binary_to_list(QQ) ++ "\">>)])."],
+ ?assertEqual({ok, "ok\n"}, rabbit_ct_broker_helpers:rabbitmqctl(Config, 0, Cmd2)),
+
+ %% Check that the application and process are down
+ wait_until(fun() ->
+ [] == rpc:call(Server, supervisor, which_children, [ra_server_sup])
+ end),
+ ?assertMatch({ra, _, _}, lists:keyfind(ra, 1,
+ rpc:call(Server, application, which_applications, []))).
+
subscribe_redelivery_count(Config) ->
[Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
@@ -1967,7 +2005,6 @@ consume_redelivery_count(Config) ->
QQ = ?config(queue_name, Config),
?assertEqual({'queue.declare_ok', QQ, 0, 0},
declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
-
RaName = ra_name(QQ),
publish(Ch, QQ),
wait_for_messages_ready(Servers, RaName, 1),
@@ -1984,7 +2021,9 @@ consume_redelivery_count(Config) ->
amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag,
multiple = false,
requeue = true}),
-
+ %% wait for requeueing
+ timer:sleep(500),
+
{#'basic.get_ok'{delivery_tag = DeliveryTag1,
redelivered = true},
#amqp_msg{props = #'P_basic'{headers = H1}}} =
@@ -2005,7 +2044,6 @@ consume_redelivery_count(Config) ->
multiple = false,
requeue = true}).
-
%%----------------------------------------------------------------------------
declare(Ch, Q) ->