summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_variable_queue.erl83
-rw-r--r--test/src/rabbit_tests.erl7
2 files changed, 51 insertions, 39 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index c2c6bdc4aa..ee84005798 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -877,7 +877,7 @@ info(message_bytes_ram, #vqstate{ram_bytes = RamBytes}) ->
info(message_bytes_persistent, #vqstate{persistent_bytes = PersistentBytes}) ->
PersistentBytes;
info(head_message_timestamp, #vqstate{
- q3 = Q3,
+ q3 = Q3,
q4 = Q4,
ram_pending_ack = RPA,
qi_pending_ack = QPA}) ->
@@ -916,43 +916,56 @@ invoke( _, _, State) -> State.
is_duplicate(_Msg, State) -> {false, State}.
-%% Get the Timestamp property of the first msg, if present. This is the one with the oldest timestamp
-%% among the heads of the pending acks and unread queues.
-%% We can't check disk_pending_acks as these are paged out - we assume some will soon be paged in
-%% rather than forcing it to happen.
-%% Pending ack msgs are included as they are regarded as unprocessed until acked, this also prevents
-%% the result apparently oscillating during repeated rejects.
-%% Q3 is only checked when Q4 is empty as any Q4 msg will be earlier.
-head_message_timestamp(Q3, Q4, RPA, QPA) ->
- HeadMsgs = [ HeadMsgStatus#msg_status.msg ||
- HeadMsgStatus <-
- [ case ?QUEUE:is_empty(Q4) of
- false -> {value, MsgStatus} = ?QUEUE:peek(Q4), MsgStatus;
- true -> case ?QUEUE:is_empty(Q3) of
- false -> {value, MsgStatus} = ?QUEUE:peek(Q3), MsgStatus;
- true -> undefined
- end
- end,
- case gb_trees:is_empty(RPA) of
- false -> {_SeqId, MsgStatus} = gb_trees:smallest(RPA), MsgStatus;
- true -> undefined
- end,
- case gb_trees:is_empty(QPA) of
- false -> {_SeqId, MsgStatus} = gb_trees:smallest(QPA), MsgStatus;
- true -> undefined
- end],
- HeadMsgStatus /= undefined ],
- Timestamps =
- [ Timestamp ||
- Timestamp <-
- [ rabbit_basic:extract_timestamp(HeadMsg#basic_message.content) || HeadMsg <-
- HeadMsgs ],
- Timestamp /= undefined ],
- case Timestamps == [] of
+%% Get the Timestamp property of the first msg, if present. This is
+%% the one with the oldest timestamp among the heads of the pending
+%% acks and unread queues. We can't check disk_pending_acks as these
+%% are paged out - we assume some will soon be paged in rather than
+%% forcing it to happen. Pending ack msgs are included as they are
+%% regarded as unprocessed until acked, this also prevents the result
+%% apparently oscillating during repeated rejects. Q3 is only checked
+%% when Q4 is empty as any Q4 msg will be earlier.
+head_message_timestamp(Q3, Q4, RPA, QPA) ->
+ HeadMsgs = [ HeadMsgStatus#msg_status.msg ||
+ HeadMsgStatus <-
+ [ get_qs_head([Q4, Q3]),
+ get_pa_head(RPA),
+ get_pa_head(QPA) ],
+ HeadMsgStatus /= undefined ],
+
+ Timestamps =
+ [Timestamp || HeadMsg <- HeadMsgs,
+ Timestamp <- [rabbit_basic:extract_timestamp(
+ HeadMsg#basic_message.content)],
+ Timestamp /= undefined
+ ],
+
+ case Timestamps == [] of
true -> '';
- false -> lists:min(Timestamps)
+ false -> lists:min(Timestamps)
end.
+get_qs_head(Qs) ->
+ catch lists:foldl(
+ fun (Q, Acc) ->
+ case get_q_head(Q) of
+ undefined -> Acc;
+ Val -> throw(Val)
+ end
+ end, undefined, Qs).
+
+get_q_head(Q) ->
+ get_collection_head(Q, fun ?QUEUE:is_empty/1, fun ?QUEUE:peek/1).
+
+get_pa_head(PA) ->
+ get_collection_head(PA, fun gb_trees:is_empty/1, fun gb_trees:smallest/1).
+
+get_collection_head(Col, IsEmpty, GetVal) ->
+ case IsEmpty(Col) of
+ false ->
+ {_, MsgStatus} = GetVal(Col),
+ MsgStatus;
+ true -> undefined
+ end.
%%----------------------------------------------------------------------------
%% Minor helpers
diff --git a/test/src/rabbit_tests.erl b/test/src/rabbit_tests.erl
index a39aa81e79..ab565d1988 100644
--- a/test/src/rabbit_tests.erl
+++ b/test/src/rabbit_tests.erl
@@ -1536,9 +1536,8 @@ test_head_message_timestamp_statistic() ->
after ?TIMEOUT -> throw(failed_to_receive_queue_declare_ok)
end,
QRes = rabbit_misc:r(<<"/">>, queue, QName),
- X = rabbit_misc:r(<<"/">>, exchange, <<"">>),
-
- {ok, Q1} = rabbit_amqqueue:lookup(rabbit_misc:r(<<"/">>, queue, QName)),
+
+ {ok, Q1} = rabbit_amqqueue:lookup(QRes),
QPid = Q1#amqqueue.pid,
%% Set up event receiver for queue
@@ -1566,7 +1565,7 @@ test_head_message_timestamp_statistic() ->
%% Get second message and check timestamp is empty again
rabbit_channel:do(Ch, #'basic.get'{queue = QName, no_ack = true}),
Event4 = test_queue_statistics_receive_event(QPid, fun (E) -> proplists:get_value(name, E) == QRes end),
- '' = proplists:get_value(head_message_timestamp, Event1),
+ '' = proplists:get_value(head_message_timestamp, Event4),
%% Teardown
rabbit_channel:do(Ch, #'queue.delete'{queue = QName}),