summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_amqqueue.erl4
-rw-r--r--src/rabbit_amqqueue_process.erl19
-rw-r--r--src/rabbit_policies.erl2
-rw-r--r--test/dead_lettering_SUITE.erl33
4 files changed, 52 insertions, 6 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 9e94dd8f27..85c647ae8c 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -761,7 +761,9 @@ check_dlxrk_arg({Type, _}, _Args) ->
{error, {unacceptable_type, Type}}.
check_overflow({longstr, Val}, _Args) ->
- case lists:member(Val, [<<"drop-head">>, <<"reject-publish">>]) of
+ case lists:member(Val, [<<"drop-head">>,
+ <<"reject-publish">>,
+ <<"reject-publish-dlx">>]) of
true -> ok;
false -> {error, invalid_overflow}
end;
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index b3f89b7ef0..2666847e77 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -82,7 +82,7 @@
%% max length in bytes, if configured
max_bytes,
%% an action to perform if queue is to be over a limit,
- %% can be either drop-head (default) or reject-publish
+ %% can be either drop-head (default), reject-publish or reject-publish-dlx
overflow,
%% when policies change, this version helps queue
%% determine what previously scheduled/set up state to ignore,
@@ -704,12 +704,25 @@ maybe_deliver_or_enqueue(Delivery = #delivery{message = Message},
Delivered,
State = #q{overflow = Overflow,
backing_queue = BQ,
- backing_queue_state = BQS}) ->
+ backing_queue_state = BQS,
+ dlx = DLX,
+ dlx_routing_key = RK}) ->
send_mandatory(Delivery), %% must do this before confirms
case {will_overflow(Delivery, State), Overflow} of
{true, 'reject-publish'} ->
%% Drop publish and nack to publisher
send_reject_publish(Delivery, Delivered, State);
+ {true, 'reject-publish-dlx'} ->
+ %% Publish to DLX
+ with_dlx(
+ DLX,
+ fun (X) ->
+ QName = qname(State),
+ rabbit_dead_letter:publish(Message, maxlen, X, RK, QName)
+ end,
+ fun () -> ok end),
+ %% Drop publish and nack to publisher
+ send_reject_publish(Delivery, Delivered, State);
_ ->
{IsDuplicate, BQS1} = BQ:is_duplicate(Message, BQS),
State1 = State#q{backing_queue_state = BQS1},
@@ -766,6 +779,8 @@ maybe_drop_head(State = #q{max_length = undefined,
{false, State};
maybe_drop_head(State = #q{overflow = 'reject-publish'}) ->
{false, State};
+maybe_drop_head(State = #q{overflow = 'reject-publish-dlx'}) ->
+ {false, State};
maybe_drop_head(State = #q{overflow = 'drop-head'}) ->
maybe_drop_head(false, State).
diff --git a/src/rabbit_policies.erl b/src/rabbit_policies.erl
index 7878bed02d..c4f4226448 100644
--- a/src/rabbit_policies.erl
+++ b/src/rabbit_policies.erl
@@ -131,6 +131,8 @@ validate_policy0(<<"overflow">>, <<"drop-head">>) ->
ok;
validate_policy0(<<"overflow">>, <<"reject-publish">>) ->
ok;
+validate_policy0(<<"overflow">>, <<"reject-publish-dlx">>) ->
+ ok;
validate_policy0(<<"overflow">>, Value) ->
{error, "~p is not a valid overflow value", [Value]};
diff --git a/test/dead_lettering_SUITE.erl b/test/dead_lettering_SUITE.erl
index 6fe2a3a522..fe5e91c8ed 100644
--- a/test/dead_lettering_SUITE.erl
+++ b/test/dead_lettering_SUITE.erl
@@ -60,13 +60,15 @@ groups() ->
{dead_letter_tests, [],
[
{classic_queue, [parallel], DeadLetterTests ++ [dead_letter_ttl,
+ dead_letter_max_length_reject_publish_dlx,
dead_letter_routing_key_cycle_ttl,
dead_letter_headers_reason_expired,
dead_letter_headers_reason_expired_per_message]},
{mirrored_queue, [parallel], DeadLetterTests ++ [dead_letter_ttl,
- dead_letter_routing_key_cycle_ttl,
- dead_letter_headers_reason_expired,
- dead_letter_headers_reason_expired_per_message]},
+ dead_letter_max_length_reject_publish_dlx,
+ dead_letter_routing_key_cycle_ttl,
+ dead_letter_headers_reason_expired,
+ dead_letter_headers_reason_expired_per_message]},
{quorum_queue, [parallel], DeadLetterTests}
]}
].
@@ -381,6 +383,31 @@ dead_letter_max_length_drop_head(Config) ->
_ = consume(Ch, DLXQName, [P1, P2]),
consume_empty(Ch, DLXQName).
+%% Another strategy: reject-publish-dlx
+dead_letter_max_length_reject_publish_dlx(Config) ->
+ {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
+ QName = ?config(queue_name, Config),
+ DLXQName = ?config(queue_name_dlx, Config),
+
+ declare_dead_letter_queues(Ch, Config, QName, DLXQName,
+ [{<<"x-max-length">>, long, 1},
+ {<<"x-overflow">>, longstr, <<"reject-publish-dlx">>}]),
+
+ P1 = <<"msg1">>,
+ P2 = <<"msg2">>,
+ P3 = <<"msg3">>,
+
+ %% Publish 3 messages
+ publish(Ch, QName, [P1, P2, P3]),
+ %% Consume the first one from the queue (max-length = 1)
+ wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]),
+ _ = consume(Ch, QName, [P1]),
+ consume_empty(Ch, QName),
+ %% Consume the dropped ones from the dead letter queue
+ wait_for_messages(Config, [[DLXQName, <<"2">>, <<"2">>, <<"0">>]]),
+ _ = consume(Ch, DLXQName, [P2, P3]),
+ consume_empty(Ch, DLXQName).
+
%% Dead letter exchange does not have to be declared when the queue is declared, but it should
%% exist by the time messages need to be dead-lettered; if it is missing then, the messages will
%% be silently dropped.