summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlex Thomas <alext@misshelpful>2015-02-25 14:32:48 +0000
committerAlex Thomas <alext@lshift.net>2015-04-10 17:03:48 +0100
commit57af33f1c92b3d92c910df1abe35507f82d1df2f (patch)
treeddd7c270a7fff82be8aacadba612e04a56df2018
parentd899179f39172e5ee26918a1d222f240c9087861 (diff)
downloadrabbitmq-server-git-57af33f1c92b3d92c910df1abe35507f82d1df2f.tar.gz
Change head_msg_timestamp to peek at all internal queue heads and find the oldest timestamp among them instead of taking the first found as per PR comment #7.
-rw-r--r--src/rabbit_variable_queue.erl73
1 files changed, 40 insertions, 33 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 19941f508f..f37b578aa5 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -878,18 +878,18 @@ info(message_bytes_ram, #vqstate{ram_bytes = RamBytes}) ->
info(message_bytes_persistent, #vqstate{persistent_bytes = PersistentBytes}) ->
PersistentBytes;
info(head_msg_timestamp, #vqstate{
- q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4,
+ q3 = Q3,
+ q4 = Q4,
ram_pending_ack = RPA,
- disk_pending_ack = DPA }) ->
- head_msg_timestamp(Q3, Q4, RPA, DPA);
+ disk_pending_ack = DPA,
+ qi_pending_ack = QPA}) ->
+ head_msg_timestamp(Q3, Q4, RPA, DPA, QPA);
info(disk_reads, #vqstate{disk_read_count = Count}) ->
Count;
info(disk_writes, #vqstate{disk_write_count = Count}) ->
Count;
info(backing_queue_status, #vqstate {
q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4,
- ram_pending_ack = RPA,
- disk_pending_ack = DPA,
len = Len,
target_ram_count = TargetRamCount,
next_seq_id = NextSeqId,
@@ -918,38 +918,45 @@ invoke( _, _, State) -> State.
is_duplicate(_Msg, State) -> {false, State}.
-%% Get the AMQP Timestamp property of the first message, if present.
-%% The first message is the first one found on the heads of the pending acks and
-%% unread message queues, checked in turn.
+%% Get the Timestamp property of the first message, if present.
+%% The first message is the one with the oldest timestamp among the heads of the
+%% pending acks and unread message queues.
%% Unacked messages are included as they are regarded as unprocessed until acked,
%% also to avoid the timestamp oscillating during repeated rejects.
-head_msg_timestamp(Q3, Q4, RPA, DPA) ->
- case gb_trees:is_empty(RPA) of
- false -> {_SeqId, MsgStatus} = gb_trees:smallest(RPA);
- true ->
- case gb_trees:is_empty(DPA) of
- false -> {_SeqId, MsgStatus} = gb_trees:smallest(DPA);
- true ->
- case ?QUEUE:is_empty(Q4) of
- false -> {value, MsgStatus} = ?QUEUE:peek(Q4);
- true ->
- case ?QUEUE:is_empty(Q3) of
- false -> {value, MsgStatus} = ?QUEUE:peek(Q3);
- true -> MsgStatus = undefined
- end
- end
- end
- end,
- case MsgStatus of
- undefined -> undefined;
- _ ->
- Msg = MsgStatus#msg_status.msg,
- case Msg of
- undefined -> undefined;
- _ -> rabbit_basic:extract_timestamp(Msg#basic_message.content)
- end
+head_msg_timestamp(Q3, Q4, RPA, DPA, QPA) ->
+ HeadMsgs = [ HeadMsgStatus#msg_status.msg || HeadMsgStatus <-
+ [ case ?QUEUE:is_empty(Q4) of
+ false -> {value, MsgStatus} = ?QUEUE:peek(Q4), MsgStatus;
+ true -> undefined
+ end,
+ case ?QUEUE:is_empty(Q3) of
+ false -> {value, MsgStatus} = ?QUEUE:peek(Q3), MsgStatus;
+ true -> undefined
+ end,
+ case gb_trees:is_empty(RPA) of
+ false -> {_SeqId, MsgStatus} = gb_trees:smallest(RPA), MsgStatus;
+ true -> undefined
+ end,
+ case gb_trees:is_empty(DPA) of
+ false -> {_SeqId, MsgStatus} = gb_trees:smallest(DPA), MsgStatus;
+ true -> undefined
+ end,
+ case gb_trees:is_empty(QPA) of
+ false -> {_SeqId, MsgStatus} = gb_trees:smallest(QPA), MsgStatus;
+ true -> undefined
+ end],
+ HeadMsgStatus /= undefined ],
+ Timestamps =
+ [ Timestamp || Timestamp <-
+ [ rabbit_basic:extract_timestamp(HeadMsg#basic_message.content) || HeadMsg <-
+ HeadMsgs ],
+ Timestamp /= undefined ],
+ case Timestamps == [] of
+ true -> undefined;
+ false -> lists:min(Timestamps)
end.
+
%%----------------------------------------------------------------------------
%% Minor helpers
%%----------------------------------------------------------------------------