summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorArthur Khashaev <arthur@khashaev.ru>2019-05-23 20:38:28 +0300
committerArthur Khashaev <arthur@khashaev.ru>2019-05-27 22:34:52 +0300
commit89f464f04fc0dd23ce112b647f84b5e83ee5166f (patch)
treefbf4e548da8a40a4a13a60b128e530ab72c9084f /src
parentfcae7368286fa50b7cacfb34509eff07362e4df9 (diff)
downloadrabbitmq-server-git-89f464f04fc0dd23ce112b647f84b5e83ee5166f.tar.gz
Add an option to dead-letter rejected publishes
Add `reject-publish-dlx` overflow strategy, which is similar to `reject-publish` strategy, but also dead-letters rejected messages. Closes #1443
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue.erl4
-rw-r--r--src/rabbit_amqqueue_process.erl19
-rw-r--r--src/rabbit_policies.erl2
3 files changed, 22 insertions, 3 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 9e94dd8f27..85c647ae8c 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -761,7 +761,9 @@ check_dlxrk_arg({Type, _}, _Args) ->
{error, {unacceptable_type, Type}}.
check_overflow({longstr, Val}, _Args) ->
- case lists:member(Val, [<<"drop-head">>, <<"reject-publish">>]) of
+ case lists:member(Val, [<<"drop-head">>,
+ <<"reject-publish">>,
+ <<"reject-publish-dlx">>]) of
true -> ok;
false -> {error, invalid_overflow}
end;
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index b3f89b7ef0..2666847e77 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -82,7 +82,7 @@
%% max length in bytes, if configured
max_bytes,
%% an action to perform if queue is to be over a limit,
- %% can be either drop-head (default) or reject-publish
+ %% can be either drop-head (default), reject-publish or reject-publish-dlx
overflow,
%% when policies change, this version helps queue
%% determine what previously scheduled/set up state to ignore,
@@ -704,12 +704,25 @@ maybe_deliver_or_enqueue(Delivery = #delivery{message = Message},
Delivered,
State = #q{overflow = Overflow,
backing_queue = BQ,
- backing_queue_state = BQS}) ->
+ backing_queue_state = BQS,
+ dlx = DLX,
+ dlx_routing_key = RK}) ->
send_mandatory(Delivery), %% must do this before confirms
case {will_overflow(Delivery, State), Overflow} of
{true, 'reject-publish'} ->
%% Drop publish and nack to publisher
send_reject_publish(Delivery, Delivered, State);
+ {true, 'reject-publish-dlx'} ->
+ %% Publish to DLX
+ with_dlx(
+ DLX,
+ fun (X) ->
+ QName = qname(State),
+ rabbit_dead_letter:publish(Message, maxlen, X, RK, QName)
+ end,
+ fun () -> ok end),
+ %% Drop publish and nack to publisher
+ send_reject_publish(Delivery, Delivered, State);
_ ->
{IsDuplicate, BQS1} = BQ:is_duplicate(Message, BQS),
State1 = State#q{backing_queue_state = BQS1},
@@ -766,6 +779,8 @@ maybe_drop_head(State = #q{max_length = undefined,
{false, State};
maybe_drop_head(State = #q{overflow = 'reject-publish'}) ->
{false, State};
+maybe_drop_head(State = #q{overflow = 'reject-publish-dlx'}) ->
+ {false, State};
maybe_drop_head(State = #q{overflow = 'drop-head'}) ->
maybe_drop_head(false, State).
diff --git a/src/rabbit_policies.erl b/src/rabbit_policies.erl
index 7878bed02d..c4f4226448 100644
--- a/src/rabbit_policies.erl
+++ b/src/rabbit_policies.erl
@@ -131,6 +131,8 @@ validate_policy0(<<"overflow">>, <<"drop-head">>) ->
ok;
validate_policy0(<<"overflow">>, <<"reject-publish">>) ->
ok;
+validate_policy0(<<"overflow">>, <<"reject-publish-dlx">>) ->
+ ok;
validate_policy0(<<"overflow">>, Value) ->
{error, "~p is not a valid overflow value", [Value]};