summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMichael Klishin <mklishin@pivotal.io>2015-04-26 13:40:28 +0300
committerMichael Klishin <mklishin@pivotal.io>2015-04-26 13:40:28 +0300
commita04d9f1dd55d8dde84fa2274ef5ae8b15f05f81d (patch)
treef1d6ac964688de8c74b2a9db798f2c6f6e33208d /src
parentc169604de6f02f9703bffc8b1637f02ad22d9151 (diff)
parentee0945aae3b7cffcfbf6f993d488cde08be17e7b (diff)
downloadrabbitmq-server-git-a04d9f1dd55d8dde84fa2274ef5ae8b15f05f81d.tar.gz
Merge branch 'sla_tracking_v2' of git://github.com/alexethomas/rabbitmq-server into rabbitmq-server-54
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_backing_queue.erl2
-rw-r--r--src/rabbit_basic.erl9
-rw-r--r--src/rabbit_variable_queue.erl44
3 files changed, 52 insertions, 3 deletions
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index 55c8c971a0..b567906378 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -22,7 +22,7 @@
messages_unacknowledged_ram, messages_persistent,
message_bytes, message_bytes_ready,
message_bytes_unacknowledged, message_bytes_ram,
- message_bytes_persistent,
+ message_bytes_persistent, head_message_timestamp,
disk_reads, disk_writes, backing_queue_status]).
-ifdef(use_specs).
diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl
index 1cb6bef4ab..ee1e5290be 100644
--- a/src/rabbit_basic.erl
+++ b/src/rabbit_basic.erl
@@ -20,8 +20,8 @@
-export([publish/4, publish/5, publish/1,
message/3, message/4, properties/1, prepend_table_header/3,
- extract_headers/1, map_headers/2, delivery/4, header_routes/1,
- parse_expiration/1, header/2, header/3]).
+ extract_headers/1, extract_timestamp/1, map_headers/2, delivery/4,
+ header_routes/1, parse_expiration/1, header/2, header/3]).
-export([build_content/2, from_content/1, msg_size/1, maybe_gc_large_msg/1]).
%%----------------------------------------------------------------------------
@@ -249,6 +249,11 @@ extract_headers(Content) ->
rabbit_binary_parser:ensure_content_decoded(Content),
Headers.
+extract_timestamp(Content) ->
+ #content{properties = #'P_basic'{timestamp = Timestamp}} =
+ rabbit_binary_parser:ensure_content_decoded(Content),
+ Timestamp.
+
map_headers(F, Content) ->
Content1 = rabbit_binary_parser:ensure_content_decoded(Content),
#content{properties = #'P_basic'{headers = Headers} = Props} = Content1,
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index c684d9ffe9..c2c6bdc4aa 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -876,6 +876,12 @@ info(message_bytes_ram, #vqstate{ram_bytes = RamBytes}) ->
RamBytes;
info(message_bytes_persistent, #vqstate{persistent_bytes = PersistentBytes}) ->
PersistentBytes;
+info(head_message_timestamp, #vqstate{
+ q3 = Q3,
+ q4 = Q4,
+ ram_pending_ack = RPA,
+ qi_pending_ack = QPA}) ->
+ head_message_timestamp(Q3, Q4, RPA, QPA);
info(disk_reads, #vqstate{disk_read_count = Count}) ->
Count;
info(disk_writes, #vqstate{disk_write_count = Count}) ->
@@ -910,6 +916,44 @@ invoke( _, _, State) -> State.
is_duplicate(_Msg, State) -> {false, State}.
+%% Get the Timestamp property of the first msg, if present. This is the one with the oldest timestamp
+%% among the heads of the pending acks and unread queues.
+%% We can't check disk_pending_acks as these are paged out - we assume some will soon be paged in
+%% rather than forcing it to happen.
+%% Pending ack msgs are included as they are regarded as unprocessed until acked, this also prevents
+%% the result apparently oscillating during repeated rejects.
+%% Q3 is only checked when Q4 is empty as any Q4 msg will be earlier.
+head_message_timestamp(Q3, Q4, RPA, QPA) ->
+ HeadMsgs = [ HeadMsgStatus#msg_status.msg ||
+ HeadMsgStatus <-
+ [ case ?QUEUE:is_empty(Q4) of
+ false -> {value, MsgStatus} = ?QUEUE:peek(Q4), MsgStatus;
+ true -> case ?QUEUE:is_empty(Q3) of
+ false -> {value, MsgStatus} = ?QUEUE:peek(Q3), MsgStatus;
+ true -> undefined
+ end
+ end,
+ case gb_trees:is_empty(RPA) of
+ false -> {_SeqId, MsgStatus} = gb_trees:smallest(RPA), MsgStatus;
+ true -> undefined
+ end,
+ case gb_trees:is_empty(QPA) of
+ false -> {_SeqId, MsgStatus} = gb_trees:smallest(QPA), MsgStatus;
+ true -> undefined
+ end],
+ HeadMsgStatus /= undefined ],
+ Timestamps =
+ [ Timestamp ||
+ Timestamp <-
+ [ rabbit_basic:extract_timestamp(HeadMsg#basic_message.content) || HeadMsg <-
+ HeadMsgs ],
+ Timestamp /= undefined ],
+ case Timestamps == [] of
+ true -> '';
+ false -> lists:min(Timestamps)
+ end.
+
+
%%----------------------------------------------------------------------------
%% Minor helpers
%%----------------------------------------------------------------------------