diff options
| author | Alex Thomas <alext@lshift.net> | 2015-04-08 10:36:17 +0100 |
|---|---|---|
| committer | Alex Thomas <alext@lshift.net> | 2015-04-10 17:03:48 +0100 |
| commit | 41e3f06557256435791fd948ace2afbd4c727df5 (patch) | |
| tree | fb6ccef33a4a42b52edc4dd56c32eac1116cb81d /test | |
| parent | 08c30f6d06ce1f2c6ae65b85a943f1f141cc887d (diff) | |
| download | rabbitmq-server-git-41e3f06557256435791fd948ace2afbd4c727df5.tar.gz | |
Add tests for head message timestamp queue statistic. Distinguish the former statistics tests as channel (_ch) statistics. Expand name.
Diffstat (limited to 'test')
| -rwxr-xr-x | test/src/rabbit_head_msg_timestamp_tests.py | 103 | ||||
| -rw-r--r-- | test/src/rabbit_tests.erl | 85 |
2 files changed, 74 insertions, 114 deletions
diff --git a/test/src/rabbit_head_msg_timestamp_tests.py b/test/src/rabbit_head_msg_timestamp_tests.py deleted file mode 100755 index 4e099af401..0000000000 --- a/test/src/rabbit_head_msg_timestamp_tests.py +++ /dev/null @@ -1,103 +0,0 @@ -#!/usr/bin/env python -# -# Tests for the head_msg_timestamp queue stat. - -from datetime import datetime -import json -import os -import subprocess -import sys -from time import clock, mktime, sleep -import unittest - -TIMESTAMP1 = mktime(datetime(2010,1,1,12,00,01).timetuple()) -TIMESTAMP2 = mktime(datetime(2010,1,1,12,00,02).timetuple()) - -class RabbitTestCase(unittest.TestCase): - def setUp(self): - self.run_success(['declare', 'queue', 'name=test']) - self.run_success(['declare', 'binding', 'source=amq.direct', 'destination=test', 'destination_type=queue', 'routing_key=test']) - - def tearDown(self): - self.run_success(['delete', 'queue', 'name=test']) - -class RabbitHeadMsgTimestampTestCase(RabbitTestCase): - - def test_no_timestamp_when_queue_is_empty(self): - self.assert_timestamp('undefined') - - def test_has_timestamp_when_first_msg_is_added(self): - self.run_success(['publish', 'routing_key=test', 'payload=test_1', 'properties={{"timestamp":{0:.0f}}}'.format(TIMESTAMP1)]) - self.assert_timestamp_changed('undefined', TIMESTAMP1) - - def test_no_timestamp_when_last_msg_is_removed(self): - self.run_success(['publish', 'routing_key=test', 'payload=test_1', 'properties={{"timestamp":{0:.0f}}}'.format(TIMESTAMP1)]) - self.assert_timestamp_changed('undefined', TIMESTAMP1) - self.run_success(['get', 'queue=test', 'requeue=false']) - self.assert_timestamp_changed(TIMESTAMP1, 'undefined') - - def test_timestamp_updated_when_msg_is_removed(self): - self.run_success(['publish', 'routing_key=test', 'payload=test_1', 'properties={{"timestamp":{0:.0f}}}'.format(TIMESTAMP1)]) - self.assert_timestamp_changed('undefined', TIMESTAMP1) - self.run_success(['publish', 'routing_key=test', 'payload=test_2', 'properties={{"timestamp":{0:.0f}}}'.format(TIMESTAMP2)]) - self.run_success(['get', 'queue=test', 'requeue=false']) - self.assert_timestamp_changed(TIMESTAMP1, TIMESTAMP2) - - # This is the best we can do to delay ACK via the mgmt interface - def test_timestamp_not_updated_before_msg_is_acked(self): - self.run_success(['publish', 'routing_key=test', 'payload=test_1', 'properties={{"timestamp":{0:.0f}}}'.format(TIMESTAMP1)]) - self.assert_timestamp_changed('undefined', TIMESTAMP1) - self.run_success(['get', 'queue=test', 'requeue=true']) - self.assert_timestamp(TIMESTAMP1) - - # --------------------------------------------------------------------------- - - def run_success(self, args, **kwargs): - (stdout, ret) = self.admin(args, **kwargs) - if ret != 0: - self.fail(stdout) - - def assert_timestamp_changed(self, from_expected, to_expected): - stats_wait_start = clock() - while ((clock() - stats_wait_start) < 10 and - self.get_queue_timestamp('test') == from_expected): - sleep(0.1) - return self.assert_timestamp(to_expected) - - def assert_timestamp(self, expected): - self.assertEqual(expected, self.get_queue_timestamp('test')) - - def get_queue_timestamp(self, queue_name): - for queue_timestamp in self.get_timestamp_stats(): - if queue_timestamp['name'] == 'test': - return queue_timestamp['head_msg_timestamp'] - return None - - def get_timestamp_stats(self): - return self.get_stats(['list', 'queues', 'name,head_msg_timestamp']) - - def get_stats(self, args0): - args = ['-f', 'pretty_json', '-q'] - args.extend(args0) - return json.loads(self.admin(args)[0]) - - def admin(self, args, stdin=None): - return run('../../../rabbitmq-management/bin/rabbitmqadmin', args, stdin) - - -def run(cmd, args, stdin): - path = os.path.normpath(os.path.join(os.getcwd(), cmd)) - cmdline = [path] - cmdline.extend(args) - proc = subprocess.Popen(cmdline, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE) - (stdout, stderr) = proc.communicate(stdin) - returncode = proc.returncode - return (stdout + stderr, returncode) - -if __name__ == '__main__': - print "\nrabbit head msg timestamp tests\n===============================\n" - suite = unittest.TestLoader().loadTestsFromTestCase(RabbitHeadMsgTimestampTestCase) - results = unittest.TextTestRunner(verbosity=2).run(suite) - if not results.wasSuccessful(): - sys.exit(1) - diff --git a/test/src/rabbit_tests.erl b/test/src/rabbit_tests.erl index 0774dd9ab5..a39aa81e79 100644 --- a/test/src/rabbit_tests.erl +++ b/test/src/rabbit_tests.erl @@ -13,7 +13,6 @@ %% The Initial Developer of the Original Code is GoPivotal, Inc. %% Copyright (c) 2007-2014 GoPivotal, Inc. All rights reserved. %% - -module(rabbit_tests). -compile([export_all]). @@ -64,7 +63,8 @@ all_tests0() -> passed = test_log_management(), passed = test_app_management(), passed = test_log_management_during_startup(), - passed = test_statistics(), + passed = test_ch_statistics(), + passed = test_head_message_timestamp_statistic(), passed = test_arguments_parser(), passed = test_dynamic_mirroring(), passed = test_user_management(), @@ -89,7 +89,6 @@ all_tests0() -> passed = vm_memory_monitor_tests:all_tests(), passed. - do_if_secondary_node(Up, Down) -> SecondaryNode = rabbit_nodes:make("hare"), @@ -1440,21 +1439,21 @@ test_statistics_event_receiver(Pid) -> Foo -> Pid ! Foo, test_statistics_event_receiver(Pid) end. -test_statistics_receive_event(Ch, Matcher) -> +test_ch_statistics_receive_event(Ch, Matcher) -> rabbit_channel:flush(Ch), Ch ! emit_stats, - test_statistics_receive_event1(Ch, Matcher). + test_ch_statistics_receive_event1(Ch, Matcher). -test_statistics_receive_event1(Ch, Matcher) -> +test_ch_statistics_receive_event1(Ch, Matcher) -> receive #event{type = channel_stats, props = Props} -> case Matcher(Props) of true -> Props; - _ -> test_statistics_receive_event1(Ch, Matcher) + _ -> test_ch_statistics_receive_event1(Ch, Matcher) end after ?TIMEOUT -> throw(failed_to_receive_event) end. -test_statistics() -> +test_ch_statistics() -> application:set_env(rabbit, collect_statistics, fine), %% ATM this just tests the queue / exchange stats in channels. That's @@ -1472,7 +1471,7 @@ test_statistics() -> rabbit_tests_event_receiver:start(self(), [node()], [channel_stats]), %% Check stats empty - Event = test_statistics_receive_event(Ch, fun (_) -> true end), + Event = test_ch_statistics_receive_event(Ch, fun (_) -> true end), [] = proplists:get_value(channel_queue_stats, Event), [] = proplists:get_value(channel_exchange_stats, Event), [] = proplists:get_value(channel_queue_exchange_stats, Event), @@ -1484,7 +1483,7 @@ test_statistics() -> rabbit_channel:do(Ch, #'basic.get'{queue = QName}), %% Check the stats reflect that - Event2 = test_statistics_receive_event( + Event2 = test_ch_statistics_receive_event( Ch, fun (E) -> length(proplists:get_value( @@ -1497,7 +1496,7 @@ test_statistics() -> %% Check the stats remove stuff on queue deletion rabbit_channel:do(Ch, #'queue.delete'{queue = QName}), - Event3 = test_statistics_receive_event( + Event3 = test_ch_statistics_receive_event( Ch, fun (E) -> length(proplists:get_value( @@ -1512,6 +1511,70 @@ test_statistics() -> rabbit_tests_event_receiver:stop(), passed. +test_queue_statistics_receive_event(Q, Matcher) -> + %% Q ! emit_stats, + test_queue_statistics_receive_event1(Q, Matcher). + +test_queue_statistics_receive_event1(Q, Matcher) -> + receive #event{type = queue_stats, props = Props} -> + case Matcher(Props) of + true -> Props; + _ -> test_queue_statistics_receive_event1(Q, Matcher) + end + after ?TIMEOUT -> throw(failed_to_receive_event) + end. + +test_head_message_timestamp_statistic() -> + %% Can't find a way to receive the ack here so can't test pending acks status + + application:set_env(rabbit, collect_statistics, fine), + + %% Set up a channel and queue + {_Writer, Ch} = test_spawn(), + rabbit_channel:do(Ch, #'queue.declare'{}), + QName = receive #'queue.declare_ok'{queue = Q0} -> Q0 + after ?TIMEOUT -> throw(failed_to_receive_queue_declare_ok) + end, + QRes = rabbit_misc:r(<<"/">>, queue, QName), + X = rabbit_misc:r(<<"/">>, exchange, <<"">>), + + {ok, Q1} = rabbit_amqqueue:lookup(rabbit_misc:r(<<"/">>, queue, QName)), + QPid = Q1#amqqueue.pid, + + %% Set up event receiver for queue + rabbit_tests_event_receiver:start(self(), [node()], [queue_stats]), + + %% Check timestamp is empty when queue is empty + Event1 = test_queue_statistics_receive_event(QPid, fun (E) -> proplists:get_value(name, E) == QRes end), + '' = proplists:get_value(head_message_timestamp, Event1), + + %% Publish two messages and check timestamp is that of first message + rabbit_channel:do(Ch, #'basic.publish'{exchange = <<"">>, + routing_key = QName}, + rabbit_basic:build_content(#'P_basic'{timestamp = 1}, <<"">>)), + rabbit_channel:do(Ch, #'basic.publish'{exchange = <<"">>, + routing_key = QName}, + rabbit_basic:build_content(#'P_basic'{timestamp = 2}, <<"">>)), + Event2 = test_queue_statistics_receive_event(QPid, fun (E) -> proplists:get_value(name, E) == QRes end), + 1 = proplists:get_value(head_message_timestamp, Event2), + + %% Get first message and check timestamp is that of second message + rabbit_channel:do(Ch, #'basic.get'{queue = QName, no_ack = true}), + Event3 = test_queue_statistics_receive_event(QPid, fun (E) -> proplists:get_value(name, E) == QRes end), + 2 = proplists:get_value(head_message_timestamp, Event3), + + %% Get second message and check timestamp is empty again + rabbit_channel:do(Ch, #'basic.get'{queue = QName, no_ack = true}), + Event4 = test_queue_statistics_receive_event(QPid, fun (E) -> proplists:get_value(name, E) == QRes end), + '' = proplists:get_value(head_message_timestamp, Event1), + + %% Teardown + rabbit_channel:do(Ch, #'queue.delete'{queue = QName}), + rabbit_channel:shutdown(Ch), + rabbit_tests_event_receiver:stop(), + + passed. + test_refresh_events(SecondaryNode) -> rabbit_tests_event_receiver:start(self(), [node(), SecondaryNode], [channel_created, queue_created]), |
