summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorAlex Thomas <alext@misshelpful>2015-02-24 14:33:48 +0000
committerAlex Thomas <alext@lshift.net>2015-04-10 17:01:10 +0100
commit2cc6030e2066013727ae47db6307046a7bd39c54 (patch)
tree5c8cb1efe39c1a10fb637f606167ce7fea0c9328 /src
parent588b64e2262b3566cde101474d72a91f1cb80305 (diff)
downloadrabbitmq-server-git-2cc6030e2066013727ae47db6307046a7bd39c54.tar.gz
Track and expose the AMQP Timestamp property of the first (usually oldest) message in a queue.
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_basic.erl9
-rw-r--r--src/rabbit_variable_queue.erl36
2 files changed, 43 insertions, 2 deletions
diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl
index 1cb6bef4ab..ee1e5290be 100644
--- a/src/rabbit_basic.erl
+++ b/src/rabbit_basic.erl
@@ -20,8 +20,8 @@
-export([publish/4, publish/5, publish/1,
message/3, message/4, properties/1, prepend_table_header/3,
- extract_headers/1, map_headers/2, delivery/4, header_routes/1,
- parse_expiration/1, header/2, header/3]).
+ extract_headers/1, extract_timestamp/1, map_headers/2, delivery/4,
+ header_routes/1, parse_expiration/1, header/2, header/3]).
-export([build_content/2, from_content/1, msg_size/1, maybe_gc_large_msg/1]).
%%----------------------------------------------------------------------------
@@ -249,6 +249,11 @@ extract_headers(Content) ->
rabbit_binary_parser:ensure_content_decoded(Content),
Headers.
+%% Return the AMQP 'timestamp' basic property (#'P_basic'.timestamp) of the
+%% given message content, ensuring the content's properties are decoded
+%% first. Returns 'undefined' when the property was not set by the
+%% publisher (the record field default).
+extract_timestamp(Content) ->
+ #content{properties = #'P_basic'{timestamp = Timestamp}} =
+ rabbit_binary_parser:ensure_content_decoded(Content),
+ Timestamp.
+
map_headers(F, Content) ->
Content1 = rabbit_binary_parser:ensure_content_decoded(Content),
#content{properties = #'P_basic'{headers = Headers} = Props} = Content1,
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index d4e0a04f39..6d0439a55c 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -266,6 +266,7 @@
q3,
q4,
next_seq_id,
+ head_msg_timestamp, %% copy of ts prop. of first msg, if present
ram_pending_ack, %% msgs using store, still in RAM
disk_pending_ack, %% msgs in store, paged out
qi_pending_ack, %% msgs using qi, *can't* be paged out
@@ -883,6 +884,8 @@ info(disk_writes, #vqstate{disk_write_count = Count}) ->
Count;
info(backing_queue_status, #vqstate {
q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4,
+ ram_pending_ack = RPA,
+ disk_pending_ack = DPA,
len = Len,
target_ram_count = TargetRamCount,
next_seq_id = NextSeqId,
@@ -899,6 +902,7 @@ info(backing_queue_status, #vqstate {
{len , Len},
{target_ram_count , TargetRamCount},
{next_seq_id , NextSeqId},
+ {head_msg_timestamp , head_msg_timestamp(Q3, Q4, RPA, DPA)},
{avg_ingress_rate , AvgIngressRate},
{avg_egress_rate , AvgEgressRate},
{avg_ack_ingress_rate, AvgAckIngressRate},
@@ -911,6 +915,38 @@ invoke( _, _, State) -> State.
is_duplicate(_Msg, State) -> {false, State}.
+%% Get the AMQP Timestamp property of the first message, if present.
+%% The first message is the first one found on the heads of the pending acks and
+%% unread message queues, checked in turn.
+%% Unacked messages are included as they are regarded as unprocessed until acked,
+%% also to avoid the timestamp oscillating during repeated rejects.
%% Returns the timestamp property of the head message, or 'undefined' if
%% there is no head message or its body is not available (paged out).
head_msg_timestamp(Q3, Q4, RPA, DPA) ->
    case head_msg_status(Q3, Q4, RPA, DPA) of
        #msg_status{msg = #basic_message{content = Content}} ->
            rabbit_basic:extract_timestamp(Content);
        _ ->
            %% either no messages at all, or the head message's body has
            %% been paged out of RAM (#msg_status.msg = undefined)
            undefined
    end.

%% Return the msg_status of the first message, or 'undefined' if there is
%% none. Sources are consulted in order: RAM pending acks, disk pending
%% acks, q4, q3 - i.e. acks before unread messages, oldest first.
head_msg_status(Q3, Q4, RPA, DPA) ->
    first_msg_status([fun () -> pending_ack_head(RPA) end,
                      fun () -> pending_ack_head(DPA) end,
                      fun () -> queue_head(Q4) end,
                      fun () -> queue_head(Q3) end]).

%% Evaluate each thunk in turn, returning the first non-'undefined' result.
first_msg_status([])       -> undefined;
first_msg_status([F | Fs]) -> case F() of
                                  undefined -> first_msg_status(Fs);
                                  MsgStatus -> MsgStatus
                              end.

%% Smallest-SeqId entry of a pending-ack gb_tree, or 'undefined' if empty.
pending_ack_head(PA) ->
    case gb_trees:is_empty(PA) of
        true  -> undefined;
        false -> {_SeqId, MsgStatus} = gb_trees:smallest(PA),
                 MsgStatus
    end.

%% Head msg_status of an in-memory queue, or 'undefined' if empty.
queue_head(Q) ->
    case ?QUEUE:is_empty(Q) of
        true  -> undefined;
        false -> {value, MsgStatus} = ?QUEUE:peek(Q),
                 MsgStatus
    end.
+
%%----------------------------------------------------------------------------
%% Minor helpers
%%----------------------------------------------------------------------------