diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_backing_queue.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 74 |
2 files changed, 37 insertions, 39 deletions
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index 71ad7f5b98..b567906378 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -22,7 +22,7 @@ messages_unacknowledged_ram, messages_persistent, message_bytes, message_bytes_ready, message_bytes_unacknowledged, message_bytes_ram, - message_bytes_persistent, head_msg_timestamp, + message_bytes_persistent, head_message_timestamp, disk_reads, disk_writes, backing_queue_status]). -ifdef(use_specs). diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index f37b578aa5..bbca0bd410 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -877,13 +877,12 @@ info(message_bytes_ram, #vqstate{ram_bytes = RamBytes}) -> RamBytes; info(message_bytes_persistent, #vqstate{persistent_bytes = PersistentBytes}) -> PersistentBytes; -info(head_msg_timestamp, #vqstate{ +info(head_message_timestamp, #vqstate{ q3 = Q3, q4 = Q4, ram_pending_ack = RPA, - disk_pending_ack = DPA, qi_pending_ack = QPA}) -> - head_msg_timestamp(Q3, Q4, RPA, DPA, QPA); + head_message_timestamp(Q3, Q4, RPA, QPA); info(disk_reads, #vqstate{disk_read_count = Count}) -> Count; info(disk_writes, #vqstate{disk_write_count = Count}) -> @@ -918,42 +917,41 @@ invoke( _, _, State) -> State. is_duplicate(_Msg, State) -> {false, State}. -%% Get the Timestamp property of the first message, if present. -%% The first message is the one with the oldest timestamp among the heads of the -%% pending acks and unread message queues. -%% Unacked messages are included as they are regarded as unprocessed until acked, -%% also to avoid the timestamp oscillating during repeated rejects. -head_msg_timestamp(Q3, Q4, RPA, DPA, QPA) -> - HeadMsgs = [ HeadMsgStatus#msg_status.msg || HeadMsgStatus <- - [ case ?QUEUE:is_empty(Q4) of - false -> {value, MsgStatus} = ?QUEUE:peek(Q4), MsgStatus; - true -> undefined - end, - case ?QUEUE:is_empty(Q3) of - false -> {value, MsgStatus} = ?QUEUE:peek(Q3), MsgStatus; - true -> undefined - end, - case gb_trees:is_empty(RPA) of - false -> {_SeqId, MsgStatus} = gb_trees:smallest(RPA), MsgStatus; - true -> undefined - end, - case gb_trees:is_empty(DPA) of - false -> {_SeqId, MsgStatus} = gb_trees:smallest(DPA), MsgStatus; - true -> undefined - end, - case gb_trees:is_empty(QPA) of - false -> {_SeqId, MsgStatus} = gb_trees:smallest(QPA), MsgStatus; - true -> undefined - end], - HeadMsgStatus /= undefined ], +%% Get the Timestamp property of the first msg, if present. This is the one with the oldest timestamp +%% among the heads of the pending acks and unread queues. +%% We can't check disk_pending_acks as these are paged out - we assume some will soon be paged in +%% rather than forcing it to happen. +%% Pending ack msgs are included as they are regarded as unprocessed until acked, this also prevents +%% the result apparently oscillating during repeated rejects. +%% Q3 is only checked when Q4 is empty as any Q4 msg will be earlier. +head_message_timestamp(Q3, Q4, RPA, QPA) -> + HeadMsgs = [ HeadMsgStatus#msg_status.msg || + HeadMsgStatus <- + [ case ?QUEUE:is_empty(Q4) of + false -> {value, MsgStatus} = ?QUEUE:peek(Q4), MsgStatus; + true -> case ?QUEUE:is_empty(Q3) of + false -> {value, MsgStatus} = ?QUEUE:peek(Q3), MsgStatus; + true -> undefined + end + end, + case gb_trees:is_empty(RPA) of + false -> {_SeqId, MsgStatus} = gb_trees:smallest(RPA), MsgStatus; + true -> undefined + end, + case gb_trees:is_empty(QPA) of + false -> {_SeqId, MsgStatus} = gb_trees:smallest(QPA), MsgStatus; + true -> undefined + end], + HeadMsgStatus /= undefined ], Timestamps = - [ Timestamp || Timestamp <- - [ rabbit_basic:extract_timestamp(HeadMsg#basic_message.content) || HeadMsg <- - HeadMsgs ], - Timestamp /= undefined ], - case Timestamps == [] of - true -> undefined; - false -> lists:min(Timestamps) + [ Timestamp || + Timestamp <- + [ rabbit_basic:extract_timestamp(HeadMsg#basic_message.content) || HeadMsg <- + HeadMsgs ], + Timestamp /= undefined ], + case Timestamps == [] of + true -> ''; + false -> lists:min(Timestamps) end. |
