summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_fifo.erl67
-rw-r--r--src/rabbit_policies.erl16
-rw-r--r--src/rabbit_quorum_queue.erl5
-rw-r--r--test/quorum_queue_SUITE.erl153
4 files changed, 208 insertions, 33 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl
index af3441df86..da9772516c 100644
--- a/src/rabbit_fifo.erl
+++ b/src/rabbit_fifo.erl
@@ -252,7 +252,8 @@
consumer_strategy = default :: default | single_active,
%% waiting consumers, one is picked active consumer is cancelled or dies
%% used only when single active consumer is on
- waiting_consumers = [] :: [{consumer_id(), consumer()}]
+ waiting_consumers = [] :: [{consumer_id(), consumer()}],
+ delivery_limit :: maybe(non_neg_integer())
}).
-opaque state() :: #state{}.
@@ -264,7 +265,8 @@
release_cursor_interval => non_neg_integer(),
max_length => non_neg_integer(),
max_bytes => non_neg_integer(),
- single_active_consumer_on => boolean()}.
+ single_active_consumer_on => boolean(),
+ delivery_limit => non_neg_integer()}.
-export_type([protocol/0,
delivery/0,
@@ -293,6 +295,7 @@ update_config(Conf, State) ->
SHI = maps:get(release_cursor_interval, Conf, ?RELEASE_CURSOR_EVERY),
MaxLength = maps:get(max_length, Conf, undefined),
MaxBytes = maps:get(max_bytes, Conf, undefined),
+ DeliveryLimit = maps:get(delivery_limit, Conf, undefined),
ConsumerStrategy = case maps:get(single_active_consumer_on, Conf, false) of
true ->
single_active;
@@ -304,7 +307,8 @@ update_config(Conf, State) ->
release_cursor_interval = SHI,
max_length = MaxLength,
max_bytes = MaxBytes,
- consumer_strategy = ConsumerStrategy}.
+ consumer_strategy = ConsumerStrategy,
+ delivery_limit = DeliveryLimit}.
zero(_) ->
0.
@@ -468,10 +472,10 @@ apply(_, {down, ConsumerPid, noconnection},
#consumer{checked_out = Checked0} = C,
{Co, St0, Eff}) when (node(P) =:= Node) and
(C#consumer.status =/= cancelled)->
- St = return_all(St0, Checked0),
+ {St, Eff0} = return_all(St0, Checked0, Eff),
Credit = increase_credit(C, maps:size(Checked0)),
Eff1 = ConsumerUpdateActiveFun(St, K, C, false,
- suspected_down, Eff),
+ suspected_down, Eff0),
{maps:put(K,
C#consumer{status = suspected_down,
credit = Credit,
@@ -961,8 +965,8 @@ maybe_return_all(ConsumerId, #consumer{checked_out = Checked0} = Consumer, Cons1
C0, SQ0, Effects0),
{S0#state{consumers = Cons, service_queue = SQ}, Effects1};
down ->
- S1 = return_all(S0, Checked0),
- {S1#state{consumers = Cons1}, Effects0}
+ {S1, Effects1} = return_all(S0, Checked0, Effects0),
+ {S1#state{consumers = Cons1}, Effects1}
end.
apply_enqueue(#{index := RaftIdx} = Meta, From, Seq, RawMsg, State0) ->
@@ -1082,16 +1086,16 @@ return(Meta, ConsumerId, MsgNumMsgs, Con0, Checked,
Effects0, #state{consumers = Cons0, service_queue = SQ0} = State0) ->
Con = Con0#consumer{checked_out = Checked,
credit = increase_credit(Con0, length(MsgNumMsgs))},
- {Cons, SQ, Effects} = update_or_remove_sub(ConsumerId, Con, Cons0,
+ {Cons, SQ, Effects1} = update_or_remove_sub(ConsumerId, Con, Cons0,
SQ0, Effects0),
- State1 = lists:foldl(fun({'$prefix_msg', _} = Msg, S0) ->
- return_one(0, Msg, S0);
- ({MsgNum, Msg}, S0) ->
- return_one(MsgNum, Msg, S0)
- end, State0, MsgNumMsgs),
+ {State1, Effects2} = lists:foldl(fun({'$prefix_msg', _} = Msg, {S0, E0}) ->
+ return_one(0, Msg, S0, E0);
+ ({MsgNum, Msg}, {S0, E0}) ->
+ return_one(MsgNum, Msg, S0, E0)
+ end, {State0, Effects1}, MsgNumMsgs),
checkout(Meta, State1#state{consumers = Cons,
service_queue = SQ},
- Effects).
+ Effects2).
% used to processes messages that are finished
complete(ConsumerId, MsgRaftIdxs, NumDiscarded,
@@ -1197,28 +1201,35 @@ find_next_cursor(Smallest, Cursors0, Potential) ->
end.
return_one(0, {'$prefix_msg', _} = Msg,
- #state{returns = Returns} = State0) ->
- add_bytes_return(Msg,
- State0#state{returns = lqueue:in(Msg, Returns)});
+ #state{returns = Returns} = State0, Effects) ->
+ {add_bytes_return(Msg,
+ State0#state{returns = lqueue:in(Msg, Returns)}), Effects};
return_one(MsgNum, {RaftId, {Header0, RawMsg}},
- #state{returns = Returns} = State0) ->
+ #state{returns = Returns,
+ delivery_limit = DeliveryLimit} = State0, Effects0) ->
Header = maps:update_with(delivery_count,
fun (C) -> C+1 end,
1, Header0),
- Msg = {RaftId, {Header, RawMsg}},
- % this should not affect the release cursor in any way
- add_bytes_return(RawMsg,
- State0#state{returns = lqueue:in({MsgNum, Msg}, Returns)}).
+ case maps:get(delivery_count, Header) of
+ DeliveryCount when DeliveryCount > DeliveryLimit ->
+ Effects = dead_letter_effects(rejected, maps:put(none, {MsgNum, {RaftId, {Header, RawMsg}}}, #{}), State0, Effects0),
+ {add_bytes_settle(RawMsg, State0), Effects};
+ _ ->
+ Msg = {RaftId, {Header, RawMsg}},
+ %% this should not affect the release cursor in any way
+ {add_bytes_return(RawMsg,
+ State0#state{returns = lqueue:in({MsgNum, Msg}, Returns)}), Effects0}
+ end.
-return_all(State0, Checked0) ->
+return_all(State0, Checked0, Effects0) ->
%% need to sort the list so that we return messages in the order
%% they were checked out
Checked = lists:sort(maps:to_list(Checked0)),
- lists:foldl(fun ({_, {'$prefix_msg', _} = Msg}, S) ->
- return_one(0, Msg, S);
- ({_, {MsgNum, Msg}}, S) ->
- return_one(MsgNum, Msg, S)
- end, State0, Checked).
+ lists:foldl(fun ({_, {'$prefix_msg', _} = Msg}, {S, E}) ->
+ return_one(0, Msg, S, E);
+ ({_, {MsgNum, Msg}}, {S, E}) ->
+ return_one(MsgNum, Msg, S, E)
+ end, {State0, Effects0}, Checked).
%% checkout new messages to consumers
%% reverses the effects list
diff --git a/src/rabbit_policies.erl b/src/rabbit_policies.erl
index 555e4e2e87..95bf067539 100644
--- a/src/rabbit_policies.erl
+++ b/src/rabbit_policies.erl
@@ -43,14 +43,17 @@ register() ->
{policy_validator, <<"max-length-bytes">>},
{policy_validator, <<"queue-mode">>},
{policy_validator, <<"overflow">>},
+ {policy_validator, <<"delivery-limit">>},
{operator_policy_validator, <<"expires">>},
{operator_policy_validator, <<"message-ttl">>},
{operator_policy_validator, <<"max-length">>},
{operator_policy_validator, <<"max-length-bytes">>},
+ {operator_policy_validator, <<"delivery-limit">>},
{policy_merge_strategy, <<"expires">>},
{policy_merge_strategy, <<"message-ttl">>},
{policy_merge_strategy, <<"max-length">>},
- {policy_merge_strategy, <<"max-length-bytes">>}]],
+ {policy_merge_strategy, <<"max-length-bytes">>},
+ {policy_merge_strategy, <<"delivery-limit">>}]],
ok.
validate_policy(Terms) ->
@@ -111,9 +114,16 @@ validate_policy0(<<"overflow">>, <<"drop-head">>) ->
validate_policy0(<<"overflow">>, <<"reject-publish">>) ->
ok;
validate_policy0(<<"overflow">>, Value) ->
- {error, "~p is not a valid overflow value", [Value]}.
+ {error, "~p is not a valid overflow value", [Value]};
+
+validate_policy0(<<"delivery-limit">>, Value)
+ when is_integer(Value), Value >= 0 ->
+ ok;
+validate_policy0(<<"delivery-limit">>, Value) ->
+ {error, "~p is not a valid delivery limit", [Value]}.
merge_policy_value(<<"message-ttl">>, Val, OpVal) -> min(Val, OpVal);
merge_policy_value(<<"max-length">>, Val, OpVal) -> min(Val, OpVal);
merge_policy_value(<<"max-length-bytes">>, Val, OpVal) -> min(Val, OpVal);
-merge_policy_value(<<"expires">>, Val, OpVal) -> min(Val, OpVal).
+merge_policy_value(<<"expires">>, Val, OpVal) -> min(Val, OpVal);
+merge_policy_value(<<"delivery-limit">>, Val, OpVal) -> min(Val, OpVal).
diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl
index 9e73981541..7c46170b8e 100644
--- a/src/rabbit_quorum_queue.erl
+++ b/src/rabbit_quorum_queue.erl
@@ -163,13 +163,16 @@ ra_machine_config(Q) when ?is_amqqueue(Q) ->
%% take the minimum value of the policy and the queue arg if present
MaxLength = args_policy_lookup(<<"max-length">>, fun min/2, Q),
MaxBytes = args_policy_lookup(<<"max-length-bytes">>, fun min/2, Q),
+ DeliveryLimit = args_policy_lookup(<<"delivery-limit">>, fun min/2, Q),
#{name => Name,
queue_resource => QName,
dead_letter_handler => dlx_mfa(Q),
become_leader_handler => {?MODULE, become_leader, [QName]},
max_length => MaxLength,
max_bytes => MaxBytes,
- single_active_consumer_on => single_active_consumer_on(Q)}.
+ single_active_consumer_on => single_active_consumer_on(Q),
+ delivery_limit => DeliveryLimit
+ }.
single_active_consumer_on(Q) ->
QArguments = amqqueue:get_arguments(Q),
diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl
index 406c02de83..bd54d59869 100644
--- a/test/quorum_queue_SUITE.erl
+++ b/test/quorum_queue_SUITE.erl
@@ -111,7 +111,10 @@ all_tests() ->
consume_redelivery_count,
subscribe_redelivery_count,
message_bytes_metrics,
- queue_length_limit_drop_head
+ queue_length_limit_drop_head,
+ subscribe_redelivery_limit,
+ subscribe_redelivery_policy,
+ subscribe_redelivery_limit_with_dead_letter
].
memory_tests() ->
@@ -1519,6 +1522,154 @@ subscribe_redelivery_count(Config) ->
wait_for_messages_pending_ack(Servers, RaName, 0)
end.
+subscribe_redelivery_limit(Config) ->
+ [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>},
+ {<<"x-delivery-limit">>, long, 1}])),
+
+ RaName = ra_name(QQ),
+ publish(Ch, QQ),
+ wait_for_messages_ready(Servers, RaName, 1),
+ wait_for_messages_pending_ack(Servers, RaName, 0),
+ subscribe(Ch, QQ, false),
+
+ DTag = <<"x-delivery-count">>,
+ receive
+ {#'basic.deliver'{delivery_tag = DeliveryTag,
+ redelivered = false},
+ #amqp_msg{props = #'P_basic'{headers = H0}}} ->
+ ?assertMatch(undefined, rabbit_basic:header(DTag, H0)),
+ amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag,
+ multiple = false,
+ requeue = true})
+ end,
+
+ wait_for_messages_ready(Servers, RaName, 0),
+ wait_for_messages_pending_ack(Servers, RaName, 1),
+ receive
+ {#'basic.deliver'{delivery_tag = DeliveryTag1,
+ redelivered = true},
+ #amqp_msg{props = #'P_basic'{headers = H1}}} ->
+ ?assertMatch({DTag, _, 1}, rabbit_basic:header(DTag, H1)),
+ amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag1,
+ multiple = false,
+ requeue = true})
+ end,
+
+ wait_for_messages_ready(Servers, RaName, 0),
+ wait_for_messages_pending_ack(Servers, RaName, 0),
+ receive
+ {#'basic.deliver'{redelivered = true}, #amqp_msg{}} ->
+ throw(unexpected_redelivery)
+ after 2000 ->
+ ok
+ end.
+
+subscribe_redelivery_policy(Config) ->
+ [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+
+ ok = rabbit_ct_broker_helpers:set_policy(
+ Config, 0, <<"delivery-limit">>, <<".*">>, <<"queues">>,
+ [{<<"delivery-limit">>, 1}]),
+
+ RaName = ra_name(QQ),
+ publish(Ch, QQ),
+ wait_for_messages_ready(Servers, RaName, 1),
+ wait_for_messages_pending_ack(Servers, RaName, 0),
+ subscribe(Ch, QQ, false),
+
+ DTag = <<"x-delivery-count">>,
+ receive
+ {#'basic.deliver'{delivery_tag = DeliveryTag,
+ redelivered = false},
+ #amqp_msg{props = #'P_basic'{headers = H0}}} ->
+ ?assertMatch(undefined, rabbit_basic:header(DTag, H0)),
+ amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag,
+ multiple = false,
+ requeue = true})
+ end,
+
+ wait_for_messages_ready(Servers, RaName, 0),
+ wait_for_messages_pending_ack(Servers, RaName, 1),
+ receive
+ {#'basic.deliver'{delivery_tag = DeliveryTag1,
+ redelivered = true},
+ #amqp_msg{props = #'P_basic'{headers = H1}}} ->
+ ?assertMatch({DTag, _, 1}, rabbit_basic:header(DTag, H1)),
+ amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag1,
+ multiple = false,
+ requeue = true})
+ end,
+
+ wait_for_messages_ready(Servers, RaName, 0),
+ wait_for_messages_pending_ack(Servers, RaName, 0),
+ receive
+ {#'basic.deliver'{redelivered = true}, #amqp_msg{}} ->
+ throw(unexpected_redelivery)
+ after 2000 ->
+ ok
+ end,
+ ok = rabbit_ct_broker_helpers:clear_policy(Config, 0, <<"delivery-limit">>).
+
+subscribe_redelivery_limit_with_dead_letter(Config) ->
+ [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ DLX = <<"subcribe_redelivery_limit_with_dead_letter_dlx">>,
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>},
+ {<<"x-delivery-limit">>, long, 1},
+ {<<"x-dead-letter-exchange">>, longstr, <<>>},
+ {<<"x-dead-letter-routing-key">>, longstr, DLX}
+ ])),
+ ?assertEqual({'queue.declare_ok', DLX, 0, 0},
+ declare(Ch, DLX, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+
+ RaName = ra_name(QQ),
+ RaDlxName = ra_name(DLX),
+ publish(Ch, QQ),
+ wait_for_messages_ready(Servers, RaName, 1),
+ wait_for_messages_pending_ack(Servers, RaName, 0),
+ subscribe(Ch, QQ, false),
+
+ DTag = <<"x-delivery-count">>,
+ receive
+ {#'basic.deliver'{delivery_tag = DeliveryTag,
+ redelivered = false},
+ #amqp_msg{props = #'P_basic'{headers = H0}}} ->
+ ?assertMatch(undefined, rabbit_basic:header(DTag, H0)),
+ amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag,
+ multiple = false,
+ requeue = true})
+ end,
+
+ wait_for_messages_ready(Servers, RaName, 0),
+ wait_for_messages_pending_ack(Servers, RaName, 1),
+ receive
+ {#'basic.deliver'{delivery_tag = DeliveryTag1,
+ redelivered = true},
+ #amqp_msg{props = #'P_basic'{headers = H1}}} ->
+ ?assertMatch({DTag, _, 1}, rabbit_basic:header(DTag, H1)),
+ amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag1,
+ multiple = false,
+ requeue = true})
+ end,
+
+ wait_for_messages_ready(Servers, RaName, 0),
+ wait_for_messages_pending_ack(Servers, RaName, 0),
+ wait_for_messages_ready(Servers, RaDlxName, 1),
+ wait_for_messages_pending_ack(Servers, RaDlxName, 0).
+
consume_redelivery_count(Config) ->
[Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),