diff options
| author | Alex Thomas <alext@misshelpful> | 2015-02-25 14:32:48 +0000 |
|---|---|---|
| committer | Alex Thomas <alext@lshift.net> | 2015-04-10 17:03:48 +0100 |
| commit | 57af33f1c92b3d92c910df1abe35507f82d1df2f (patch) | |
| tree | ddd7c270a7fff82be8aacadba612e04a56df2018 | |
| parent | d899179f39172e5ee26918a1d222f240c9087861 (diff) | |
| download | rabbitmq-server-git-57af33f1c92b3d92c910df1abe35507f82d1df2f.tar.gz | |
Change head_msg_timestamp to peek at all internal queue heads and find the oldest timestamp among them instead of taking the first found as per PR comment #7.
| -rw-r--r-- | src/rabbit_variable_queue.erl | 73 |
1 files changed, 40 insertions, 33 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 19941f508f..f37b578aa5 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -878,18 +878,18 @@ info(message_bytes_ram, #vqstate{ram_bytes = RamBytes}) -> info(message_bytes_persistent, #vqstate{persistent_bytes = PersistentBytes}) -> PersistentBytes; info(head_msg_timestamp, #vqstate{ - q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4, + q3 = Q3, + q4 = Q4, ram_pending_ack = RPA, - disk_pending_ack = DPA }) -> - head_msg_timestamp(Q3, Q4, RPA, DPA); + disk_pending_ack = DPA, + qi_pending_ack = QPA}) -> + head_msg_timestamp(Q3, Q4, RPA, DPA, QPA); info(disk_reads, #vqstate{disk_read_count = Count}) -> Count; info(disk_writes, #vqstate{disk_write_count = Count}) -> Count; info(backing_queue_status, #vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4, - ram_pending_ack = RPA, - disk_pending_ack = DPA, len = Len, target_ram_count = TargetRamCount, next_seq_id = NextSeqId, @@ -918,38 +918,45 @@ invoke( _, _, State) -> State. is_duplicate(_Msg, State) -> {false, State}. -%% Get the AMQP Timestamp property of the first message, if present. -%% The first message is the first one found on the heads of the pending acks and -%% unread message queues, checked in turn. +%% Get the Timestamp property of the first message, if present. +%% The first message is the one with the oldest timestamp among the heads of the +%% pending acks and unread message queues. %% Unacked messages are included as they are regarded as unprocessed until acked, %% also to avoid the timestamp oscillating during repeated rejects. -head_msg_timestamp(Q3, Q4, RPA, DPA) -> - case gb_trees:is_empty(RPA) of - false -> {_SeqId, MsgStatus} = gb_trees:smallest(RPA); - true -> - case gb_trees:is_empty(DPA) of - false -> {_SeqId, MsgStatus} = gb_trees:smallest(DPA); - true -> - case ?QUEUE:is_empty(Q4) of - false -> {value, MsgStatus} = ?QUEUE:peek(Q4); - true -> - case ?QUEUE:is_empty(Q3) of - false -> {value, MsgStatus} = ?QUEUE:peek(Q3); - true -> MsgStatus = undefined - end - end - end - end, - case MsgStatus of - undefined -> undefined; - _ -> - Msg = MsgStatus#msg_status.msg, - case Msg of - undefined -> undefined; - _ -> rabbit_basic:extract_timestamp(Msg#basic_message.content) - end +head_msg_timestamp(Q3, Q4, RPA, DPA, QPA) -> + HeadMsgs = [ HeadMsgStatus#msg_status.msg || HeadMsgStatus <- + [ case ?QUEUE:is_empty(Q4) of + false -> {value, MsgStatus} = ?QUEUE:peek(Q4), MsgStatus; + true -> undefined + end, + case ?QUEUE:is_empty(Q3) of + false -> {value, MsgStatus} = ?QUEUE:peek(Q3), MsgStatus; + true -> undefined + end, + case gb_trees:is_empty(RPA) of + false -> {_SeqId, MsgStatus} = gb_trees:smallest(RPA), MsgStatus; + true -> undefined + end, + case gb_trees:is_empty(DPA) of + false -> {_SeqId, MsgStatus} = gb_trees:smallest(DPA), MsgStatus; + true -> undefined + end, + case gb_trees:is_empty(QPA) of + false -> {_SeqId, MsgStatus} = gb_trees:smallest(QPA), MsgStatus; + true -> undefined + end], + HeadMsgStatus /= undefined ], + Timestamps = + [ Timestamp || Timestamp <- + [ rabbit_basic:extract_timestamp(HeadMsg#basic_message.content) || HeadMsg <- + HeadMsgs ], + Timestamp /= undefined ], + case Timestamps == [] of + true -> undefined; + false -> lists:min(Timestamps) end. + %%---------------------------------------------------------------------------- %% Minor helpers %%---------------------------------------------------------------------------- |
