diff options
| author | Daniil Fedotov <hairyhum@gmail.com> | 2019-04-03 18:10:20 -0400 |
|---|---|---|
| committer | Daniil Fedotov <hairyhum@gmail.com> | 2019-05-10 17:42:40 -0400 |
| commit | b7f29c1dad72eefbcafe2a7b7475777c83db53f9 (patch) | |
| tree | 996e4cf58b7a5575c6ca5ef2f308e1d64a2237b2 /test | |
| parent | 5c80bea709f4c89db3d8652f4a3e7d8421efb76e (diff) | |
| download | rabbitmq-server-git-b7f29c1dad72eefbcafe2a7b7475777c83db53f9.tar.gz | |
Change publisher confirms behaviour to reject messages if no queues confirmed.
Channel is counting unacked messages in a remove-only data structure `dtree`.
Each published message id is associated with a list of queues where it was routed to.
On confirm of queue failures queues were removed from the list
As soon as there are no queues in the list - the message can be confirmed.
This meant that if all queues fail with "not abnormal" reasons - the message may be
confirmed, but not enqueued.
This change removes dtree data structure, replacing it with specific unconfirmed_messages
data structure.
It tracks queue pids similarly to dtree, but also has an API to record confirms and failures
differently, keeping track of which queues received at least one confirm.
If all pids fails or confirm, but not all queues received confirmation - it means not all queues
enqueued the message and the message should be rejected
This is different from the current behaviour, but corresponds to the docs and common sense.
[#163952410]
Diffstat (limited to 'test')
| -rw-r--r-- | test/confirms_rejects_SUITE.erl | 118 |
1 files changed, 116 insertions, 2 deletions
diff --git a/test/confirms_rejects_SUITE.erl b/test/confirms_rejects_SUITE.erl index 722b5240fc..8805b71ffe 100644 --- a/test/confirms_rejects_SUITE.erl +++ b/test/confirms_rejects_SUITE.erl @@ -14,7 +14,9 @@ groups() -> [ {parallel_tests, [parallel], [ confirms_rejects_conflict, - policy_resets_to_default + policy_resets_to_default, + dead_queue_rejects, + mixed_dead_alive_queues_reject ]} ]. @@ -45,7 +47,10 @@ init_per_testcase(policy_resets_to_default = Testcase, Config) -> Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config), rabbit_ct_helpers:testcase_started( rabbit_ct_helpers:set_config(Config, [{conn, Conn}]), Testcase); -init_per_testcase(confirms_rejects_conflict = Testcase, Config) -> +init_per_testcase(Testcase, Config) + when Testcase == confirms_rejects_conflict; + Testcase == dead_queue_rejects; + Testcase == mixed_dead_alive_queues_reject -> Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config), Conn1 = rabbit_ct_client_helpers:open_unmanaged_connection(Config), @@ -66,6 +71,19 @@ end_per_testcase(policy_resets_to_default = Testcase, Config) -> end_per_testcase(confirms_rejects_conflict = Testcase, Config) -> {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), amqp_channel:call(Ch, #'queue.delete'{queue = <<"confirms_rejects_conflict">>}), + end_per_testcase0(Testcase, Config); +end_per_testcase(dead_queue_rejects = Testcase, Config) -> + {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + amqp_channel:call(Ch, #'queue.delete'{queue = <<"dead_queue_rejects">>}), + end_per_testcase0(Testcase, Config); +end_per_testcase(mixed_dead_alive_queues_reject = Testcase, Config) -> + {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + amqp_channel:call(Ch, #'queue.delete'{queue = <<"mixed_dead_alive_queues_reject_dead">>}), + amqp_channel:call(Ch, #'queue.delete'{queue = <<"mixed_dead_alive_queues_reject_alive">>}), + amqp_channel:call(Ch, #'exchange.delete'{exchange = <<"mixed_dead_alive_queues_reject">>}), + end_per_testcase0(Testcase, Config). + +end_per_testcase0(Testcase, Config) -> rabbit_ct_client_helpers:close_channels_and_connection(Config, 0), Conn = ?config(conn, Config), @@ -74,9 +92,89 @@ end_per_testcase(confirms_rejects_conflict = Testcase, Config) -> rabbit_ct_client_helpers:close_connection(Conn), rabbit_ct_client_helpers:close_connection(Conn1), + clean_acks_mailbox(), + rabbit_ct_helpers:testcase_finished(Config, Testcase). +dead_queue_rejects(Config) -> + Conn = ?config(conn, Config), + {ok, Ch} = amqp_connection:open_channel(Conn), + QueueName = <<"dead_queue_rejects">>, + amqp_channel:call(Ch, #'confirm.select'{}), + amqp_channel:register_confirm_handler(Ch, self()), + + amqp_channel:call(Ch, #'queue.declare'{queue = QueueName, + durable = true}), + amqp_channel:call(Ch, #'basic.publish'{routing_key = QueueName}, + #amqp_msg{payload = <<"HI">>}), + + receive + {'basic.ack',_,_} -> ok + after 10000 -> + error(timeout_waiting_for_initial_ack) + end, + + kill_the_queue(QueueName, Config), + + amqp_channel:call(Ch, #'basic.publish'{routing_key = QueueName}, + #amqp_msg{payload = <<"HI">>}), + + receive + {'basic.ack',_,_} -> error(expecting_nack_got_ack); + {'basic.nack',_,_,_} -> ok + after 10000 -> + error(timeout_waiting_for_nack) + end. + +mixed_dead_alive_queues_reject(Config) -> + Conn = ?config(conn, Config), + {ok, Ch} = amqp_connection:open_channel(Conn), + QueueNameDead = <<"mixed_dead_alive_queues_reject_dead">>, + QueueNameAlive = <<"mixed_dead_alive_queues_reject_alive">>, + ExchangeName = <<"mixed_dead_alive_queues_reject">>, + + amqp_channel:call(Ch, #'confirm.select'{}), + amqp_channel:register_confirm_handler(Ch, self()), + + amqp_channel:call(Ch, #'queue.declare'{queue = QueueNameDead, + durable = true}), + amqp_channel:call(Ch, #'queue.declare'{queue = QueueNameAlive, + durable = true}), + + amqp_channel:call(Ch, #'exchange.declare'{exchange = ExchangeName, + durable = true}), + + amqp_channel:call(Ch, #'queue.bind'{exchange = ExchangeName, + queue = QueueNameAlive, + routing_key = <<"route">>}), + + amqp_channel:call(Ch, #'queue.bind'{exchange = ExchangeName, + queue = QueueNameDead, + routing_key = <<"route">>}), + + amqp_channel:call(Ch, #'basic.publish'{exchange = ExchangeName, + routing_key = <<"route">>}, + #amqp_msg{payload = <<"HI">>}), + + receive + {'basic.ack',_,_} -> ok + after 10000 -> + error(timeout_waiting_for_initial_ack) + end, + + kill_the_queue(QueueNameDead, Config), + + amqp_channel:call(Ch, #'basic.publish'{exchange = ExchangeName, + routing_key = <<"route">>}, + #amqp_msg{payload = <<"HI">>}), + + receive + {'basic.nack',_,_,_} -> ok; + {'basic.ack',_,_} -> error(expecting_nack_got_ack) + after 10000 -> + error(timeout_waiting_for_ack) + end. confirms_rejects_conflict(Config) -> Conn = ?config(conn, Config), @@ -256,3 +354,19 @@ clean_acks_mailbox() -> after 1000 -> done end. + +kill_the_queue(QueueName, Config) -> + rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, kill_the_queue, [QueueName]). + +kill_the_queue(QueueName) -> + [begin + {ok, Q} = rabbit_amqqueue:lookup({resource, <<"/">>, queue, QueueName}), + Pid = amqqueue:get_pid(Q), + exit(Pid, kill) + end + || _ <- lists:seq(1, 11)]. + + + + + |
