diff options
| author | Diana Corbacho <diana@rabbitmq.com> | 2019-02-18 22:21:23 +0000 |
|---|---|---|
| committer | kjnilsson <knilsson@pivotal.io> | 2019-02-22 14:14:31 +0000 |
| commit | 8a3d3e654b66a3fa53b45bd949b92accf19974da (patch) | |
| tree | 79bf17f6507cba06898daaea3b718984f25012c8 /src | |
| parent | d256c78560a9d73fb39d7c159eedc00768a35dc2 (diff) | |
| download | rabbitmq-server-git-8a3d3e654b66a3fa53b45bd949b92accf19974da.tar.gz | |
Poison handling in quorum queues
Drop messages that exceed the configured number of delivery attemps.
If a DLX is configured, dead letter them.
[#163513253]
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_fifo.erl | 67 | ||||
| -rw-r--r-- | src/rabbit_policies.erl | 16 | ||||
| -rw-r--r-- | src/rabbit_quorum_queue.erl | 5 |
3 files changed, 56 insertions, 32 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl index af3441df86..da9772516c 100644 --- a/src/rabbit_fifo.erl +++ b/src/rabbit_fifo.erl @@ -252,7 +252,8 @@ consumer_strategy = default :: default | single_active, %% waiting consumers, one is picked active consumer is cancelled or dies %% used only when single active consumer is on - waiting_consumers = [] :: [{consumer_id(), consumer()}] + waiting_consumers = [] :: [{consumer_id(), consumer()}], + delivery_limit :: maybe(non_neg_integer()) }). -opaque state() :: #state{}. @@ -264,7 +265,8 @@ release_cursor_interval => non_neg_integer(), max_length => non_neg_integer(), max_bytes => non_neg_integer(), - single_active_consumer_on => boolean()}. + single_active_consumer_on => boolean(), + delivery_limit => non_neg_integer()}. -export_type([protocol/0, delivery/0, @@ -293,6 +295,7 @@ update_config(Conf, State) -> SHI = maps:get(release_cursor_interval, Conf, ?RELEASE_CURSOR_EVERY), MaxLength = maps:get(max_length, Conf, undefined), MaxBytes = maps:get(max_bytes, Conf, undefined), + DeliveryLimit = maps:get(delivery_limit, Conf, undefined), ConsumerStrategy = case maps:get(single_active_consumer_on, Conf, false) of true -> single_active; @@ -304,7 +307,8 @@ update_config(Conf, State) -> release_cursor_interval = SHI, max_length = MaxLength, max_bytes = MaxBytes, - consumer_strategy = ConsumerStrategy}. + consumer_strategy = ConsumerStrategy, + delivery_limit = DeliveryLimit}. zero(_) -> 0. @@ -468,10 +472,10 @@ apply(_, {down, ConsumerPid, noconnection}, #consumer{checked_out = Checked0} = C, {Co, St0, Eff}) when (node(P) =:= Node) and (C#consumer.status =/= cancelled)-> - St = return_all(St0, Checked0), + {St, Eff0} = return_all(St0, Checked0, Eff), Credit = increase_credit(C, maps:size(Checked0)), Eff1 = ConsumerUpdateActiveFun(St, K, C, false, - suspected_down, Eff), + suspected_down, Eff0), {maps:put(K, C#consumer{status = suspected_down, credit = Credit, @@ -961,8 +965,8 @@ maybe_return_all(ConsumerId, #consumer{checked_out = Checked0} = Consumer, Cons1 C0, SQ0, Effects0), {S0#state{consumers = Cons, service_queue = SQ}, Effects1}; down -> - S1 = return_all(S0, Checked0), - {S1#state{consumers = Cons1}, Effects0} + {S1, Effects1} = return_all(S0, Checked0, Effects0), + {S1#state{consumers = Cons1}, Effects1} end. apply_enqueue(#{index := RaftIdx} = Meta, From, Seq, RawMsg, State0) -> @@ -1082,16 +1086,16 @@ return(Meta, ConsumerId, MsgNumMsgs, Con0, Checked, Effects0, #state{consumers = Cons0, service_queue = SQ0} = State0) -> Con = Con0#consumer{checked_out = Checked, credit = increase_credit(Con0, length(MsgNumMsgs))}, - {Cons, SQ, Effects} = update_or_remove_sub(ConsumerId, Con, Cons0, + {Cons, SQ, Effects1} = update_or_remove_sub(ConsumerId, Con, Cons0, SQ0, Effects0), - State1 = lists:foldl(fun({'$prefix_msg', _} = Msg, S0) -> - return_one(0, Msg, S0); - ({MsgNum, Msg}, S0) -> - return_one(MsgNum, Msg, S0) - end, State0, MsgNumMsgs), + {State1, Effects2} = lists:foldl(fun({'$prefix_msg', _} = Msg, {S0, E0}) -> + return_one(0, Msg, S0, E0); + ({MsgNum, Msg}, {S0, E0}) -> + return_one(MsgNum, Msg, S0, E0) + end, {State0, Effects1}, MsgNumMsgs), checkout(Meta, State1#state{consumers = Cons, service_queue = SQ}, - Effects). + Effects2). % used to processes messages that are finished complete(ConsumerId, MsgRaftIdxs, NumDiscarded, @@ -1197,28 +1201,35 @@ find_next_cursor(Smallest, Cursors0, Potential) -> end. return_one(0, {'$prefix_msg', _} = Msg, - #state{returns = Returns} = State0) -> - add_bytes_return(Msg, - State0#state{returns = lqueue:in(Msg, Returns)}); + #state{returns = Returns} = State0, Effects) -> + {add_bytes_return(Msg, + State0#state{returns = lqueue:in(Msg, Returns)}), Effects}; return_one(MsgNum, {RaftId, {Header0, RawMsg}}, - #state{returns = Returns} = State0) -> + #state{returns = Returns, + delivery_limit = DeliveryLimit} = State0, Effects0) -> Header = maps:update_with(delivery_count, fun (C) -> C+1 end, 1, Header0), - Msg = {RaftId, {Header, RawMsg}}, - % this should not affect the release cursor in any way - add_bytes_return(RawMsg, - State0#state{returns = lqueue:in({MsgNum, Msg}, Returns)}). + case maps:get(delivery_count, Header) of + DeliveryCount when DeliveryCount > DeliveryLimit -> + Effects = dead_letter_effects(rejected, maps:put(none, {MsgNum, {RaftId, {Header, RawMsg}}}, #{}), State0, Effects0), + {add_bytes_settle(RawMsg, State0), Effects}; + _ -> + Msg = {RaftId, {Header, RawMsg}}, + %% this should not affect the release cursor in any way + {add_bytes_return(RawMsg, + State0#state{returns = lqueue:in({MsgNum, Msg}, Returns)}), Effects0} + end. -return_all(State0, Checked0) -> +return_all(State0, Checked0, Effects0) -> %% need to sort the list so that we return messages in the order %% they were checked out Checked = lists:sort(maps:to_list(Checked0)), - lists:foldl(fun ({_, {'$prefix_msg', _} = Msg}, S) -> - return_one(0, Msg, S); - ({_, {MsgNum, Msg}}, S) -> - return_one(MsgNum, Msg, S) - end, State0, Checked). + lists:foldl(fun ({_, {'$prefix_msg', _} = Msg}, {S, E}) -> + return_one(0, Msg, S, E); + ({_, {MsgNum, Msg}}, {S, E}) -> + return_one(MsgNum, Msg, S, E) + end, {State0, Effects0}, Checked). %% checkout new messages to consumers %% reverses the effects list diff --git a/src/rabbit_policies.erl b/src/rabbit_policies.erl index 555e4e2e87..95bf067539 100644 --- a/src/rabbit_policies.erl +++ b/src/rabbit_policies.erl @@ -43,14 +43,17 @@ register() -> {policy_validator, <<"max-length-bytes">>}, {policy_validator, <<"queue-mode">>}, {policy_validator, <<"overflow">>}, + {policy_validator, <<"delivery-limit">>}, {operator_policy_validator, <<"expires">>}, {operator_policy_validator, <<"message-ttl">>}, {operator_policy_validator, <<"max-length">>}, {operator_policy_validator, <<"max-length-bytes">>}, + {operator_policy_validator, <<"delivery-limit">>}, {policy_merge_strategy, <<"expires">>}, {policy_merge_strategy, <<"message-ttl">>}, {policy_merge_strategy, <<"max-length">>}, - {policy_merge_strategy, <<"max-length-bytes">>}]], + {policy_merge_strategy, <<"max-length-bytes">>}, + {policy_merge_strategy, <<"delivery-limit">>}]], ok. validate_policy(Terms) -> @@ -111,9 +114,16 @@ validate_policy0(<<"overflow">>, <<"drop-head">>) -> validate_policy0(<<"overflow">>, <<"reject-publish">>) -> ok; validate_policy0(<<"overflow">>, Value) -> - {error, "~p is not a valid overflow value", [Value]}. + {error, "~p is not a valid overflow value", [Value]}; + +validate_policy0(<<"delivery-limit">>, Value) + when is_integer(Value), Value >= 0 -> + ok; +validate_policy0(<<"delivery-limit">>, Value) -> + {error, "~p is not a valid delivery limit", [Value]}. merge_policy_value(<<"message-ttl">>, Val, OpVal) -> min(Val, OpVal); merge_policy_value(<<"max-length">>, Val, OpVal) -> min(Val, OpVal); merge_policy_value(<<"max-length-bytes">>, Val, OpVal) -> min(Val, OpVal); -merge_policy_value(<<"expires">>, Val, OpVal) -> min(Val, OpVal). +merge_policy_value(<<"expires">>, Val, OpVal) -> min(Val, OpVal); +merge_policy_value(<<"delivery-limit">>, Val, OpVal) -> min(Val, OpVal). diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl index 9e73981541..7c46170b8e 100644 --- a/src/rabbit_quorum_queue.erl +++ b/src/rabbit_quorum_queue.erl @@ -163,13 +163,16 @@ ra_machine_config(Q) when ?is_amqqueue(Q) -> %% take the minimum value of the policy and the queue arg if present MaxLength = args_policy_lookup(<<"max-length">>, fun min/2, Q), MaxBytes = args_policy_lookup(<<"max-length-bytes">>, fun min/2, Q), + DeliveryLimit = args_policy_lookup(<<"delivery-limit">>, fun min/2, Q), #{name => Name, queue_resource => QName, dead_letter_handler => dlx_mfa(Q), become_leader_handler => {?MODULE, become_leader, [QName]}, max_length => MaxLength, max_bytes => MaxBytes, - single_active_consumer_on => single_active_consumer_on(Q)}. + single_active_consumer_on => single_active_consumer_on(Q), + delivery_limit => DeliveryLimit + }. single_active_consumer_on(Q) -> QArguments = amqqueue:get_arguments(Q), |
