diff options
| author | Alex Thomas <alext@misshelpful> | 2015-02-25 12:02:42 +0000 |
|---|---|---|
| committer | Alex Thomas <alext@lshift.net> | 2015-04-10 17:03:48 +0100 |
| commit | d899179f39172e5ee26918a1d222f240c9087861 (patch) | |
| tree | d0015787a1f4a5c590c4ae05d4e1d920ad730792 | |
| parent | f5cf81ede57c7b616f6c14893613a7e58fd983f1 (diff) | |
| download | rabbitmq-server-git-d899179f39172e5ee26918a1d222f240c9087861.tar.gz | |
Move head_msg_timestamp from backing_queue_status property to top-level queue stat as per PR comment #7.
| -rw-r--r-- | src/rabbit_backing_queue.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 6 | ||||
| -rwxr-xr-x | test/src/rabbit_head_msg_timestamp_tests.py | 103 |
3 files changed, 109 insertions, 2 deletions
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index 55c8c971a0..71ad7f5b98 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -22,7 +22,7 @@ messages_unacknowledged_ram, messages_persistent, message_bytes, message_bytes_ready, message_bytes_unacknowledged, message_bytes_ram, - message_bytes_persistent, + message_bytes_persistent, head_msg_timestamp, disk_reads, disk_writes, backing_queue_status]). -ifdef(use_specs). diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 88c28ac8a9..19941f508f 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -877,6 +877,11 @@ info(message_bytes_ram, #vqstate{ram_bytes = RamBytes}) -> RamBytes; info(message_bytes_persistent, #vqstate{persistent_bytes = PersistentBytes}) -> PersistentBytes; +info(head_msg_timestamp, #vqstate{ + q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4, + ram_pending_ack = RPA, + disk_pending_ack = DPA }) -> + head_msg_timestamp(Q3, Q4, RPA, DPA); info(disk_reads, #vqstate{disk_read_count = Count}) -> Count; info(disk_writes, #vqstate{disk_write_count = Count}) -> @@ -901,7 +906,6 @@ info(backing_queue_status, #vqstate { {len , Len}, {target_ram_count , TargetRamCount}, {next_seq_id , NextSeqId}, - {head_msg_timestamp , head_msg_timestamp(Q3, Q4, RPA, DPA)}, {avg_ingress_rate , AvgIngressRate}, {avg_egress_rate , AvgEgressRate}, {avg_ack_ingress_rate, AvgAckIngressRate}, diff --git a/test/src/rabbit_head_msg_timestamp_tests.py b/test/src/rabbit_head_msg_timestamp_tests.py new file mode 100755 index 0000000000..4e099af401 --- /dev/null +++ b/test/src/rabbit_head_msg_timestamp_tests.py @@ -0,0 +1,103 @@ +#!/usr/bin/env python +# +# Tests for the head_msg_timestamp queue stat. + +from datetime import datetime +import json +import os +import subprocess +import sys +from time import clock, mktime, sleep +import unittest + +TIMESTAMP1 = mktime(datetime(2010,1,1,12,00,01).timetuple()) +TIMESTAMP2 = mktime(datetime(2010,1,1,12,00,02).timetuple()) + +class RabbitTestCase(unittest.TestCase): + def setUp(self): + self.run_success(['declare', 'queue', 'name=test']) + self.run_success(['declare', 'binding', 'source=amq.direct', 'destination=test', 'destination_type=queue', 'routing_key=test']) + + def tearDown(self): + self.run_success(['delete', 'queue', 'name=test']) + +class RabbitHeadMsgTimestampTestCase(RabbitTestCase): + + def test_no_timestamp_when_queue_is_empty(self): + self.assert_timestamp('undefined') + + def test_has_timestamp_when_first_msg_is_added(self): + self.run_success(['publish', 'routing_key=test', 'payload=test_1', 'properties={{"timestamp":{0:.0f}}}'.format(TIMESTAMP1)]) + self.assert_timestamp_changed('undefined', TIMESTAMP1) + + def test_no_timestamp_when_last_msg_is_removed(self): + self.run_success(['publish', 'routing_key=test', 'payload=test_1', 'properties={{"timestamp":{0:.0f}}}'.format(TIMESTAMP1)]) + self.assert_timestamp_changed('undefined', TIMESTAMP1) + self.run_success(['get', 'queue=test', 'requeue=false']) + self.assert_timestamp_changed(TIMESTAMP1, 'undefined') + + def test_timestamp_updated_when_msg_is_removed(self): + self.run_success(['publish', 'routing_key=test', 'payload=test_1', 'properties={{"timestamp":{0:.0f}}}'.format(TIMESTAMP1)]) + self.assert_timestamp_changed('undefined', TIMESTAMP1) + self.run_success(['publish', 'routing_key=test', 'payload=test_2', 'properties={{"timestamp":{0:.0f}}}'.format(TIMESTAMP2)]) + self.run_success(['get', 'queue=test', 'requeue=false']) + self.assert_timestamp_changed(TIMESTAMP1, TIMESTAMP2) + + # This is the best we can do to delay ACK via the mgmt interface + def test_timestamp_not_updated_before_msg_is_acked(self): + self.run_success(['publish', 'routing_key=test', 'payload=test_1', 'properties={{"timestamp":{0:.0f}}}'.format(TIMESTAMP1)]) + self.assert_timestamp_changed('undefined', TIMESTAMP1) + self.run_success(['get', 'queue=test', 'requeue=true']) + self.assert_timestamp(TIMESTAMP1) + + # --------------------------------------------------------------------------- + + def run_success(self, args, **kwargs): + (stdout, ret) = self.admin(args, **kwargs) + if ret != 0: + self.fail(stdout) + + def assert_timestamp_changed(self, from_expected, to_expected): + stats_wait_start = clock() + while ((clock() - stats_wait_start) < 10 and + self.get_queue_timestamp('test') == from_expected): + sleep(0.1) + return self.assert_timestamp(to_expected) + + def assert_timestamp(self, expected): + self.assertEqual(expected, self.get_queue_timestamp('test')) + + def get_queue_timestamp(self, queue_name): + for queue_timestamp in self.get_timestamp_stats(): + if queue_timestamp['name'] == 'test': + return queue_timestamp['head_msg_timestamp'] + return None + + def get_timestamp_stats(self): + return self.get_stats(['list', 'queues', 'name,head_msg_timestamp']) + + def get_stats(self, args0): + args = ['-f', 'pretty_json', '-q'] + args.extend(args0) + return json.loads(self.admin(args)[0]) + + def admin(self, args, stdin=None): + return run('../../../rabbitmq-management/bin/rabbitmqadmin', args, stdin) + + +def run(cmd, args, stdin): + path = os.path.normpath(os.path.join(os.getcwd(), cmd)) + cmdline = [path] + cmdline.extend(args) + proc = subprocess.Popen(cmdline, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + (stdout, stderr) = proc.communicate(stdin) + returncode = proc.returncode + return (stdout + stderr, returncode) + +if __name__ == '__main__': + print "\nrabbit head msg timestamp tests\n===============================\n" + suite = unittest.TestLoader().loadTestsFromTestCase(RabbitHeadMsgTimestampTestCase) + results = unittest.TextTestRunner(verbosity=2).run(suite) + if not results.wasSuccessful(): + sys.exit(1) + |
