summaryrefslogtreecommitdiff
path: root/test
diff options
context:
space:
mode:
authorMichael Klishin <mklishin@pivotal.io>2019-05-13 16:00:23 +0300
committerGitHub <noreply@github.com>2019-05-13 16:00:23 +0300
commit18552f70b8552e0fb9c656eb262a4e8018af02ce (patch)
treefccf8d861e7e698eae5213ee0ba7c73bf1a27b0d /test
parent5c80bea709f4c89db3d8652f4a3e7d8421efb76e (diff)
parent4d24352033d6778a2d5b9c20546280cb59686490 (diff)
downloadrabbitmq-server-git-18552f70b8552e0fb9c656eb262a4e8018af02ce.tar.gz
Merge pull request #1893 from rabbitmq/do_not_confirm_on_unreachable_queue
Make sure that publishes to dead or unaccessible queues return nack
Diffstat (limited to 'test')
-rw-r--r--test/confirms_rejects_SUITE.erl118
1 files changed, 116 insertions, 2 deletions
diff --git a/test/confirms_rejects_SUITE.erl b/test/confirms_rejects_SUITE.erl
index 722b5240fc..8805b71ffe 100644
--- a/test/confirms_rejects_SUITE.erl
+++ b/test/confirms_rejects_SUITE.erl
@@ -14,7 +14,9 @@ groups() ->
[
{parallel_tests, [parallel], [
confirms_rejects_conflict,
- policy_resets_to_default
+ policy_resets_to_default,
+ dead_queue_rejects,
+ mixed_dead_alive_queues_reject
]}
].
@@ -45,7 +47,10 @@ init_per_testcase(policy_resets_to_default = Testcase, Config) ->
Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config),
rabbit_ct_helpers:testcase_started(
rabbit_ct_helpers:set_config(Config, [{conn, Conn}]), Testcase);
-init_per_testcase(confirms_rejects_conflict = Testcase, Config) ->
+init_per_testcase(Testcase, Config)
+ when Testcase == confirms_rejects_conflict;
+ Testcase == dead_queue_rejects;
+ Testcase == mixed_dead_alive_queues_reject ->
Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config),
Conn1 = rabbit_ct_client_helpers:open_unmanaged_connection(Config),
@@ -66,6 +71,19 @@ end_per_testcase(policy_resets_to_default = Testcase, Config) ->
end_per_testcase(confirms_rejects_conflict = Testcase, Config) ->
{_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
amqp_channel:call(Ch, #'queue.delete'{queue = <<"confirms_rejects_conflict">>}),
+ end_per_testcase0(Testcase, Config);
+end_per_testcase(dead_queue_rejects = Testcase, Config) ->
+ {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ amqp_channel:call(Ch, #'queue.delete'{queue = <<"dead_queue_rejects">>}),
+ end_per_testcase0(Testcase, Config);
+end_per_testcase(mixed_dead_alive_queues_reject = Testcase, Config) ->
+ {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ amqp_channel:call(Ch, #'queue.delete'{queue = <<"mixed_dead_alive_queues_reject_dead">>}),
+ amqp_channel:call(Ch, #'queue.delete'{queue = <<"mixed_dead_alive_queues_reject_alive">>}),
+ amqp_channel:call(Ch, #'exchange.delete'{exchange = <<"mixed_dead_alive_queues_reject">>}),
+ end_per_testcase0(Testcase, Config).
+
+end_per_testcase0(Testcase, Config) ->
rabbit_ct_client_helpers:close_channels_and_connection(Config, 0),
Conn = ?config(conn, Config),
@@ -74,9 +92,89 @@ end_per_testcase(confirms_rejects_conflict = Testcase, Config) ->
rabbit_ct_client_helpers:close_connection(Conn),
rabbit_ct_client_helpers:close_connection(Conn1),
+ clean_acks_mailbox(),
+
rabbit_ct_helpers:testcase_finished(Config, Testcase).
+dead_queue_rejects(Config) ->
+ Conn = ?config(conn, Config),
+ {ok, Ch} = amqp_connection:open_channel(Conn),
+ QueueName = <<"dead_queue_rejects">>,
+ amqp_channel:call(Ch, #'confirm.select'{}),
+ amqp_channel:register_confirm_handler(Ch, self()),
+
+ amqp_channel:call(Ch, #'queue.declare'{queue = QueueName,
+ durable = true}),
+ amqp_channel:call(Ch, #'basic.publish'{routing_key = QueueName},
+ #amqp_msg{payload = <<"HI">>}),
+
+ receive
+ {'basic.ack',_,_} -> ok
+ after 10000 ->
+ error(timeout_waiting_for_initial_ack)
+ end,
+
+ kill_the_queue(QueueName, Config),
+
+ amqp_channel:call(Ch, #'basic.publish'{routing_key = QueueName},
+ #amqp_msg{payload = <<"HI">>}),
+
+ receive
+ {'basic.ack',_,_} -> error(expecting_nack_got_ack);
+ {'basic.nack',_,_,_} -> ok
+ after 10000 ->
+ error(timeout_waiting_for_nack)
+ end.
+
+mixed_dead_alive_queues_reject(Config) ->
+ Conn = ?config(conn, Config),
+ {ok, Ch} = amqp_connection:open_channel(Conn),
+ QueueNameDead = <<"mixed_dead_alive_queues_reject_dead">>,
+ QueueNameAlive = <<"mixed_dead_alive_queues_reject_alive">>,
+ ExchangeName = <<"mixed_dead_alive_queues_reject">>,
+
+ amqp_channel:call(Ch, #'confirm.select'{}),
+ amqp_channel:register_confirm_handler(Ch, self()),
+
+ amqp_channel:call(Ch, #'queue.declare'{queue = QueueNameDead,
+ durable = true}),
+ amqp_channel:call(Ch, #'queue.declare'{queue = QueueNameAlive,
+ durable = true}),
+
+ amqp_channel:call(Ch, #'exchange.declare'{exchange = ExchangeName,
+ durable = true}),
+
+ amqp_channel:call(Ch, #'queue.bind'{exchange = ExchangeName,
+ queue = QueueNameAlive,
+ routing_key = <<"route">>}),
+
+ amqp_channel:call(Ch, #'queue.bind'{exchange = ExchangeName,
+ queue = QueueNameDead,
+ routing_key = <<"route">>}),
+
+ amqp_channel:call(Ch, #'basic.publish'{exchange = ExchangeName,
+ routing_key = <<"route">>},
+ #amqp_msg{payload = <<"HI">>}),
+
+ receive
+ {'basic.ack',_,_} -> ok
+ after 10000 ->
+ error(timeout_waiting_for_initial_ack)
+ end,
+
+ kill_the_queue(QueueNameDead, Config),
+
+ amqp_channel:call(Ch, #'basic.publish'{exchange = ExchangeName,
+ routing_key = <<"route">>},
+ #amqp_msg{payload = <<"HI">>}),
+
+ receive
+ {'basic.nack',_,_,_} -> ok;
+ {'basic.ack',_,_} -> error(expecting_nack_got_ack)
+ after 10000 ->
+ error(timeout_waiting_for_ack)
+ end.
confirms_rejects_conflict(Config) ->
Conn = ?config(conn, Config),
@@ -256,3 +354,19 @@ clean_acks_mailbox() ->
after
1000 -> done
end.
+
+kill_the_queue(QueueName, Config) ->
+ rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, kill_the_queue, [QueueName]).
+
+kill_the_queue(QueueName) ->
+ [begin
+ {ok, Q} = rabbit_amqqueue:lookup({resource, <<"/">>, queue, QueueName}),
+ Pid = amqqueue:get_pid(Q),
+ exit(Pid, kill)
+ end
+ || _ <- lists:seq(1, 11)].
+
+
+
+
+