summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue_process.erl29
1 files changed, 23 insertions, 6 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index f5a3a5f1a9..e3ccf2233a 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -540,7 +540,7 @@ attempt_delivery(#delivery{sender = SenderPid, message = Message}, Confirm,
{false, BQS1} ->
deliver_msgs_to_consumers(
fun (AckRequired, State1 = #q{backing_queue_state = BQS2}) ->
- Props = message_properties(Confirm, State1),
+ Props = message_properties(Message, Confirm, State1),
{AckTag, BQS3} = BQ:publish_delivered(
AckRequired, Message, Props,
SenderPid, BQS2),
@@ -575,7 +575,7 @@ deliver_or_enqueue(Delivery = #delivery{message = Message,
{false, State1} ->
State2 = #q{backing_queue = BQ, backing_queue_state = BQS} =
maybe_record_confirm_message(Confirm, State1),
- Props = message_properties(Confirm, State2),
+ Props = message_properties(Message, Confirm, State2),
BQS1 = BQ:publish(Message, Props, SenderPid, BQS),
ensure_ttl_timer(Props#message_properties.expiry,
State2#q{backing_queue_state = BQS1})
@@ -705,12 +705,29 @@ discard_delivery(#delivery{sender = SenderPid,
backing_queue_state = BQS}) ->
State#q{backing_queue_state = BQ:discard(Message, SenderPid, BQS)}.
-message_properties(Confirm, #q{ttl = TTL}) ->
- #message_properties{expiry = calculate_msg_expiry(TTL),
+message_properties(Message, Confirm, #q{ttl = TTL}) ->
+ #message_properties{expiry = calculate_msg_expiry(Message, TTL),
needs_confirming = needs_confirming(Confirm)}.
-calculate_msg_expiry(undefined) -> undefined;
-calculate_msg_expiry(TTL) -> now_micros() + (TTL * 1000).
+calculate_msg_expiry(#basic_message{content = Content}, TTL) ->
+ #content{properties = #'P_basic'{expiration = Expiration}} =
+ rabbit_binary_parser:ensure_content_decoded(Content),
+ ParseError =
+ fun () -> rabbit_log:warning("could not parse expiration '~s'~n.",
+ [Expiration])
+ end,
+ Milli = case Expiration of
+ undefined -> TTL;
+ B -> case string:to_integer(binary_to_list(B)) of
+ {error, no_integer} -> ParseError(), TTL;
+ {N, ""} -> N;
+ {_, _ } -> ParseError(), TTL
+ end
+ end,
+ case Milli of
+ undefined -> undefined;
+ _ -> now_micros() + Milli * 1000
+ end.
drop_expired_messages(State = #q{ttl = undefined}) ->
State;