summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_fifo.erl67
-rw-r--r--src/rabbit_policies.erl16
-rw-r--r--src/rabbit_quorum_queue.erl5
3 files changed, 56 insertions, 32 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl
index af3441df86..da9772516c 100644
--- a/src/rabbit_fifo.erl
+++ b/src/rabbit_fifo.erl
@@ -252,7 +252,8 @@
consumer_strategy = default :: default | single_active,
%% waiting consumers, one is picked active consumer is cancelled or dies
%% used only when single active consumer is on
- waiting_consumers = [] :: [{consumer_id(), consumer()}]
+ waiting_consumers = [] :: [{consumer_id(), consumer()}],
+ delivery_limit :: maybe(non_neg_integer())
}).
-opaque state() :: #state{}.
@@ -264,7 +265,8 @@
release_cursor_interval => non_neg_integer(),
max_length => non_neg_integer(),
max_bytes => non_neg_integer(),
- single_active_consumer_on => boolean()}.
+ single_active_consumer_on => boolean(),
+ delivery_limit => non_neg_integer()}.
-export_type([protocol/0,
delivery/0,
@@ -293,6 +295,7 @@ update_config(Conf, State) ->
SHI = maps:get(release_cursor_interval, Conf, ?RELEASE_CURSOR_EVERY),
MaxLength = maps:get(max_length, Conf, undefined),
MaxBytes = maps:get(max_bytes, Conf, undefined),
+ DeliveryLimit = maps:get(delivery_limit, Conf, undefined),
ConsumerStrategy = case maps:get(single_active_consumer_on, Conf, false) of
true ->
single_active;
@@ -304,7 +307,8 @@ update_config(Conf, State) ->
release_cursor_interval = SHI,
max_length = MaxLength,
max_bytes = MaxBytes,
- consumer_strategy = ConsumerStrategy}.
+ consumer_strategy = ConsumerStrategy,
+ delivery_limit = DeliveryLimit}.
zero(_) ->
0.
@@ -468,10 +472,10 @@ apply(_, {down, ConsumerPid, noconnection},
#consumer{checked_out = Checked0} = C,
{Co, St0, Eff}) when (node(P) =:= Node) and
(C#consumer.status =/= cancelled)->
- St = return_all(St0, Checked0),
+ {St, Eff0} = return_all(St0, Checked0, Eff),
Credit = increase_credit(C, maps:size(Checked0)),
Eff1 = ConsumerUpdateActiveFun(St, K, C, false,
- suspected_down, Eff),
+ suspected_down, Eff0),
{maps:put(K,
C#consumer{status = suspected_down,
credit = Credit,
@@ -961,8 +965,8 @@ maybe_return_all(ConsumerId, #consumer{checked_out = Checked0} = Consumer, Cons1
C0, SQ0, Effects0),
{S0#state{consumers = Cons, service_queue = SQ}, Effects1};
down ->
- S1 = return_all(S0, Checked0),
- {S1#state{consumers = Cons1}, Effects0}
+ {S1, Effects1} = return_all(S0, Checked0, Effects0),
+ {S1#state{consumers = Cons1}, Effects1}
end.
apply_enqueue(#{index := RaftIdx} = Meta, From, Seq, RawMsg, State0) ->
@@ -1082,16 +1086,16 @@ return(Meta, ConsumerId, MsgNumMsgs, Con0, Checked,
Effects0, #state{consumers = Cons0, service_queue = SQ0} = State0) ->
Con = Con0#consumer{checked_out = Checked,
credit = increase_credit(Con0, length(MsgNumMsgs))},
- {Cons, SQ, Effects} = update_or_remove_sub(ConsumerId, Con, Cons0,
+ {Cons, SQ, Effects1} = update_or_remove_sub(ConsumerId, Con, Cons0,
SQ0, Effects0),
- State1 = lists:foldl(fun({'$prefix_msg', _} = Msg, S0) ->
- return_one(0, Msg, S0);
- ({MsgNum, Msg}, S0) ->
- return_one(MsgNum, Msg, S0)
- end, State0, MsgNumMsgs),
+ {State1, Effects2} = lists:foldl(fun({'$prefix_msg', _} = Msg, {S0, E0}) ->
+ return_one(0, Msg, S0, E0);
+ ({MsgNum, Msg}, {S0, E0}) ->
+ return_one(MsgNum, Msg, S0, E0)
+ end, {State0, Effects1}, MsgNumMsgs),
checkout(Meta, State1#state{consumers = Cons,
service_queue = SQ},
- Effects).
+ Effects2).
% used to processes messages that are finished
complete(ConsumerId, MsgRaftIdxs, NumDiscarded,
@@ -1197,28 +1201,35 @@ find_next_cursor(Smallest, Cursors0, Potential) ->
end.
return_one(0, {'$prefix_msg', _} = Msg,
- #state{returns = Returns} = State0) ->
- add_bytes_return(Msg,
- State0#state{returns = lqueue:in(Msg, Returns)});
+ #state{returns = Returns} = State0, Effects) ->
+ {add_bytes_return(Msg,
+ State0#state{returns = lqueue:in(Msg, Returns)}), Effects};
return_one(MsgNum, {RaftId, {Header0, RawMsg}},
- #state{returns = Returns} = State0) ->
+ #state{returns = Returns,
+ delivery_limit = DeliveryLimit} = State0, Effects0) ->
Header = maps:update_with(delivery_count,
fun (C) -> C+1 end,
1, Header0),
- Msg = {RaftId, {Header, RawMsg}},
- % this should not affect the release cursor in any way
- add_bytes_return(RawMsg,
- State0#state{returns = lqueue:in({MsgNum, Msg}, Returns)}).
+ case maps:get(delivery_count, Header) of
+ DeliveryCount when DeliveryCount > DeliveryLimit ->
+ Effects = dead_letter_effects(rejected, maps:put(none, {MsgNum, {RaftId, {Header, RawMsg}}}, #{}), State0, Effects0),
+ {add_bytes_settle(RawMsg, State0), Effects};
+ _ ->
+ Msg = {RaftId, {Header, RawMsg}},
+ %% this should not affect the release cursor in any way
+ {add_bytes_return(RawMsg,
+ State0#state{returns = lqueue:in({MsgNum, Msg}, Returns)}), Effects0}
+ end.
-return_all(State0, Checked0) ->
+return_all(State0, Checked0, Effects0) ->
%% need to sort the list so that we return messages in the order
%% they were checked out
Checked = lists:sort(maps:to_list(Checked0)),
- lists:foldl(fun ({_, {'$prefix_msg', _} = Msg}, S) ->
- return_one(0, Msg, S);
- ({_, {MsgNum, Msg}}, S) ->
- return_one(MsgNum, Msg, S)
- end, State0, Checked).
+ lists:foldl(fun ({_, {'$prefix_msg', _} = Msg}, {S, E}) ->
+ return_one(0, Msg, S, E);
+ ({_, {MsgNum, Msg}}, {S, E}) ->
+ return_one(MsgNum, Msg, S, E)
+ end, {State0, Effects0}, Checked).
%% checkout new messages to consumers
%% reverses the effects list
diff --git a/src/rabbit_policies.erl b/src/rabbit_policies.erl
index 555e4e2e87..95bf067539 100644
--- a/src/rabbit_policies.erl
+++ b/src/rabbit_policies.erl
@@ -43,14 +43,17 @@ register() ->
{policy_validator, <<"max-length-bytes">>},
{policy_validator, <<"queue-mode">>},
{policy_validator, <<"overflow">>},
+ {policy_validator, <<"delivery-limit">>},
{operator_policy_validator, <<"expires">>},
{operator_policy_validator, <<"message-ttl">>},
{operator_policy_validator, <<"max-length">>},
{operator_policy_validator, <<"max-length-bytes">>},
+ {operator_policy_validator, <<"delivery-limit">>},
{policy_merge_strategy, <<"expires">>},
{policy_merge_strategy, <<"message-ttl">>},
{policy_merge_strategy, <<"max-length">>},
- {policy_merge_strategy, <<"max-length-bytes">>}]],
+ {policy_merge_strategy, <<"max-length-bytes">>},
+ {policy_merge_strategy, <<"delivery-limit">>}]],
ok.
validate_policy(Terms) ->
@@ -111,9 +114,16 @@ validate_policy0(<<"overflow">>, <<"drop-head">>) ->
validate_policy0(<<"overflow">>, <<"reject-publish">>) ->
ok;
validate_policy0(<<"overflow">>, Value) ->
- {error, "~p is not a valid overflow value", [Value]}.
+ {error, "~p is not a valid overflow value", [Value]};
+
+validate_policy0(<<"delivery-limit">>, Value)
+ when is_integer(Value), Value >= 0 ->
+ ok;
+validate_policy0(<<"delivery-limit">>, Value) ->
+ {error, "~p is not a valid delivery limit", [Value]}.
merge_policy_value(<<"message-ttl">>, Val, OpVal) -> min(Val, OpVal);
merge_policy_value(<<"max-length">>, Val, OpVal) -> min(Val, OpVal);
merge_policy_value(<<"max-length-bytes">>, Val, OpVal) -> min(Val, OpVal);
-merge_policy_value(<<"expires">>, Val, OpVal) -> min(Val, OpVal).
+merge_policy_value(<<"expires">>, Val, OpVal) -> min(Val, OpVal);
+merge_policy_value(<<"delivery-limit">>, Val, OpVal) -> min(Val, OpVal).
diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl
index 9e73981541..7c46170b8e 100644
--- a/src/rabbit_quorum_queue.erl
+++ b/src/rabbit_quorum_queue.erl
@@ -163,13 +163,16 @@ ra_machine_config(Q) when ?is_amqqueue(Q) ->
%% take the minimum value of the policy and the queue arg if present
MaxLength = args_policy_lookup(<<"max-length">>, fun min/2, Q),
MaxBytes = args_policy_lookup(<<"max-length-bytes">>, fun min/2, Q),
+ DeliveryLimit = args_policy_lookup(<<"delivery-limit">>, fun min/2, Q),
#{name => Name,
queue_resource => QName,
dead_letter_handler => dlx_mfa(Q),
become_leader_handler => {?MODULE, become_leader, [QName]},
max_length => MaxLength,
max_bytes => MaxBytes,
- single_active_consumer_on => single_active_consumer_on(Q)}.
+ single_active_consumer_on => single_active_consumer_on(Q),
+ delivery_limit => DeliveryLimit
+ }.
single_active_consumer_on(Q) ->
QArguments = amqqueue:get_arguments(Q),