diff options
| author | Arthur Khashaev <arthur@khashaev.ru> | 2019-05-23 20:38:28 +0300 |
|---|---|---|
| committer | Arthur Khashaev <arthur@khashaev.ru> | 2019-05-27 22:34:52 +0300 |
| commit | 89f464f04fc0dd23ce112b647f84b5e83ee5166f (patch) | |
| tree | fbf4e548da8a40a4a13a60b128e530ab72c9084f /src | |
| parent | fcae7368286fa50b7cacfb34509eff07362e4df9 (diff) | |
| download | rabbitmq-server-git-89f464f04fc0dd23ce112b647f84b5e83ee5166f.tar.gz | |
Add an option to dead-letter rejected publishes
Add `reject-publish-dlx` overflow strategy, which is similar to
`reject-publish` strategy, but also dead-letters rejected messages.
Closes #1443
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 19 | ||||
| -rw-r--r-- | src/rabbit_policies.erl | 2 |
3 files changed, 22 insertions, 3 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 9e94dd8f27..85c647ae8c 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -761,7 +761,9 @@ check_dlxrk_arg({Type, _}, _Args) -> {error, {unacceptable_type, Type}}. check_overflow({longstr, Val}, _Args) -> - case lists:member(Val, [<<"drop-head">>, <<"reject-publish">>]) of + case lists:member(Val, [<<"drop-head">>, + <<"reject-publish">>, + <<"reject-publish-dlx">>]) of true -> ok; false -> {error, invalid_overflow} end; diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index b3f89b7ef0..2666847e77 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -82,7 +82,7 @@ %% max length in bytes, if configured max_bytes, %% an action to perform if queue is to be over a limit, - %% can be either drop-head (default) or reject-publish + %% can be either drop-head (default), reject-publish or reject-publish-dlx overflow, %% when policies change, this version helps queue %% determine what previously scheduled/set up state to ignore, @@ -704,12 +704,25 @@ maybe_deliver_or_enqueue(Delivery = #delivery{message = Message}, Delivered, State = #q{overflow = Overflow, backing_queue = BQ, - backing_queue_state = BQS}) -> + backing_queue_state = BQS, + dlx = DLX, + dlx_routing_key = RK}) -> send_mandatory(Delivery), %% must do this before confirms case {will_overflow(Delivery, State), Overflow} of {true, 'reject-publish'} -> %% Drop publish and nack to publisher send_reject_publish(Delivery, Delivered, State); + {true, 'reject-publish-dlx'} -> + %% Publish to DLX + with_dlx( + DLX, + fun (X) -> + QName = qname(State), + rabbit_dead_letter:publish(Message, maxlen, X, RK, QName) + end, + fun () -> ok end), + %% Drop publish and nack to publisher + send_reject_publish(Delivery, Delivered, State); _ -> {IsDuplicate, BQS1} = BQ:is_duplicate(Message, BQS), State1 = State#q{backing_queue_state = BQS1}, @@ -766,6 +779,8 @@ maybe_drop_head(State = #q{max_length = undefined, {false, State}; maybe_drop_head(State = #q{overflow = 'reject-publish'}) -> {false, State}; +maybe_drop_head(State = #q{overflow = 'reject-publish-dlx'}) -> + {false, State}; maybe_drop_head(State = #q{overflow = 'drop-head'}) -> maybe_drop_head(false, State). diff --git a/src/rabbit_policies.erl b/src/rabbit_policies.erl index 7878bed02d..c4f4226448 100644 --- a/src/rabbit_policies.erl +++ b/src/rabbit_policies.erl @@ -131,6 +131,8 @@ validate_policy0(<<"overflow">>, <<"drop-head">>) -> ok; validate_policy0(<<"overflow">>, <<"reject-publish">>) -> ok; +validate_policy0(<<"overflow">>, <<"reject-publish-dlx">>) -> + ok; validate_policy0(<<"overflow">>, Value) -> {error, "~p is not a valid overflow value", [Value]}; |
