diff options
| -rw-r--r-- | docs/rabbitmqctl.1.xml | 4 | ||||
| -rw-r--r-- | src/rabbit_backing_queue.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 74 | ||||
| -rwxr-xr-x | test/src/rabbit_head_msg_timestamp_tests.py | 103 | ||||
| -rw-r--r-- | test/src/rabbit_tests.erl | 85 |
5 files changed, 113 insertions, 155 deletions
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml index 7c843b7ade..a782f749cf 100644 --- a/docs/rabbitmqctl.1.xml +++ b/docs/rabbitmqctl.1.xml @@ -1261,8 +1261,8 @@ <listitem><para>Like <command>message_bytes</command> but counting only those messages which are persistent.</para></listitem> </varlistentry> <varlistentry> - <term>head_msg_timestamp</term> - <listitem><para>The timestamp property of the first message in the queue, if present.</para></listitem> + <term>head_message_timestamp</term> + <listitem><para>The timestamp property of the first message in the queue, if present. Timestamps of messages only appear when they are in the paged-in state.</para></listitem> </varlistentry> <varlistentry> <term>disk_reads</term> diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index 71ad7f5b98..b567906378 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -22,7 +22,7 @@ messages_unacknowledged_ram, messages_persistent, message_bytes, message_bytes_ready, message_bytes_unacknowledged, message_bytes_ram, - message_bytes_persistent, head_msg_timestamp, + message_bytes_persistent, head_message_timestamp, disk_reads, disk_writes, backing_queue_status]). -ifdef(use_specs). diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index f37b578aa5..bbca0bd410 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -877,13 +877,12 @@ info(message_bytes_ram, #vqstate{ram_bytes = RamBytes}) -> RamBytes; info(message_bytes_persistent, #vqstate{persistent_bytes = PersistentBytes}) -> PersistentBytes; -info(head_msg_timestamp, #vqstate{ +info(head_message_timestamp, #vqstate{ q3 = Q3, q4 = Q4, ram_pending_ack = RPA, - disk_pending_ack = DPA, qi_pending_ack = QPA}) -> - head_msg_timestamp(Q3, Q4, RPA, DPA, QPA); + head_message_timestamp(Q3, Q4, RPA, QPA); info(disk_reads, #vqstate{disk_read_count = Count}) -> Count; info(disk_writes, #vqstate{disk_write_count = Count}) -> @@ -918,42 +917,41 @@ invoke( _, _, State) -> State. is_duplicate(_Msg, State) -> {false, State}. -%% Get the Timestamp property of the first message, if present. -%% The first message is the one with the oldest timestamp among the heads of the -%% pending acks and unread message queues. -%% Unacked messages are included as they are regarded as unprocessed until acked, -%% also to avoid the timestamp oscillating during repeated rejects. -head_msg_timestamp(Q3, Q4, RPA, DPA, QPA) -> - HeadMsgs = [ HeadMsgStatus#msg_status.msg || HeadMsgStatus <- - [ case ?QUEUE:is_empty(Q4) of - false -> {value, MsgStatus} = ?QUEUE:peek(Q4), MsgStatus; - true -> undefined - end, - case ?QUEUE:is_empty(Q3) of - false -> {value, MsgStatus} = ?QUEUE:peek(Q3), MsgStatus; - true -> undefined - end, - case gb_trees:is_empty(RPA) of - false -> {_SeqId, MsgStatus} = gb_trees:smallest(RPA), MsgStatus; - true -> undefined - end, - case gb_trees:is_empty(DPA) of - false -> {_SeqId, MsgStatus} = gb_trees:smallest(DPA), MsgStatus; - true -> undefined - end, - case gb_trees:is_empty(QPA) of - false -> {_SeqId, MsgStatus} = gb_trees:smallest(QPA), MsgStatus; - true -> undefined - end], - HeadMsgStatus /= undefined ], +%% Get the Timestamp property of the first msg, if present. This is the one with the oldest timestamp +%% among the heads of the pending acks and unread queues. +%% We can't check disk_pending_acks as these are paged out - we assume some will soon be paged in +%% rather than forcing it to happen. +%% Pending ack msgs are included as they are regarded as unprocessed until acked, this also prevents +%% the result apparently oscillating during repeated rejects. +%% Q3 is only checked when Q4 is empty as any Q4 msg will be earlier. +head_message_timestamp(Q3, Q4, RPA, QPA) -> + HeadMsgs = [ HeadMsgStatus#msg_status.msg || + HeadMsgStatus <- + [ case ?QUEUE:is_empty(Q4) of + false -> {value, MsgStatus} = ?QUEUE:peek(Q4), MsgStatus; + true -> case ?QUEUE:is_empty(Q3) of + false -> {value, MsgStatus} = ?QUEUE:peek(Q3), MsgStatus; + true -> undefined + end + end, + case gb_trees:is_empty(RPA) of + false -> {_SeqId, MsgStatus} = gb_trees:smallest(RPA), MsgStatus; + true -> undefined + end, + case gb_trees:is_empty(QPA) of + false -> {_SeqId, MsgStatus} = gb_trees:smallest(QPA), MsgStatus; + true -> undefined + end], + HeadMsgStatus /= undefined ], Timestamps = - [ Timestamp || Timestamp <- - [ rabbit_basic:extract_timestamp(HeadMsg#basic_message.content) || HeadMsg <- - HeadMsgs ], - Timestamp /= undefined ], - case Timestamps == [] of - true -> undefined; - false -> lists:min(Timestamps) + [ Timestamp || + Timestamp <- + [ rabbit_basic:extract_timestamp(HeadMsg#basic_message.content) || HeadMsg <- + HeadMsgs ], + Timestamp /= undefined ], + case Timestamps == [] of + true -> ''; + false -> lists:min(Timestamps) end. diff --git a/test/src/rabbit_head_msg_timestamp_tests.py b/test/src/rabbit_head_msg_timestamp_tests.py deleted file mode 100755 index 4e099af401..0000000000 --- a/test/src/rabbit_head_msg_timestamp_tests.py +++ /dev/null @@ -1,103 +0,0 @@ -#!/usr/bin/env python -# -# Tests for the head_msg_timestamp queue stat. - -from datetime import datetime -import json -import os -import subprocess -import sys -from time import clock, mktime, sleep -import unittest - -TIMESTAMP1 = mktime(datetime(2010,1,1,12,00,01).timetuple()) -TIMESTAMP2 = mktime(datetime(2010,1,1,12,00,02).timetuple()) - -class RabbitTestCase(unittest.TestCase): - def setUp(self): - self.run_success(['declare', 'queue', 'name=test']) - self.run_success(['declare', 'binding', 'source=amq.direct', 'destination=test', 'destination_type=queue', 'routing_key=test']) - - def tearDown(self): - self.run_success(['delete', 'queue', 'name=test']) - -class RabbitHeadMsgTimestampTestCase(RabbitTestCase): - - def test_no_timestamp_when_queue_is_empty(self): - self.assert_timestamp('undefined') - - def test_has_timestamp_when_first_msg_is_added(self): - self.run_success(['publish', 'routing_key=test', 'payload=test_1', 'properties={{"timestamp":{0:.0f}}}'.format(TIMESTAMP1)]) - self.assert_timestamp_changed('undefined', TIMESTAMP1) - - def test_no_timestamp_when_last_msg_is_removed(self): - self.run_success(['publish', 'routing_key=test', 'payload=test_1', 'properties={{"timestamp":{0:.0f}}}'.format(TIMESTAMP1)]) - self.assert_timestamp_changed('undefined', TIMESTAMP1) - self.run_success(['get', 'queue=test', 'requeue=false']) - self.assert_timestamp_changed(TIMESTAMP1, 'undefined') - - def test_timestamp_updated_when_msg_is_removed(self): - self.run_success(['publish', 'routing_key=test', 'payload=test_1', 'properties={{"timestamp":{0:.0f}}}'.format(TIMESTAMP1)]) - self.assert_timestamp_changed('undefined', TIMESTAMP1) - self.run_success(['publish', 'routing_key=test', 'payload=test_2', 'properties={{"timestamp":{0:.0f}}}'.format(TIMESTAMP2)]) - self.run_success(['get', 'queue=test', 'requeue=false']) - self.assert_timestamp_changed(TIMESTAMP1, TIMESTAMP2) - - # This is the best we can do to delay ACK via the mgmt interface - def test_timestamp_not_updated_before_msg_is_acked(self): - self.run_success(['publish', 'routing_key=test', 'payload=test_1', 'properties={{"timestamp":{0:.0f}}}'.format(TIMESTAMP1)]) - self.assert_timestamp_changed('undefined', TIMESTAMP1) - self.run_success(['get', 'queue=test', 'requeue=true']) - self.assert_timestamp(TIMESTAMP1) - - # --------------------------------------------------------------------------- - - def run_success(self, args, **kwargs): - (stdout, ret) = self.admin(args, **kwargs) - if ret != 0: - self.fail(stdout) - - def assert_timestamp_changed(self, from_expected, to_expected): - stats_wait_start = clock() - while ((clock() - stats_wait_start) < 10 and - self.get_queue_timestamp('test') == from_expected): - sleep(0.1) - return self.assert_timestamp(to_expected) - - def assert_timestamp(self, expected): - self.assertEqual(expected, self.get_queue_timestamp('test')) - - def get_queue_timestamp(self, queue_name): - for queue_timestamp in self.get_timestamp_stats(): - if queue_timestamp['name'] == 'test': - return queue_timestamp['head_msg_timestamp'] - return None - - def get_timestamp_stats(self): - return self.get_stats(['list', 'queues', 'name,head_msg_timestamp']) - - def get_stats(self, args0): - args = ['-f', 'pretty_json', '-q'] - args.extend(args0) - return json.loads(self.admin(args)[0]) - - def admin(self, args, stdin=None): - return run('../../../rabbitmq-management/bin/rabbitmqadmin', args, stdin) - - -def run(cmd, args, stdin): - path = os.path.normpath(os.path.join(os.getcwd(), cmd)) - cmdline = [path] - cmdline.extend(args) - proc = subprocess.Popen(cmdline, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) - (stdout, stderr) = proc.communicate(stdin) - returncode = proc.returncode - return (stdout + stderr, returncode) - -if __name__ == '__main__': - print "\nrabbit head msg timestamp tests\n===============================\n" - suite = unittest.TestLoader().loadTestsFromTestCase(RabbitHeadMsgTimestampTestCase) - results = unittest.TextTestRunner(verbosity=2).run(suite) - if not results.wasSuccessful(): - sys.exit(1) - diff --git a/test/src/rabbit_tests.erl b/test/src/rabbit_tests.erl index 0774dd9ab5..a39aa81e79 100644 --- a/test/src/rabbit_tests.erl +++ b/test/src/rabbit_tests.erl @@ -13,7 +13,6 @@ %% The Initial Developer of the Original Code is GoPivotal, Inc. %% Copyright (c) 2007-2014 GoPivotal, Inc. All rights reserved. %% - -module(rabbit_tests). -compile([export_all]). @@ -64,7 +63,8 @@ all_tests0() -> passed = test_log_management(), passed = test_app_management(), passed = test_log_management_during_startup(), - passed = test_statistics(), + passed = test_ch_statistics(), + passed = test_head_message_timestamp_statistic(), passed = test_arguments_parser(), passed = test_dynamic_mirroring(), passed = test_user_management(), @@ -89,7 +89,6 @@ all_tests0() -> passed = vm_memory_monitor_tests:all_tests(), passed. - do_if_secondary_node(Up, Down) -> SecondaryNode = rabbit_nodes:make("hare"), @@ -1440,21 +1439,21 @@ test_statistics_event_receiver(Pid) -> Foo -> Pid ! Foo, test_statistics_event_receiver(Pid) end. -test_statistics_receive_event(Ch, Matcher) -> +test_ch_statistics_receive_event(Ch, Matcher) -> rabbit_channel:flush(Ch), Ch ! emit_stats, - test_statistics_receive_event1(Ch, Matcher). + test_ch_statistics_receive_event1(Ch, Matcher). -test_statistics_receive_event1(Ch, Matcher) -> +test_ch_statistics_receive_event1(Ch, Matcher) -> receive #event{type = channel_stats, props = Props} -> case Matcher(Props) of true -> Props; - _ -> test_statistics_receive_event1(Ch, Matcher) + _ -> test_ch_statistics_receive_event1(Ch, Matcher) end after ?TIMEOUT -> throw(failed_to_receive_event) end. -test_statistics() -> +test_ch_statistics() -> application:set_env(rabbit, collect_statistics, fine), %% ATM this just tests the queue / exchange stats in channels. That's @@ -1472,7 +1471,7 @@ test_statistics() -> rabbit_tests_event_receiver:start(self(), [node()], [channel_stats]), %% Check stats empty - Event = test_statistics_receive_event(Ch, fun (_) -> true end), + Event = test_ch_statistics_receive_event(Ch, fun (_) -> true end), [] = proplists:get_value(channel_queue_stats, Event), [] = proplists:get_value(channel_exchange_stats, Event), [] = proplists:get_value(channel_queue_exchange_stats, Event), @@ -1484,7 +1483,7 @@ test_statistics() -> rabbit_channel:do(Ch, #'basic.get'{queue = QName}), %% Check the stats reflect that - Event2 = test_statistics_receive_event( + Event2 = test_ch_statistics_receive_event( Ch, fun (E) -> length(proplists:get_value( @@ -1497,7 +1496,7 @@ test_statistics() -> %% Check the stats remove stuff on queue deletion rabbit_channel:do(Ch, #'queue.delete'{queue = QName}), - Event3 = test_statistics_receive_event( + Event3 = test_ch_statistics_receive_event( Ch, fun (E) -> length(proplists:get_value( @@ -1512,6 +1511,70 @@ test_statistics() -> rabbit_tests_event_receiver:stop(), passed. +test_queue_statistics_receive_event(Q, Matcher) -> + %% Q ! emit_stats, + test_queue_statistics_receive_event1(Q, Matcher). + +test_queue_statistics_receive_event1(Q, Matcher) -> + receive #event{type = queue_stats, props = Props} -> + case Matcher(Props) of + true -> Props; + _ -> test_queue_statistics_receive_event1(Q, Matcher) + end + after ?TIMEOUT -> throw(failed_to_receive_event) + end. + +test_head_message_timestamp_statistic() -> + %% Can't find a way to receive the ack here so can't test pending acks status + + application:set_env(rabbit, collect_statistics, fine), + + %% Set up a channel and queue + {_Writer, Ch} = test_spawn(), + rabbit_channel:do(Ch, #'queue.declare'{}), + QName = receive #'queue.declare_ok'{queue = Q0} -> Q0 + after ?TIMEOUT -> throw(failed_to_receive_queue_declare_ok) + end, + QRes = rabbit_misc:r(<<"/">>, queue, QName), + X = rabbit_misc:r(<<"/">>, exchange, <<"">>), + + {ok, Q1} = rabbit_amqqueue:lookup(rabbit_misc:r(<<"/">>, queue, QName)), + QPid = Q1#amqqueue.pid, + + %% Set up event receiver for queue + rabbit_tests_event_receiver:start(self(), [node()], [queue_stats]), + + %% Check timestamp is empty when queue is empty + Event1 = test_queue_statistics_receive_event(QPid, fun (E) -> proplists:get_value(name, E) == QRes end), + '' = proplists:get_value(head_message_timestamp, Event1), + + %% Publish two messages and check timestamp is that of first message + rabbit_channel:do(Ch, #'basic.publish'{exchange = <<"">>, + routing_key = QName}, + rabbit_basic:build_content(#'P_basic'{timestamp = 1}, <<"">>)), + rabbit_channel:do(Ch, #'basic.publish'{exchange = <<"">>, + routing_key = QName}, + rabbit_basic:build_content(#'P_basic'{timestamp = 2}, <<"">>)), + Event2 = test_queue_statistics_receive_event(QPid, fun (E) -> proplists:get_value(name, E) == QRes end), + 1 = proplists:get_value(head_message_timestamp, Event2), + + %% Get first message and check timestamp is that of second message + rabbit_channel:do(Ch, #'basic.get'{queue = QName, no_ack = true}), + Event3 = test_queue_statistics_receive_event(QPid, fun (E) -> proplists:get_value(name, E) == QRes end), + 2 = proplists:get_value(head_message_timestamp, Event3), + + %% Get second message and check timestamp is empty again + rabbit_channel:do(Ch, #'basic.get'{queue = QName, no_ack = true}), + Event4 = test_queue_statistics_receive_event(QPid, fun (E) -> proplists:get_value(name, E) == QRes end), + '' = proplists:get_value(head_message_timestamp, Event1), + + %% Teardown + rabbit_channel:do(Ch, #'queue.delete'{queue = QName}), + rabbit_channel:shutdown(Ch), + rabbit_tests_event_receiver:stop(), + + passed. + test_refresh_events(SecondaryNode) -> rabbit_tests_event_receiver:start(self(), [node(), SecondaryNode], [channel_created, queue_created]), |
