diff options
| author | Michael Klishin <mklishin@pivotal.io> | 2015-04-26 13:40:28 +0300 |
|---|---|---|
| committer | Michael Klishin <mklishin@pivotal.io> | 2015-04-26 13:40:28 +0300 |
| commit | a04d9f1dd55d8dde84fa2274ef5ae8b15f05f81d (patch) | |
| tree | f1d6ac964688de8c74b2a9db798f2c6f6e33208d /src | |
| parent | c169604de6f02f9703bffc8b1637f02ad22d9151 (diff) | |
| parent | ee0945aae3b7cffcfbf6f993d488cde08be17e7b (diff) | |
| download | rabbitmq-server-git-a04d9f1dd55d8dde84fa2274ef5ae8b15f05f81d.tar.gz | |
Merge branch 'sla_tracking_v2' of git://github.com/alexethomas/rabbitmq-server into rabbitmq-server-54
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_backing_queue.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_basic.erl | 9 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 44 |
3 files changed, 52 insertions, 3 deletions
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index 55c8c971a0..b567906378 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -22,7 +22,7 @@ messages_unacknowledged_ram, messages_persistent, message_bytes, message_bytes_ready, message_bytes_unacknowledged, message_bytes_ram, - message_bytes_persistent, + message_bytes_persistent, head_message_timestamp, disk_reads, disk_writes, backing_queue_status]). -ifdef(use_specs). diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl index 1cb6bef4ab..ee1e5290be 100644 --- a/src/rabbit_basic.erl +++ b/src/rabbit_basic.erl @@ -20,8 +20,8 @@ -export([publish/4, publish/5, publish/1, message/3, message/4, properties/1, prepend_table_header/3, - extract_headers/1, map_headers/2, delivery/4, header_routes/1, - parse_expiration/1, header/2, header/3]). + extract_headers/1, extract_timestamp/1, map_headers/2, delivery/4, + header_routes/1, parse_expiration/1, header/2, header/3]). -export([build_content/2, from_content/1, msg_size/1, maybe_gc_large_msg/1]). %%---------------------------------------------------------------------------- @@ -249,6 +249,11 @@ extract_headers(Content) -> rabbit_binary_parser:ensure_content_decoded(Content), Headers. +extract_timestamp(Content) -> + #content{properties = #'P_basic'{timestamp = Timestamp}} = + rabbit_binary_parser:ensure_content_decoded(Content), + Timestamp. + map_headers(F, Content) -> Content1 = rabbit_binary_parser:ensure_content_decoded(Content), #content{properties = #'P_basic'{headers = Headers} = Props} = Content1, diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index c684d9ffe9..c2c6bdc4aa 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -876,6 +876,12 @@ info(message_bytes_ram, #vqstate{ram_bytes = RamBytes}) -> RamBytes; info(message_bytes_persistent, #vqstate{persistent_bytes = PersistentBytes}) -> PersistentBytes; +info(head_message_timestamp, #vqstate{ + q3 = Q3, + q4 = Q4, + ram_pending_ack = RPA, + qi_pending_ack = QPA}) -> + head_message_timestamp(Q3, Q4, RPA, QPA); info(disk_reads, #vqstate{disk_read_count = Count}) -> Count; info(disk_writes, #vqstate{disk_write_count = Count}) -> @@ -910,6 +916,44 @@ invoke( _, _, State) -> State. is_duplicate(_Msg, State) -> {false, State}. +%% Get the Timestamp property of the first msg, if present. This is the one with the oldest timestamp +%% among the heads of the pending acks and unread queues. +%% We can't check disk_pending_acks as these are paged out - we assume some will soon be paged in +%% rather than forcing it to happen. +%% Pending ack msgs are included as they are regarded as unprocessed until acked, this also prevents +%% the result apparently oscillating during repeated rejects. +%% Q3 is only checked when Q4 is empty as any Q4 msg will be earlier. +head_message_timestamp(Q3, Q4, RPA, QPA) -> + HeadMsgs = [ HeadMsgStatus#msg_status.msg || + HeadMsgStatus <- + [ case ?QUEUE:is_empty(Q4) of + false -> {value, MsgStatus} = ?QUEUE:peek(Q4), MsgStatus; + true -> case ?QUEUE:is_empty(Q3) of + false -> {value, MsgStatus} = ?QUEUE:peek(Q3), MsgStatus; + true -> undefined + end + end, + case gb_trees:is_empty(RPA) of + false -> {_SeqId, MsgStatus} = gb_trees:smallest(RPA), MsgStatus; + true -> undefined + end, + case gb_trees:is_empty(QPA) of + false -> {_SeqId, MsgStatus} = gb_trees:smallest(QPA), MsgStatus; + true -> undefined + end], + HeadMsgStatus /= undefined ], + Timestamps = + [ Timestamp || + Timestamp <- + [ rabbit_basic:extract_timestamp(HeadMsg#basic_message.content) || HeadMsg <- + HeadMsgs ], + Timestamp /= undefined ], + case Timestamps == [] of + true -> ''; + false -> lists:min(Timestamps) + end. + + %%---------------------------------------------------------------------------- %% Minor helpers %%---------------------------------------------------------------------------- |
