summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlex Thomas <alext@misshelpful>2015-02-25 12:02:42 +0000
committerAlex Thomas <alext@lshift.net>2015-04-10 17:03:48 +0100
commitd899179f39172e5ee26918a1d222f240c9087861 (patch)
treed0015787a1f4a5c590c4ae05d4e1d920ad730792
parentf5cf81ede57c7b616f6c14893613a7e58fd983f1 (diff)
downloadrabbitmq-server-git-d899179f39172e5ee26918a1d222f240c9087861.tar.gz
Move head_msg_timestamp from backing_queue_status property to top-level queue stat as per PR comment #7.
-rw-r--r--src/rabbit_backing_queue.erl2
-rw-r--r--src/rabbit_variable_queue.erl6
-rwxr-xr-xtest/src/rabbit_head_msg_timestamp_tests.py103
3 files changed, 109 insertions, 2 deletions
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index 55c8c971a0..71ad7f5b98 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -22,7 +22,7 @@
messages_unacknowledged_ram, messages_persistent,
message_bytes, message_bytes_ready,
message_bytes_unacknowledged, message_bytes_ram,
- message_bytes_persistent,
+ message_bytes_persistent, head_msg_timestamp,
disk_reads, disk_writes, backing_queue_status]).
-ifdef(use_specs).
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 88c28ac8a9..19941f508f 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -877,6 +877,11 @@ info(message_bytes_ram, #vqstate{ram_bytes = RamBytes}) ->
RamBytes;
info(message_bytes_persistent, #vqstate{persistent_bytes = PersistentBytes}) ->
PersistentBytes;
+info(head_msg_timestamp, #vqstate{
+ q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4,
+ ram_pending_ack = RPA,
+ disk_pending_ack = DPA }) ->
+ head_msg_timestamp(Q3, Q4, RPA, DPA);
info(disk_reads, #vqstate{disk_read_count = Count}) ->
Count;
info(disk_writes, #vqstate{disk_write_count = Count}) ->
@@ -901,7 +906,6 @@ info(backing_queue_status, #vqstate {
{len , Len},
{target_ram_count , TargetRamCount},
{next_seq_id , NextSeqId},
- {head_msg_timestamp , head_msg_timestamp(Q3, Q4, RPA, DPA)},
{avg_ingress_rate , AvgIngressRate},
{avg_egress_rate , AvgEgressRate},
{avg_ack_ingress_rate, AvgAckIngressRate},
diff --git a/test/src/rabbit_head_msg_timestamp_tests.py b/test/src/rabbit_head_msg_timestamp_tests.py
new file mode 100755
index 0000000000..4e099af401
--- /dev/null
+++ b/test/src/rabbit_head_msg_timestamp_tests.py
@@ -0,0 +1,103 @@
+#!/usr/bin/env python
+#
+# Tests for the head_msg_timestamp queue stat.
+
+from datetime import datetime
+import json
+import os
+import subprocess
+import sys
+from time import clock, mktime, sleep
+import unittest
+
+TIMESTAMP1 = mktime(datetime(2010,1,1,12,00,01).timetuple())
+TIMESTAMP2 = mktime(datetime(2010,1,1,12,00,02).timetuple())
+
+class RabbitTestCase(unittest.TestCase):
+ def setUp(self):
+ self.run_success(['declare', 'queue', 'name=test'])
+ self.run_success(['declare', 'binding', 'source=amq.direct', 'destination=test', 'destination_type=queue', 'routing_key=test'])
+
+ def tearDown(self):
+ self.run_success(['delete', 'queue', 'name=test'])
+
+class RabbitHeadMsgTimestampTestCase(RabbitTestCase):
+
+ def test_no_timestamp_when_queue_is_empty(self):
+ self.assert_timestamp('undefined')
+
+ def test_has_timestamp_when_first_msg_is_added(self):
+ self.run_success(['publish', 'routing_key=test', 'payload=test_1', 'properties={{"timestamp":{0:.0f}}}'.format(TIMESTAMP1)])
+ self.assert_timestamp_changed('undefined', TIMESTAMP1)
+
+ def test_no_timestamp_when_last_msg_is_removed(self):
+ self.run_success(['publish', 'routing_key=test', 'payload=test_1', 'properties={{"timestamp":{0:.0f}}}'.format(TIMESTAMP1)])
+ self.assert_timestamp_changed('undefined', TIMESTAMP1)
+ self.run_success(['get', 'queue=test', 'requeue=false'])
+ self.assert_timestamp_changed(TIMESTAMP1, 'undefined')
+
+ def test_timestamp_updated_when_msg_is_removed(self):
+ self.run_success(['publish', 'routing_key=test', 'payload=test_1', 'properties={{"timestamp":{0:.0f}}}'.format(TIMESTAMP1)])
+ self.assert_timestamp_changed('undefined', TIMESTAMP1)
+ self.run_success(['publish', 'routing_key=test', 'payload=test_2', 'properties={{"timestamp":{0:.0f}}}'.format(TIMESTAMP2)])
+ self.run_success(['get', 'queue=test', 'requeue=false'])
+ self.assert_timestamp_changed(TIMESTAMP1, TIMESTAMP2)
+
+ # This is the best we can do to delay ACK via the mgmt interface
+ def test_timestamp_not_updated_before_msg_is_acked(self):
+ self.run_success(['publish', 'routing_key=test', 'payload=test_1', 'properties={{"timestamp":{0:.0f}}}'.format(TIMESTAMP1)])
+ self.assert_timestamp_changed('undefined', TIMESTAMP1)
+ self.run_success(['get', 'queue=test', 'requeue=true'])
+ self.assert_timestamp(TIMESTAMP1)
+
+ # ---------------------------------------------------------------------------
+
+ def run_success(self, args, **kwargs):
+ (stdout, ret) = self.admin(args, **kwargs)
+ if ret != 0:
+ self.fail(stdout)
+
+ def assert_timestamp_changed(self, from_expected, to_expected):
+ stats_wait_start = clock()
+ while ((clock() - stats_wait_start) < 10 and
+ self.get_queue_timestamp('test') == from_expected):
+ sleep(0.1)
+ return self.assert_timestamp(to_expected)
+
+ def assert_timestamp(self, expected):
+ self.assertEqual(expected, self.get_queue_timestamp('test'))
+
+ def get_queue_timestamp(self, queue_name):
+ for queue_timestamp in self.get_timestamp_stats():
+ if queue_timestamp['name'] == 'test':
+ return queue_timestamp['head_msg_timestamp']
+ return None
+
+ def get_timestamp_stats(self):
+ return self.get_stats(['list', 'queues', 'name,head_msg_timestamp'])
+
+ def get_stats(self, args0):
+ args = ['-f', 'pretty_json', '-q']
+ args.extend(args0)
+ return json.loads(self.admin(args)[0])
+
+ def admin(self, args, stdin=None):
+ return run('../../../rabbitmq-management/bin/rabbitmqadmin', args, stdin)
+
+
+def run(cmd, args, stdin):
+ path = os.path.normpath(os.path.join(os.getcwd(), cmd))
+ cmdline = [path]
+ cmdline.extend(args)
+ proc = subprocess.Popen(cmdline, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
+ (stdout, stderr) = proc.communicate(stdin)
+ returncode = proc.returncode
+ return (stdout + stderr, returncode)
+
+if __name__ == '__main__':
+ print "\nrabbit head msg timestamp tests\n===============================\n"
+ suite = unittest.TestLoader().loadTestsFromTestCase(RabbitHeadMsgTimestampTestCase)
+ results = unittest.TextTestRunner(verbosity=2).run(suite)
+ if not results.wasSuccessful():
+ sys.exit(1)
+