diff options
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 29 |
1 files changed, 23 insertions, 6 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index f5a3a5f1a9..e3ccf2233a 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -540,7 +540,7 @@ attempt_delivery(#delivery{sender = SenderPid, message = Message}, Confirm, {false, BQS1} -> deliver_msgs_to_consumers( fun (AckRequired, State1 = #q{backing_queue_state = BQS2}) -> - Props = message_properties(Confirm, State1), + Props = message_properties(Message, Confirm, State1), {AckTag, BQS3} = BQ:publish_delivered( AckRequired, Message, Props, SenderPid, BQS2), @@ -575,7 +575,7 @@ deliver_or_enqueue(Delivery = #delivery{message = Message, {false, State1} -> State2 = #q{backing_queue = BQ, backing_queue_state = BQS} = maybe_record_confirm_message(Confirm, State1), - Props = message_properties(Confirm, State2), + Props = message_properties(Message, Confirm, State2), BQS1 = BQ:publish(Message, Props, SenderPid, BQS), ensure_ttl_timer(Props#message_properties.expiry, State2#q{backing_queue_state = BQS1}) @@ -705,12 +705,29 @@ discard_delivery(#delivery{sender = SenderPid, backing_queue_state = BQS}) -> State#q{backing_queue_state = BQ:discard(Message, SenderPid, BQS)}. -message_properties(Confirm, #q{ttl = TTL}) -> - #message_properties{expiry = calculate_msg_expiry(TTL), +message_properties(Message, Confirm, #q{ttl = TTL}) -> + #message_properties{expiry = calculate_msg_expiry(Message, TTL), needs_confirming = needs_confirming(Confirm)}. -calculate_msg_expiry(undefined) -> undefined; -calculate_msg_expiry(TTL) -> now_micros() + (TTL * 1000). +calculate_msg_expiry(#basic_message{content = Content}, TTL) -> + #content{properties = #'P_basic'{expiration = Expiration}} = + rabbit_binary_parser:ensure_content_decoded(Content), + ParseError = + fun () -> rabbit_log:warning("could not parse expiration '~s'~n.", + [Expiration]) + end, + Milli = case Expiration of + undefined -> TTL; + B -> case string:to_integer(binary_to_list(B)) of + {error, no_integer} -> ParseError(), TTL; + {N, ""} -> N; + {_, _ } -> ParseError(), TTL + end + end, + case Milli of + undefined -> undefined; + _ -> now_micros() + Milli * 1000 + end. drop_expired_messages(State = #q{ttl = undefined}) -> State; |
