diff options
| -rw-r--r-- | src/rabbit_variable_queue.erl | 83 | ||||
| -rw-r--r-- | test/src/rabbit_tests.erl | 7 |
2 files changed, 51 insertions, 39 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index c2c6bdc4aa..ee84005798 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -877,7 +877,7 @@ info(message_bytes_ram, #vqstate{ram_bytes = RamBytes}) -> info(message_bytes_persistent, #vqstate{persistent_bytes = PersistentBytes}) -> PersistentBytes; info(head_message_timestamp, #vqstate{ - q3 = Q3, + q3 = Q3, q4 = Q4, ram_pending_ack = RPA, qi_pending_ack = QPA}) -> @@ -916,43 +916,56 @@ invoke( _, _, State) -> State. is_duplicate(_Msg, State) -> {false, State}. -%% Get the Timestamp property of the first msg, if present. This is the one with the oldest timestamp -%% among the heads of the pending acks and unread queues. -%% We can't check disk_pending_acks as these are paged out - we assume some will soon be paged in -%% rather than forcing it to happen. -%% Pending ack msgs are included as they are regarded as unprocessed until acked, this also prevents -%% the result apparently oscillating during repeated rejects. -%% Q3 is only checked when Q4 is empty as any Q4 msg will be earlier. -head_message_timestamp(Q3, Q4, RPA, QPA) -> - HeadMsgs = [ HeadMsgStatus#msg_status.msg || - HeadMsgStatus <- - [ case ?QUEUE:is_empty(Q4) of - false -> {value, MsgStatus} = ?QUEUE:peek(Q4), MsgStatus; - true -> case ?QUEUE:is_empty(Q3) of - false -> {value, MsgStatus} = ?QUEUE:peek(Q3), MsgStatus; - true -> undefined - end - end, - case gb_trees:is_empty(RPA) of - false -> {_SeqId, MsgStatus} = gb_trees:smallest(RPA), MsgStatus; - true -> undefined - end, - case gb_trees:is_empty(QPA) of - false -> {_SeqId, MsgStatus} = gb_trees:smallest(QPA), MsgStatus; - true -> undefined - end], - HeadMsgStatus /= undefined ], - Timestamps = - [ Timestamp || - Timestamp <- - [ rabbit_basic:extract_timestamp(HeadMsg#basic_message.content) || HeadMsg <- - HeadMsgs ], - Timestamp /= undefined ], - case Timestamps == [] of +%% Get the Timestamp property of the first msg, if present. This is +%% the one with the oldest timestamp among the heads of the pending +%% acks and unread queues. We can't check disk_pending_acks as these +%% are paged out - we assume some will soon be paged in rather than +%% forcing it to happen. Pending ack msgs are included as they are +%% regarded as unprocessed until acked, this also prevents the result +%% apparently oscillating during repeated rejects. Q3 is only checked +%% when Q4 is empty as any Q4 msg will be earlier. +head_message_timestamp(Q3, Q4, RPA, QPA) -> + HeadMsgs = [ HeadMsgStatus#msg_status.msg || + HeadMsgStatus <- + [ get_qs_head([Q4, Q3]), + get_pa_head(RPA), + get_pa_head(QPA) ], + HeadMsgStatus /= undefined ], + + Timestamps = + [Timestamp || HeadMsg <- HeadMsgs, + Timestamp <- [rabbit_basic:extract_timestamp( + HeadMsg#basic_message.content)], + Timestamp /= undefined + ], + + case Timestamps == [] of true -> ''; - false -> lists:min(Timestamps) + false -> lists:min(Timestamps) end. +get_qs_head(Qs) -> + catch lists:foldl( + fun (Q, Acc) -> + case get_q_head(Q) of + undefined -> Acc; + Val -> throw(Val) + end + end, undefined, Qs). + +get_q_head(Q) -> + get_collection_head(Q, fun ?QUEUE:is_empty/1, fun ?QUEUE:peek/1). + +get_pa_head(PA) -> + get_collection_head(PA, fun gb_trees:is_empty/1, fun gb_trees:smallest/1). + +get_collection_head(Col, IsEmpty, GetVal) -> + case IsEmpty(Col) of + false -> + {_, MsgStatus} = GetVal(Col), + MsgStatus; + true -> undefined + end. %%---------------------------------------------------------------------------- %% Minor helpers diff --git a/test/src/rabbit_tests.erl b/test/src/rabbit_tests.erl index a39aa81e79..ab565d1988 100644 --- a/test/src/rabbit_tests.erl +++ b/test/src/rabbit_tests.erl @@ -1536,9 +1536,8 @@ test_head_message_timestamp_statistic() -> after ?TIMEOUT -> throw(failed_to_receive_queue_declare_ok) end, QRes = rabbit_misc:r(<<"/">>, queue, QName), - X = rabbit_misc:r(<<"/">>, exchange, <<"">>), - - {ok, Q1} = rabbit_amqqueue:lookup(rabbit_misc:r(<<"/">>, queue, QName)), + + {ok, Q1} = rabbit_amqqueue:lookup(QRes), QPid = Q1#amqqueue.pid, %% Set up event receiver for queue @@ -1566,7 +1565,7 @@ test_head_message_timestamp_statistic() -> %% Get second message and check timestamp is empty again rabbit_channel:do(Ch, #'basic.get'{queue = QName, no_ack = true}), Event4 = test_queue_statistics_receive_event(QPid, fun (E) -> proplists:get_value(name, E) == QRes end), - '' = proplists:get_value(head_message_timestamp, Event1), + '' = proplists:get_value(head_message_timestamp, Event4), %% Teardown rabbit_channel:do(Ch, #'queue.delete'{queue = QName}), |
