summaryrefslogtreecommitdiff
path: root/test
diff options
context:
space:
mode:
authorDaniil Fedotov <hairyhum@gmail.com>2019-04-03 18:10:20 -0400
committerDaniil Fedotov <hairyhum@gmail.com>2019-05-10 17:42:40 -0400
commitb7f29c1dad72eefbcafe2a7b7475777c83db53f9 (patch)
tree996e4cf58b7a5575c6ca5ef2f308e1d64a2237b2 /test
parent5c80bea709f4c89db3d8652f4a3e7d8421efb76e (diff)
downloadrabbitmq-server-git-b7f29c1dad72eefbcafe2a7b7475777c83db53f9.tar.gz
Change publisher confirms behaviour to reject messages if no queues confirmed.
Channel is counting unacked messages in a remove-only data structure `dtree`. Each published message id is associated with a list of queues where it was routed to. On confirm of queue failures queues were removed from the list As soon as there are no queues in the list - the message can be confirmed. This meant that if all queues fail with "not abnormal" reasons - the message may be confirmed, but not enqueued. This change removes dtree data structure, replacing it with specific unconfirmed_messages data structure. It tracks queue pids similarly to dtree, but also has an API to record confirms and failures differently, keeping track of which queues received at least one confirm. If all pids fails or confirm, but not all queues received confirmation - it means not all queues enqueued the message and the message should be rejected This is different from the current behaviour, but corresponds to the docs and common sense. [#163952410]
Diffstat (limited to 'test')
-rw-r--r--test/confirms_rejects_SUITE.erl118
1 files changed, 116 insertions, 2 deletions
diff --git a/test/confirms_rejects_SUITE.erl b/test/confirms_rejects_SUITE.erl
index 722b5240fc..8805b71ffe 100644
--- a/test/confirms_rejects_SUITE.erl
+++ b/test/confirms_rejects_SUITE.erl
@@ -14,7 +14,9 @@ groups() ->
[
{parallel_tests, [parallel], [
confirms_rejects_conflict,
- policy_resets_to_default
+ policy_resets_to_default,
+ dead_queue_rejects,
+ mixed_dead_alive_queues_reject
]}
].
@@ -45,7 +47,10 @@ init_per_testcase(policy_resets_to_default = Testcase, Config) ->
Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config),
rabbit_ct_helpers:testcase_started(
rabbit_ct_helpers:set_config(Config, [{conn, Conn}]), Testcase);
-init_per_testcase(confirms_rejects_conflict = Testcase, Config) ->
+init_per_testcase(Testcase, Config)
+ when Testcase == confirms_rejects_conflict;
+ Testcase == dead_queue_rejects;
+ Testcase == mixed_dead_alive_queues_reject ->
Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config),
Conn1 = rabbit_ct_client_helpers:open_unmanaged_connection(Config),
@@ -66,6 +71,19 @@ end_per_testcase(policy_resets_to_default = Testcase, Config) ->
end_per_testcase(confirms_rejects_conflict = Testcase, Config) ->
{_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
amqp_channel:call(Ch, #'queue.delete'{queue = <<"confirms_rejects_conflict">>}),
+ end_per_testcase0(Testcase, Config);
+end_per_testcase(dead_queue_rejects = Testcase, Config) ->
+ {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ amqp_channel:call(Ch, #'queue.delete'{queue = <<"dead_queue_rejects">>}),
+ end_per_testcase0(Testcase, Config);
+end_per_testcase(mixed_dead_alive_queues_reject = Testcase, Config) ->
+ {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ amqp_channel:call(Ch, #'queue.delete'{queue = <<"mixed_dead_alive_queues_reject_dead">>}),
+ amqp_channel:call(Ch, #'queue.delete'{queue = <<"mixed_dead_alive_queues_reject_alive">>}),
+ amqp_channel:call(Ch, #'exchange.delete'{exchange = <<"mixed_dead_alive_queues_reject">>}),
+ end_per_testcase0(Testcase, Config).
+
+end_per_testcase0(Testcase, Config) ->
rabbit_ct_client_helpers:close_channels_and_connection(Config, 0),
Conn = ?config(conn, Config),
@@ -74,9 +92,89 @@ end_per_testcase(confirms_rejects_conflict = Testcase, Config) ->
rabbit_ct_client_helpers:close_connection(Conn),
rabbit_ct_client_helpers:close_connection(Conn1),
+ clean_acks_mailbox(),
+
rabbit_ct_helpers:testcase_finished(Config, Testcase).
+dead_queue_rejects(Config) ->
+ Conn = ?config(conn, Config),
+ {ok, Ch} = amqp_connection:open_channel(Conn),
+ QueueName = <<"dead_queue_rejects">>,
+ amqp_channel:call(Ch, #'confirm.select'{}),
+ amqp_channel:register_confirm_handler(Ch, self()),
+
+ amqp_channel:call(Ch, #'queue.declare'{queue = QueueName,
+ durable = true}),
+ amqp_channel:call(Ch, #'basic.publish'{routing_key = QueueName},
+ #amqp_msg{payload = <<"HI">>}),
+
+ receive
+ {'basic.ack',_,_} -> ok
+ after 10000 ->
+ error(timeout_waiting_for_initial_ack)
+ end,
+
+ kill_the_queue(QueueName, Config),
+
+ amqp_channel:call(Ch, #'basic.publish'{routing_key = QueueName},
+ #amqp_msg{payload = <<"HI">>}),
+
+ receive
+ {'basic.ack',_,_} -> error(expecting_nack_got_ack);
+ {'basic.nack',_,_,_} -> ok
+ after 10000 ->
+ error(timeout_waiting_for_nack)
+ end.
+
+mixed_dead_alive_queues_reject(Config) ->
+ Conn = ?config(conn, Config),
+ {ok, Ch} = amqp_connection:open_channel(Conn),
+ QueueNameDead = <<"mixed_dead_alive_queues_reject_dead">>,
+ QueueNameAlive = <<"mixed_dead_alive_queues_reject_alive">>,
+ ExchangeName = <<"mixed_dead_alive_queues_reject">>,
+
+ amqp_channel:call(Ch, #'confirm.select'{}),
+ amqp_channel:register_confirm_handler(Ch, self()),
+
+ amqp_channel:call(Ch, #'queue.declare'{queue = QueueNameDead,
+ durable = true}),
+ amqp_channel:call(Ch, #'queue.declare'{queue = QueueNameAlive,
+ durable = true}),
+
+ amqp_channel:call(Ch, #'exchange.declare'{exchange = ExchangeName,
+ durable = true}),
+
+ amqp_channel:call(Ch, #'queue.bind'{exchange = ExchangeName,
+ queue = QueueNameAlive,
+ routing_key = <<"route">>}),
+
+ amqp_channel:call(Ch, #'queue.bind'{exchange = ExchangeName,
+ queue = QueueNameDead,
+ routing_key = <<"route">>}),
+
+ amqp_channel:call(Ch, #'basic.publish'{exchange = ExchangeName,
+ routing_key = <<"route">>},
+ #amqp_msg{payload = <<"HI">>}),
+
+ receive
+ {'basic.ack',_,_} -> ok
+ after 10000 ->
+ error(timeout_waiting_for_initial_ack)
+ end,
+
+ kill_the_queue(QueueNameDead, Config),
+
+ amqp_channel:call(Ch, #'basic.publish'{exchange = ExchangeName,
+ routing_key = <<"route">>},
+ #amqp_msg{payload = <<"HI">>}),
+
+ receive
+ {'basic.nack',_,_,_} -> ok;
+ {'basic.ack',_,_} -> error(expecting_nack_got_ack)
+ after 10000 ->
+ error(timeout_waiting_for_ack)
+ end.
confirms_rejects_conflict(Config) ->
Conn = ?config(conn, Config),
@@ -256,3 +354,19 @@ clean_acks_mailbox() ->
after
1000 -> done
end.
+
+kill_the_queue(QueueName, Config) ->
+ rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, kill_the_queue, [QueueName]).
+
+kill_the_queue(QueueName) ->
+ [begin
+ {ok, Q} = rabbit_amqqueue:lookup({resource, <<"/">>, queue, QueueName}),
+ Pid = amqqueue:get_pid(Q),
+ exit(Pid, kill)
+ end
+ || _ <- lists:seq(1, 11)].
+
+
+
+
+