diff options
| -rw-r--r-- | src/rabbit_basic.erl | 9 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 36 |
2 files changed, 43 insertions, 2 deletions
diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl index 1cb6bef4ab..ee1e5290be 100644 --- a/src/rabbit_basic.erl +++ b/src/rabbit_basic.erl @@ -20,8 +20,8 @@ -export([publish/4, publish/5, publish/1, message/3, message/4, properties/1, prepend_table_header/3, - extract_headers/1, map_headers/2, delivery/4, header_routes/1, - parse_expiration/1, header/2, header/3]). + extract_headers/1, extract_timestamp/1, map_headers/2, delivery/4, + header_routes/1, parse_expiration/1, header/2, header/3]). -export([build_content/2, from_content/1, msg_size/1, maybe_gc_large_msg/1]). %%---------------------------------------------------------------------------- @@ -249,6 +249,11 @@ extract_headers(Content) -> rabbit_binary_parser:ensure_content_decoded(Content), Headers. +extract_timestamp(Content) -> + #content{properties = #'P_basic'{timestamp = Timestamp}} = + rabbit_binary_parser:ensure_content_decoded(Content), + Timestamp. + map_headers(F, Content) -> Content1 = rabbit_binary_parser:ensure_content_decoded(Content), #content{properties = #'P_basic'{headers = Headers} = Props} = Content1, diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index d4e0a04f39..6d0439a55c 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -266,6 +266,7 @@ q3, q4, next_seq_id, + head_msg_timestamp, %% copy of ts prop. of first msg, if present ram_pending_ack, %% msgs using store, still in RAM disk_pending_ack, %% msgs in store, paged out qi_pending_ack, %% msgs using qi, *can't* be paged out @@ -883,6 +884,8 @@ info(disk_writes, #vqstate{disk_write_count = Count}) -> Count; info(backing_queue_status, #vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4, + ram_pending_ack = RPA, + disk_pending_ack = DPA, len = Len, target_ram_count = TargetRamCount, next_seq_id = NextSeqId, @@ -899,6 +902,7 @@ info(backing_queue_status, #vqstate { {len , Len}, {target_ram_count , TargetRamCount}, {next_seq_id , NextSeqId}, + {head_msg_timestamp , head_msg_timestamp(Q3, Q4, RPA, DPA)}, {avg_ingress_rate , AvgIngressRate}, {avg_egress_rate , AvgEgressRate}, {avg_ack_ingress_rate, AvgAckIngressRate}, @@ -911,6 +915,38 @@ invoke( _, _, State) -> State. is_duplicate(_Msg, State) -> {false, State}. +%% Get the AMQP Timestamp property of the first message, if present. +%% The first message is the first one found on the heads of the pending acks and +%% unread message queues, checked in turn. +%% Unacked messages are included as they are regarded as unprocessed until acked, +%% also to avoid the timestamp oscillating during repeated rejects. +head_msg_timestamp(Q3, Q4, RPA, DPA) -> + case gb_trees:is_empty(RPA) of + false -> {_SeqId, MsgStatus} = gb_trees:smallest(RPA); + true -> + case gb_trees:is_empty(DPA) of + false -> {_SeqId, MsgStatus} = gb_trees:smallest(DPA); + true -> + case ?QUEUE:is_empty(Q4) of + false -> {value, MsgStatus} = ?QUEUE:peek(Q4); + true -> + case ?QUEUE:is_empty(Q3) of + false -> {value, MsgStatus} = ?QUEUE:peek(Q3); + true -> MsgStatus = undefined + end + end + end + end, + case MsgStatus of + undefined -> undefined; + _ -> + Msg = MsgStatus#msg_status.msg, + case Msg of + undefined -> undefined; + _ -> rabbit_basic:extract_timestamp(Msg#basic_message.content) + end + end. + %%---------------------------------------------------------------------------- %% Minor helpers %%---------------------------------------------------------------------------- |
