diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue.erl | 23 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 13 |
2 files changed, 26 insertions, 10 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 9ecbcbc339..75091692d0 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -331,7 +331,7 @@ assert_args_equivalence(#amqqueue{name = QueueName, arguments = Args}, check_declare_arguments(QueueName, Args) -> Checks = [{<<"x-expires">>, fun check_positive_int_arg/2}, - {<<"x-message-ttl">>, fun check_positive_int_arg/2}, + {<<"x-message-ttl">>, fun check_non_neg_int_arg/2}, {<<"x-ha-policy">>, fun check_ha_policy_arg/2}, {<<"x-dead-letter-exchange">>, fun check_string_arg/2}, {<<"x-dead-letter-routing-key">>, fun check_dlxrk_arg/2}], @@ -353,11 +353,24 @@ check_string_arg({longstr, _}, _Args) -> check_string_arg({Type, _}, _) -> {error, {unacceptable_type, Type}}. -check_positive_int_arg({Type, Val}, _Args) -> +check_int_arg({Type, _}, _) -> case lists:member(Type, ?INTEGER_ARG_TYPES) of - false -> {error, {unacceptable_type, Type}}; - true when Val =< 0 -> {error, {value_zero_or_less, Val}}; - true -> ok + true -> ok; + false -> {error, {unacceptable_type, Type}} + end. + +check_positive_int_arg({Type, Val}, Args) -> + case check_int_arg({Type, Val}, Args) of + ok when Val > 0 -> ok; + ok -> {error, {value_zero_or_less, Val}}; + Error -> Error + end. + +check_non_neg_int_arg({Type, Val}, Args) -> + case check_int_arg({Type, Val}, Args) of + ok when Val >= 0 -> ok; + ok -> {error, {value_less_than_zero, Val}}; + Error -> Error end. check_dlxrk_arg({longstr, _}, Args) -> diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index e1fd9bbca5..d643cecd58 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -552,11 +552,14 @@ deliver_or_enqueue(Delivery = #delivery{message = Message, {Delivered, Confirm, State1} = attempt_delivery(Delivery, State), State2 = #q{backing_queue = BQ, backing_queue_state = BQS} = maybe_record_confirm_message(Confirm, State1), - case Delivered of - true -> State2; - false -> Props = message_properties(Confirm, State), - BQS1 = BQ:publish(Message, Props, SenderPid, BQS), - ensure_ttl_timer(State2#q{backing_queue_state = BQS1}) + case {Delivered, State2#q.ttl} of + {true, _} -> State2; + {false, 0} -> State3 = discard_delivery(Delivery, State2), + %% TODO: handle confirms and dlx + State3; + {false, _} -> Props = message_properties(Confirm, State), + BQS1 = BQ:publish(Message, Props, SenderPid, BQS), + ensure_ttl_timer(State2#q{backing_queue_state = BQS1}) end. requeue_and_run(AckTags, State = #q{backing_queue = BQ}) -> |
