diff options
| author | Diana Corbacho <diana@rabbitmq.com> | 2019-02-18 22:21:23 +0000 |
|---|---|---|
| committer | kjnilsson <knilsson@pivotal.io> | 2019-02-22 14:14:31 +0000 |
| commit | 8a3d3e654b66a3fa53b45bd949b92accf19974da (patch) | |
| tree | 79bf17f6507cba06898daaea3b718984f25012c8 | |
| parent | d256c78560a9d73fb39d7c159eedc00768a35dc2 (diff) | |
| download | rabbitmq-server-git-8a3d3e654b66a3fa53b45bd949b92accf19974da.tar.gz | |
Poison handling in quorum queues
Drop messages that exceed the configured number of delivery attemps.
If a DLX is configured, dead letter them.
[#163513253]
| -rw-r--r-- | src/rabbit_fifo.erl | 67 | ||||
| -rw-r--r-- | src/rabbit_policies.erl | 16 | ||||
| -rw-r--r-- | src/rabbit_quorum_queue.erl | 5 | ||||
| -rw-r--r-- | test/quorum_queue_SUITE.erl | 153 |
4 files changed, 208 insertions, 33 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl index af3441df86..da9772516c 100644 --- a/src/rabbit_fifo.erl +++ b/src/rabbit_fifo.erl @@ -252,7 +252,8 @@ consumer_strategy = default :: default | single_active, %% waiting consumers, one is picked active consumer is cancelled or dies %% used only when single active consumer is on - waiting_consumers = [] :: [{consumer_id(), consumer()}] + waiting_consumers = [] :: [{consumer_id(), consumer()}], + delivery_limit :: maybe(non_neg_integer()) }). -opaque state() :: #state{}. @@ -264,7 +265,8 @@ release_cursor_interval => non_neg_integer(), max_length => non_neg_integer(), max_bytes => non_neg_integer(), - single_active_consumer_on => boolean()}. + single_active_consumer_on => boolean(), + delivery_limit => non_neg_integer()}. -export_type([protocol/0, delivery/0, @@ -293,6 +295,7 @@ update_config(Conf, State) -> SHI = maps:get(release_cursor_interval, Conf, ?RELEASE_CURSOR_EVERY), MaxLength = maps:get(max_length, Conf, undefined), MaxBytes = maps:get(max_bytes, Conf, undefined), + DeliveryLimit = maps:get(delivery_limit, Conf, undefined), ConsumerStrategy = case maps:get(single_active_consumer_on, Conf, false) of true -> single_active; @@ -304,7 +307,8 @@ update_config(Conf, State) -> release_cursor_interval = SHI, max_length = MaxLength, max_bytes = MaxBytes, - consumer_strategy = ConsumerStrategy}. + consumer_strategy = ConsumerStrategy, + delivery_limit = DeliveryLimit}. zero(_) -> 0. @@ -468,10 +472,10 @@ apply(_, {down, ConsumerPid, noconnection}, #consumer{checked_out = Checked0} = C, {Co, St0, Eff}) when (node(P) =:= Node) and (C#consumer.status =/= cancelled)-> - St = return_all(St0, Checked0), + {St, Eff0} = return_all(St0, Checked0, Eff), Credit = increase_credit(C, maps:size(Checked0)), Eff1 = ConsumerUpdateActiveFun(St, K, C, false, - suspected_down, Eff), + suspected_down, Eff0), {maps:put(K, C#consumer{status = suspected_down, credit = Credit, @@ -961,8 +965,8 @@ maybe_return_all(ConsumerId, #consumer{checked_out = Checked0} = Consumer, Cons1 C0, SQ0, Effects0), {S0#state{consumers = Cons, service_queue = SQ}, Effects1}; down -> - S1 = return_all(S0, Checked0), - {S1#state{consumers = Cons1}, Effects0} + {S1, Effects1} = return_all(S0, Checked0, Effects0), + {S1#state{consumers = Cons1}, Effects1} end. apply_enqueue(#{index := RaftIdx} = Meta, From, Seq, RawMsg, State0) -> @@ -1082,16 +1086,16 @@ return(Meta, ConsumerId, MsgNumMsgs, Con0, Checked, Effects0, #state{consumers = Cons0, service_queue = SQ0} = State0) -> Con = Con0#consumer{checked_out = Checked, credit = increase_credit(Con0, length(MsgNumMsgs))}, - {Cons, SQ, Effects} = update_or_remove_sub(ConsumerId, Con, Cons0, + {Cons, SQ, Effects1} = update_or_remove_sub(ConsumerId, Con, Cons0, SQ0, Effects0), - State1 = lists:foldl(fun({'$prefix_msg', _} = Msg, S0) -> - return_one(0, Msg, S0); - ({MsgNum, Msg}, S0) -> - return_one(MsgNum, Msg, S0) - end, State0, MsgNumMsgs), + {State1, Effects2} = lists:foldl(fun({'$prefix_msg', _} = Msg, {S0, E0}) -> + return_one(0, Msg, S0, E0); + ({MsgNum, Msg}, {S0, E0}) -> + return_one(MsgNum, Msg, S0, E0) + end, {State0, Effects1}, MsgNumMsgs), checkout(Meta, State1#state{consumers = Cons, service_queue = SQ}, - Effects). + Effects2). % used to processes messages that are finished complete(ConsumerId, MsgRaftIdxs, NumDiscarded, @@ -1197,28 +1201,35 @@ find_next_cursor(Smallest, Cursors0, Potential) -> end. return_one(0, {'$prefix_msg', _} = Msg, - #state{returns = Returns} = State0) -> - add_bytes_return(Msg, - State0#state{returns = lqueue:in(Msg, Returns)}); + #state{returns = Returns} = State0, Effects) -> + {add_bytes_return(Msg, + State0#state{returns = lqueue:in(Msg, Returns)}), Effects}; return_one(MsgNum, {RaftId, {Header0, RawMsg}}, - #state{returns = Returns} = State0) -> + #state{returns = Returns, + delivery_limit = DeliveryLimit} = State0, Effects0) -> Header = maps:update_with(delivery_count, fun (C) -> C+1 end, 1, Header0), - Msg = {RaftId, {Header, RawMsg}}, - % this should not affect the release cursor in any way - add_bytes_return(RawMsg, - State0#state{returns = lqueue:in({MsgNum, Msg}, Returns)}). + case maps:get(delivery_count, Header) of + DeliveryCount when DeliveryCount > DeliveryLimit -> + Effects = dead_letter_effects(rejected, maps:put(none, {MsgNum, {RaftId, {Header, RawMsg}}}, #{}), State0, Effects0), + {add_bytes_settle(RawMsg, State0), Effects}; + _ -> + Msg = {RaftId, {Header, RawMsg}}, + %% this should not affect the release cursor in any way + {add_bytes_return(RawMsg, + State0#state{returns = lqueue:in({MsgNum, Msg}, Returns)}), Effects0} + end. -return_all(State0, Checked0) -> +return_all(State0, Checked0, Effects0) -> %% need to sort the list so that we return messages in the order %% they were checked out Checked = lists:sort(maps:to_list(Checked0)), - lists:foldl(fun ({_, {'$prefix_msg', _} = Msg}, S) -> - return_one(0, Msg, S); - ({_, {MsgNum, Msg}}, S) -> - return_one(MsgNum, Msg, S) - end, State0, Checked). + lists:foldl(fun ({_, {'$prefix_msg', _} = Msg}, {S, E}) -> + return_one(0, Msg, S, E); + ({_, {MsgNum, Msg}}, {S, E}) -> + return_one(MsgNum, Msg, S, E) + end, {State0, Effects0}, Checked). %% checkout new messages to consumers %% reverses the effects list diff --git a/src/rabbit_policies.erl b/src/rabbit_policies.erl index 555e4e2e87..95bf067539 100644 --- a/src/rabbit_policies.erl +++ b/src/rabbit_policies.erl @@ -43,14 +43,17 @@ register() -> {policy_validator, <<"max-length-bytes">>}, {policy_validator, <<"queue-mode">>}, {policy_validator, <<"overflow">>}, + {policy_validator, <<"delivery-limit">>}, {operator_policy_validator, <<"expires">>}, {operator_policy_validator, <<"message-ttl">>}, {operator_policy_validator, <<"max-length">>}, {operator_policy_validator, <<"max-length-bytes">>}, + {operator_policy_validator, <<"delivery-limit">>}, {policy_merge_strategy, <<"expires">>}, {policy_merge_strategy, <<"message-ttl">>}, {policy_merge_strategy, <<"max-length">>}, - {policy_merge_strategy, <<"max-length-bytes">>}]], + {policy_merge_strategy, <<"max-length-bytes">>}, + {policy_merge_strategy, <<"delivery-limit">>}]], ok. validate_policy(Terms) -> @@ -111,9 +114,16 @@ validate_policy0(<<"overflow">>, <<"drop-head">>) -> validate_policy0(<<"overflow">>, <<"reject-publish">>) -> ok; validate_policy0(<<"overflow">>, Value) -> - {error, "~p is not a valid overflow value", [Value]}. + {error, "~p is not a valid overflow value", [Value]}; + +validate_policy0(<<"delivery-limit">>, Value) + when is_integer(Value), Value >= 0 -> + ok; +validate_policy0(<<"delivery-limit">>, Value) -> + {error, "~p is not a valid delivery limit", [Value]}. merge_policy_value(<<"message-ttl">>, Val, OpVal) -> min(Val, OpVal); merge_policy_value(<<"max-length">>, Val, OpVal) -> min(Val, OpVal); merge_policy_value(<<"max-length-bytes">>, Val, OpVal) -> min(Val, OpVal); -merge_policy_value(<<"expires">>, Val, OpVal) -> min(Val, OpVal). +merge_policy_value(<<"expires">>, Val, OpVal) -> min(Val, OpVal); +merge_policy_value(<<"delivery-limit">>, Val, OpVal) -> min(Val, OpVal). diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl index 9e73981541..7c46170b8e 100644 --- a/src/rabbit_quorum_queue.erl +++ b/src/rabbit_quorum_queue.erl @@ -163,13 +163,16 @@ ra_machine_config(Q) when ?is_amqqueue(Q) -> %% take the minimum value of the policy and the queue arg if present MaxLength = args_policy_lookup(<<"max-length">>, fun min/2, Q), MaxBytes = args_policy_lookup(<<"max-length-bytes">>, fun min/2, Q), + DeliveryLimit = args_policy_lookup(<<"delivery-limit">>, fun min/2, Q), #{name => Name, queue_resource => QName, dead_letter_handler => dlx_mfa(Q), become_leader_handler => {?MODULE, become_leader, [QName]}, max_length => MaxLength, max_bytes => MaxBytes, - single_active_consumer_on => single_active_consumer_on(Q)}. + single_active_consumer_on => single_active_consumer_on(Q), + delivery_limit => DeliveryLimit + }. single_active_consumer_on(Q) -> QArguments = amqqueue:get_arguments(Q), diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl index 406c02de83..bd54d59869 100644 --- a/test/quorum_queue_SUITE.erl +++ b/test/quorum_queue_SUITE.erl @@ -111,7 +111,10 @@ all_tests() -> consume_redelivery_count, subscribe_redelivery_count, message_bytes_metrics, - queue_length_limit_drop_head + queue_length_limit_drop_head, + subscribe_redelivery_limit, + subscribe_redelivery_policy, + subscribe_redelivery_limit_with_dead_letter ]. memory_tests() -> @@ -1519,6 +1522,154 @@ subscribe_redelivery_count(Config) -> wait_for_messages_pending_ack(Servers, RaName, 0) end. +subscribe_redelivery_limit(Config) -> + [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + QQ = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}, + {<<"x-delivery-limit">>, long, 1}])), + + RaName = ra_name(QQ), + publish(Ch, QQ), + wait_for_messages_ready(Servers, RaName, 1), + wait_for_messages_pending_ack(Servers, RaName, 0), + subscribe(Ch, QQ, false), + + DTag = <<"x-delivery-count">>, + receive + {#'basic.deliver'{delivery_tag = DeliveryTag, + redelivered = false}, + #amqp_msg{props = #'P_basic'{headers = H0}}} -> + ?assertMatch(undefined, rabbit_basic:header(DTag, H0)), + amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag, + multiple = false, + requeue = true}) + end, + + wait_for_messages_ready(Servers, RaName, 0), + wait_for_messages_pending_ack(Servers, RaName, 1), + receive + {#'basic.deliver'{delivery_tag = DeliveryTag1, + redelivered = true}, + #amqp_msg{props = #'P_basic'{headers = H1}}} -> + ?assertMatch({DTag, _, 1}, rabbit_basic:header(DTag, H1)), + amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag1, + multiple = false, + requeue = true}) + end, + + wait_for_messages_ready(Servers, RaName, 0), + wait_for_messages_pending_ack(Servers, RaName, 0), + receive + {#'basic.deliver'{redelivered = true}, #amqp_msg{}} -> + throw(unexpected_redelivery) + after 2000 -> + ok + end. + +subscribe_redelivery_policy(Config) -> + [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + QQ = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + + ok = rabbit_ct_broker_helpers:set_policy( + Config, 0, <<"delivery-limit">>, <<".*">>, <<"queues">>, + [{<<"delivery-limit">>, 1}]), + + RaName = ra_name(QQ), + publish(Ch, QQ), + wait_for_messages_ready(Servers, RaName, 1), + wait_for_messages_pending_ack(Servers, RaName, 0), + subscribe(Ch, QQ, false), + + DTag = <<"x-delivery-count">>, + receive + {#'basic.deliver'{delivery_tag = DeliveryTag, + redelivered = false}, + #amqp_msg{props = #'P_basic'{headers = H0}}} -> + ?assertMatch(undefined, rabbit_basic:header(DTag, H0)), + amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag, + multiple = false, + requeue = true}) + end, + + wait_for_messages_ready(Servers, RaName, 0), + wait_for_messages_pending_ack(Servers, RaName, 1), + receive + {#'basic.deliver'{delivery_tag = DeliveryTag1, + redelivered = true}, + #amqp_msg{props = #'P_basic'{headers = H1}}} -> + ?assertMatch({DTag, _, 1}, rabbit_basic:header(DTag, H1)), + amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag1, + multiple = false, + requeue = true}) + end, + + wait_for_messages_ready(Servers, RaName, 0), + wait_for_messages_pending_ack(Servers, RaName, 0), + receive + {#'basic.deliver'{redelivered = true}, #amqp_msg{}} -> + throw(unexpected_redelivery) + after 2000 -> + ok + end, + ok = rabbit_ct_broker_helpers:clear_policy(Config, 0, <<"delivery-limit">>). + +subscribe_redelivery_limit_with_dead_letter(Config) -> + [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + QQ = ?config(queue_name, Config), + DLX = <<"subcribe_redelivery_limit_with_dead_letter_dlx">>, + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}, + {<<"x-delivery-limit">>, long, 1}, + {<<"x-dead-letter-exchange">>, longstr, <<>>}, + {<<"x-dead-letter-routing-key">>, longstr, DLX} + ])), + ?assertEqual({'queue.declare_ok', DLX, 0, 0}, + declare(Ch, DLX, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + + RaName = ra_name(QQ), + RaDlxName = ra_name(DLX), + publish(Ch, QQ), + wait_for_messages_ready(Servers, RaName, 1), + wait_for_messages_pending_ack(Servers, RaName, 0), + subscribe(Ch, QQ, false), + + DTag = <<"x-delivery-count">>, + receive + {#'basic.deliver'{delivery_tag = DeliveryTag, + redelivered = false}, + #amqp_msg{props = #'P_basic'{headers = H0}}} -> + ?assertMatch(undefined, rabbit_basic:header(DTag, H0)), + amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag, + multiple = false, + requeue = true}) + end, + + wait_for_messages_ready(Servers, RaName, 0), + wait_for_messages_pending_ack(Servers, RaName, 1), + receive + {#'basic.deliver'{delivery_tag = DeliveryTag1, + redelivered = true}, + #amqp_msg{props = #'P_basic'{headers = H1}}} -> + ?assertMatch({DTag, _, 1}, rabbit_basic:header(DTag, H1)), + amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag1, + multiple = false, + requeue = true}) + end, + + wait_for_messages_ready(Servers, RaName, 0), + wait_for_messages_pending_ack(Servers, RaName, 0), + wait_for_messages_ready(Servers, RaDlxName, 1), + wait_for_messages_pending_ack(Servers, RaDlxName, 0). + consume_redelivery_count(Config) -> [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), |
