diff options
| -rw-r--r-- | src/rabbit_amqqueue.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 19 | ||||
| -rw-r--r-- | src/rabbit_policies.erl | 2 | ||||
| -rw-r--r-- | test/confirms_rejects_SUITE.erl | 48 | ||||
| -rw-r--r-- | test/dead_lettering_SUITE.erl | 33 | ||||
| -rw-r--r-- | test/priority_queue_SUITE.erl | 24 | ||||
| -rw-r--r-- | test/quorum_queue_SUITE.erl | 11 | ||||
| -rw-r--r-- | test/simple_ha_SUITE.erl | 23 |
8 files changed, 130 insertions, 34 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 9e94dd8f27..85c647ae8c 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -761,7 +761,9 @@ check_dlxrk_arg({Type, _}, _Args) -> {error, {unacceptable_type, Type}}. check_overflow({longstr, Val}, _Args) -> - case lists:member(Val, [<<"drop-head">>, <<"reject-publish">>]) of + case lists:member(Val, [<<"drop-head">>, + <<"reject-publish">>, + <<"reject-publish-dlx">>]) of true -> ok; false -> {error, invalid_overflow} end; diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index b3f89b7ef0..2666847e77 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -82,7 +82,7 @@ %% max length in bytes, if configured max_bytes, %% an action to perform if queue is to be over a limit, - %% can be either drop-head (default) or reject-publish + %% can be either drop-head (default), reject-publish or reject-publish-dlx overflow, %% when policies change, this version helps queue %% determine what previously scheduled/set up state to ignore, @@ -704,12 +704,25 @@ maybe_deliver_or_enqueue(Delivery = #delivery{message = Message}, Delivered, State = #q{overflow = Overflow, backing_queue = BQ, - backing_queue_state = BQS}) -> + backing_queue_state = BQS, + dlx = DLX, + dlx_routing_key = RK}) -> send_mandatory(Delivery), %% must do this before confirms case {will_overflow(Delivery, State), Overflow} of {true, 'reject-publish'} -> %% Drop publish and nack to publisher send_reject_publish(Delivery, Delivered, State); + {true, 'reject-publish-dlx'} -> + %% Publish to DLX + with_dlx( + DLX, + fun (X) -> + QName = qname(State), + rabbit_dead_letter:publish(Message, maxlen, X, RK, QName) + end, + fun () -> ok end), + %% Drop publish and nack to publisher + send_reject_publish(Delivery, Delivered, State); _ -> {IsDuplicate, BQS1} = BQ:is_duplicate(Message, BQS), State1 = State#q{backing_queue_state = BQS1}, @@ -766,6 +779,8 @@ maybe_drop_head(State = #q{max_length = undefined, {false, State}; maybe_drop_head(State = #q{overflow = 'reject-publish'}) -> {false, State}; +maybe_drop_head(State = #q{overflow = 'reject-publish-dlx'}) -> + {false, State}; maybe_drop_head(State = #q{overflow = 'drop-head'}) -> maybe_drop_head(false, State). diff --git a/src/rabbit_policies.erl b/src/rabbit_policies.erl index 7878bed02d..c4f4226448 100644 --- a/src/rabbit_policies.erl +++ b/src/rabbit_policies.erl @@ -131,6 +131,8 @@ validate_policy0(<<"overflow">>, <<"drop-head">>) -> ok; validate_policy0(<<"overflow">>, <<"reject-publish">>) -> ok; +validate_policy0(<<"overflow">>, <<"reject-publish-dlx">>) -> + ok; validate_policy0(<<"overflow">>, Value) -> {error, "~p is not a valid overflow value", [Value]}; diff --git a/test/confirms_rejects_SUITE.erl b/test/confirms_rejects_SUITE.erl index 6b2133b8ff..402bca8737 100644 --- a/test/confirms_rejects_SUITE.erl +++ b/test/confirms_rejects_SUITE.erl @@ -11,13 +11,17 @@ all() -> ]. groups() -> + OverflowTests = [ + confirms_rejects_conflict, + policy_resets_to_default + ], [ {parallel_tests, [parallel], [ - confirms_rejects_conflict, - policy_resets_to_default, - dead_queue_rejects, - mixed_dead_alive_queues_reject - ]} + {overflow_reject_publish_dlx, [parallel], OverflowTests}, + {overflow_reject_publish, [parallel], OverflowTests}, + dead_queue_rejects, + mixed_dead_alive_queues_reject + ]} ]. init_per_suite(Config) -> @@ -28,6 +32,14 @@ end_per_suite(Config) -> rabbit_ct_helpers:run_teardown_steps(Config). +init_per_group(overflow_reject_publish, Config) -> + rabbit_ct_helpers:set_config(Config, [ + {overflow, <<"reject-publish">>} + ]); +init_per_group(overflow_reject_publish_dlx, Config) -> + rabbit_ct_helpers:set_config(Config, [ + {overflow, <<"reject-publish-dlx">>} + ]); init_per_group(Group, Config) -> ClusterSize = 2, Config1 = rabbit_ct_helpers:set_config(Config, [ @@ -38,6 +50,10 @@ init_per_group(Group, Config) -> rabbit_ct_broker_helpers:setup_steps() ++ rabbit_ct_client_helpers:setup_steps()). +end_per_group(overflow_reject_publish, _Config) -> + ok; +end_per_group(overflow_reject_publish_dlx, _Config) -> + ok; end_per_group(_Group, Config) -> rabbit_ct_helpers:run_steps(Config, rabbit_ct_client_helpers:teardown_steps() ++ @@ -60,7 +76,9 @@ init_per_testcase(Testcase, Config) end_per_testcase(policy_resets_to_default = Testcase, Config) -> {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), - amqp_channel:call(Ch, #'queue.delete'{queue = <<"policy_resets_to_default">>}), + XOverflow = ?config(overflow, Config), + QueueName = <<"policy_resets_to_default", "_", XOverflow/binary>>, + amqp_channel:call(Ch, #'queue.delete'{queue = QueueName}), rabbit_ct_client_helpers:close_channels_and_connection(Config, 0), Conn = ?config(conn, Config), @@ -70,7 +88,9 @@ end_per_testcase(policy_resets_to_default = Testcase, Config) -> rabbit_ct_helpers:testcase_finished(Config, Testcase); end_per_testcase(confirms_rejects_conflict = Testcase, Config) -> {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), - amqp_channel:call(Ch, #'queue.delete'{queue = <<"confirms_rejects_conflict">>}), + XOverflow = ?config(overflow, Config), + QueueName = <<"confirms_rejects_conflict", "_", XOverflow/binary>>, + amqp_channel:call(Ch, #'queue.delete'{queue = QueueName}), end_per_testcase0(Testcase, Config); end_per_testcase(dead_queue_rejects = Testcase, Config) -> {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), @@ -187,15 +207,15 @@ confirms_rejects_conflict(Config) -> false = Conn =:= Conn1, false = Ch =:= Ch1, - QueueName = <<"confirms_rejects_conflict">>, - amqp_channel:call(Ch, #'confirm.select'{}), amqp_channel:register_confirm_handler(Ch, self()), + XOverflow = ?config(overflow, Config), + QueueName = <<"confirms_rejects_conflict", "_", XOverflow/binary>>, amqp_channel:call(Ch, #'queue.declare'{queue = QueueName, durable = true, - arguments = [{<<"x-max-length">>,long,12}, - {<<"x-overflow">>,longstr,<<"reject-publish">>}] + arguments = [{<<"x-max-length">>, long, 12}, + {<<"x-overflow">>, longstr, XOverflow}] }), %% Consume 3 messages at once. Do that often. Consume = fun Consume() -> @@ -238,12 +258,14 @@ confirms_rejects_conflict(Config) -> policy_resets_to_default(Config) -> Conn = ?config(conn, Config), + {ok, Ch} = amqp_connection:open_channel(Conn), - QueueName = <<"policy_resets_to_default">>, amqp_channel:call(Ch, #'confirm.select'{}), amqp_channel:register_confirm_handler(Ch, self()), + XOverflow = ?config(overflow, Config), + QueueName = <<"policy_resets_to_default", "_", XOverflow/binary>>, amqp_channel:call(Ch, #'queue.declare'{queue = QueueName, durable = true }), @@ -251,7 +273,7 @@ policy_resets_to_default(Config) -> rabbit_ct_broker_helpers:set_policy( Config, 0, QueueName, QueueName, <<"queues">>, - [{<<"max-length">>, MaxLength}, {<<"overflow">>, <<"reject-publish">>}]), + [{<<"max-length">>, MaxLength}, {<<"overflow">>, XOverflow}]), timer:sleep(1000), diff --git a/test/dead_lettering_SUITE.erl b/test/dead_lettering_SUITE.erl index 6fe2a3a522..fe5e91c8ed 100644 --- a/test/dead_lettering_SUITE.erl +++ b/test/dead_lettering_SUITE.erl @@ -60,13 +60,15 @@ groups() -> {dead_letter_tests, [], [ {classic_queue, [parallel], DeadLetterTests ++ [dead_letter_ttl, + dead_letter_max_length_reject_publish_dlx, dead_letter_routing_key_cycle_ttl, dead_letter_headers_reason_expired, dead_letter_headers_reason_expired_per_message]}, {mirrored_queue, [parallel], DeadLetterTests ++ [dead_letter_ttl, - dead_letter_routing_key_cycle_ttl, - dead_letter_headers_reason_expired, - dead_letter_headers_reason_expired_per_message]}, + dead_letter_max_length_reject_publish_dlx, + dead_letter_routing_key_cycle_ttl, + dead_letter_headers_reason_expired, + dead_letter_headers_reason_expired_per_message]}, {quorum_queue, [parallel], DeadLetterTests} ]} ]. @@ -381,6 +383,31 @@ dead_letter_max_length_drop_head(Config) -> _ = consume(Ch, DLXQName, [P1, P2]), consume_empty(Ch, DLXQName). +%% Another strategy: reject-publish-dlx +dead_letter_max_length_reject_publish_dlx(Config) -> + {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + QName = ?config(queue_name, Config), + DLXQName = ?config(queue_name_dlx, Config), + + declare_dead_letter_queues(Ch, Config, QName, DLXQName, + [{<<"x-max-length">>, long, 1}, + {<<"x-overflow">>, longstr, <<"reject-publish-dlx">>}]), + + P1 = <<"msg1">>, + P2 = <<"msg2">>, + P3 = <<"msg3">>, + + %% Publish 3 messages + publish(Ch, QName, [P1, P2, P3]), + %% Consume the first one from the queue (max-length = 1) + wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), + _ = consume(Ch, QName, [P1]), + consume_empty(Ch, QName), + %% Consume the dropped ones from the dead letter queue + wait_for_messages(Config, [[DLXQName, <<"2">>, <<"2">>, <<"0">>]]), + _ = consume(Ch, DLXQName, [P2, P3]), + consume_empty(Ch, DLXQName). + %% Dead letter exchange does not have to be declared when the queue is declared, but it should %% exist by the time messages need to be dead-lettered; if it is missing then, the messages will %% be silently dropped. diff --git a/test/priority_queue_SUITE.erl b/test/priority_queue_SUITE.erl index 5bac8482fa..f7e20a9fe6 100644 --- a/test/priority_queue_SUITE.erl +++ b/test/priority_queue_SUITE.erl @@ -33,7 +33,8 @@ groups() -> {cluster_size_2, [], [ ackfold, drop, - reject, + {overflow_reject_publish, [], [reject]}, + {overflow_reject_publish_dlx, [], [reject]}, dropwhile_fetchwhile, info_head_message_timestamp, matching, @@ -87,8 +88,20 @@ init_per_group(cluster_size_3, Config) -> ]), rabbit_ct_helpers:run_steps(Config1, rabbit_ct_broker_helpers:setup_steps() ++ - rabbit_ct_client_helpers:setup_steps()). - + rabbit_ct_client_helpers:setup_steps()); +init_per_group(overflow_reject_publish, Config) -> + rabbit_ct_helpers:set_config(Config, [ + {overflow, <<"reject-publish">>} + ]); +init_per_group(overflow_reject_publish_dlx, Config) -> + rabbit_ct_helpers:set_config(Config, [ + {overflow, <<"reject-publish-dlx">>} + ]). + +end_per_group(overflow_reject_publish, _Config) -> + ok; +end_per_group(overflow_reject_publish_dlx, _Config) -> + ok; end_per_group(_Group, Config) -> rabbit_ct_helpers:run_steps(Config, rabbit_ct_client_helpers:teardown_steps() ++ @@ -334,9 +347,10 @@ drop(Config) -> reject(Config) -> {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), - Q = <<"reject-queue">>, + XOverflow = ?config(overflow, Config), + Q = <<"reject-queue-", XOverflow/binary>>, declare(Ch, Q, [{<<"x-max-length">>, long, 4}, - {<<"x-overflow">>, longstr, <<"reject-publish">>} + {<<"x-overflow">>, longstr, XOverflow} | arguments(3)]), publish(Ch, Q, [1, 2, 3, 1, 2, 3, 1, 2, 3]), %% First 4 messages are published, all others are discarded. diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl index c23b7ac85e..1d9789fe89 100644 --- a/test/quorum_queue_SUITE.erl +++ b/test/quorum_queue_SUITE.erl @@ -323,11 +323,12 @@ declare_invalid_args(Config) -> LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}, {<<"x-max-priority">>, long, 2000}])), - ?assertExit( - {{shutdown, {server_initiated_close, 406, _}}, _}, - declare(rabbit_ct_client_helpers:open_channel(Config, Server), - LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}, - {<<"x-overflow">>, longstr, <<"reject-publish">>}])), + [?assertExit( + {{shutdown, {server_initiated_close, 406, _}}, _}, + declare(rabbit_ct_client_helpers:open_channel(Config, Server), + LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}, + {<<"x-overflow">>, longstr, XOverflow}])) + || XOverflow <- [<<"reject-publish">>, <<"reject-publish-dlx">>]], ?assertExit( {{shutdown, {server_initiated_close, 406, _}}, _}, diff --git a/test/simple_ha_SUITE.erl b/test/simple_ha_SUITE.erl index 20012b09c8..b2caff86a9 100644 --- a/test/simple_ha_SUITE.erl +++ b/test/simple_ha_SUITE.erl @@ -30,6 +30,11 @@ all() -> ]. groups() -> + RejectTests = [ + rejects_survive_stop, + rejects_survive_sigkill, + rejects_survive_policy + ], [ {cluster_size_2, [], [ rapid_redeclare, @@ -45,9 +50,8 @@ groups() -> confirms_survive_stop, confirms_survive_sigkill, confirms_survive_policy, - rejects_survive_stop, - rejects_survive_sigkill, - rejects_survive_policy + {overflow_reject_publish, [], RejectTests}, + {overflow_reject_publish_dlx, [], RejectTests} ]} ]. @@ -69,6 +73,14 @@ init_per_group(cluster_size_2, Config) -> init_per_group(cluster_size_3, Config) -> rabbit_ct_helpers:set_config(Config, [ {rmq_nodes_count, 3} + ]); +init_per_group(overflow_reject_publish, Config) -> + rabbit_ct_helpers:set_config(Config, [ + {overflow, <<"reject-publish">>} + ]); +init_per_group(overflow_reject_publish_dlx, Config) -> + rabbit_ct_helpers:set_config(Config, [ + {overflow, <<"reject-publish-dlx">>} ]). end_per_group(_, Config) -> @@ -227,12 +239,13 @@ rejects_survive(Config, DeathFun) -> Node2Channel = rabbit_ct_client_helpers:open_channel(Config, B), %% declare the queue on the master, mirrored to the two slaves - Queue = <<"test_rejects">>, + XOverflow = ?config(overflow, Config), + Queue = <<"test_rejects", "_", XOverflow/binary>>, amqp_channel:call(Node1Channel,#'queue.declare'{queue = Queue, auto_delete = false, durable = true, arguments = [{<<"x-max-length">>, long, 1}, - {<<"x-overflow">>, longstr, <<"reject-publish">>}]}), + {<<"x-overflow">>, longstr, XOverflow}]}), Payload = <<"there can be only one">>, amqp_channel:call(Node1Channel, #'basic.publish'{routing_key = Queue}, |
