summaryrefslogtreecommitdiff
path: root/test
diff options
context:
space:
mode:
authorAlex Thomas <alext@lshift.net>2015-04-08 10:36:17 +0100
committerAlex Thomas <alext@lshift.net>2015-04-10 17:03:48 +0100
commit41e3f06557256435791fd948ace2afbd4c727df5 (patch)
treefb6ccef33a4a42b52edc4dd56c32eac1116cb81d /test
parent08c30f6d06ce1f2c6ae65b85a943f1f141cc887d (diff)
downloadrabbitmq-server-git-41e3f06557256435791fd948ace2afbd4c727df5.tar.gz
Add tests for head message timestamp queue statistic. Distinguish the former statistics tests as channel (_ch) statistics. Expand name.
Diffstat (limited to 'test')
-rwxr-xr-xtest/src/rabbit_head_msg_timestamp_tests.py103
-rw-r--r--test/src/rabbit_tests.erl85
2 files changed, 74 insertions, 114 deletions
diff --git a/test/src/rabbit_head_msg_timestamp_tests.py b/test/src/rabbit_head_msg_timestamp_tests.py
deleted file mode 100755
index 4e099af401..0000000000
--- a/test/src/rabbit_head_msg_timestamp_tests.py
+++ /dev/null
@@ -1,103 +0,0 @@
-#!/usr/bin/env python
-#
-# Tests for the head_msg_timestamp queue stat.
-
-from datetime import datetime
-import json
-import os
-import subprocess
-import sys
-from time import clock, mktime, sleep
-import unittest
-
-TIMESTAMP1 = mktime(datetime(2010,1,1,12,00,01).timetuple())
-TIMESTAMP2 = mktime(datetime(2010,1,1,12,00,02).timetuple())
-
-class RabbitTestCase(unittest.TestCase):
- def setUp(self):
- self.run_success(['declare', 'queue', 'name=test'])
- self.run_success(['declare', 'binding', 'source=amq.direct', 'destination=test', 'destination_type=queue', 'routing_key=test'])
-
- def tearDown(self):
- self.run_success(['delete', 'queue', 'name=test'])
-
-class RabbitHeadMsgTimestampTestCase(RabbitTestCase):
-
- def test_no_timestamp_when_queue_is_empty(self):
- self.assert_timestamp('undefined')
-
- def test_has_timestamp_when_first_msg_is_added(self):
- self.run_success(['publish', 'routing_key=test', 'payload=test_1', 'properties={{"timestamp":{0:.0f}}}'.format(TIMESTAMP1)])
- self.assert_timestamp_changed('undefined', TIMESTAMP1)
-
- def test_no_timestamp_when_last_msg_is_removed(self):
- self.run_success(['publish', 'routing_key=test', 'payload=test_1', 'properties={{"timestamp":{0:.0f}}}'.format(TIMESTAMP1)])
- self.assert_timestamp_changed('undefined', TIMESTAMP1)
- self.run_success(['get', 'queue=test', 'requeue=false'])
- self.assert_timestamp_changed(TIMESTAMP1, 'undefined')
-
- def test_timestamp_updated_when_msg_is_removed(self):
- self.run_success(['publish', 'routing_key=test', 'payload=test_1', 'properties={{"timestamp":{0:.0f}}}'.format(TIMESTAMP1)])
- self.assert_timestamp_changed('undefined', TIMESTAMP1)
- self.run_success(['publish', 'routing_key=test', 'payload=test_2', 'properties={{"timestamp":{0:.0f}}}'.format(TIMESTAMP2)])
- self.run_success(['get', 'queue=test', 'requeue=false'])
- self.assert_timestamp_changed(TIMESTAMP1, TIMESTAMP2)
-
- # This is the best we can do to delay ACK via the mgmt interface
- def test_timestamp_not_updated_before_msg_is_acked(self):
- self.run_success(['publish', 'routing_key=test', 'payload=test_1', 'properties={{"timestamp":{0:.0f}}}'.format(TIMESTAMP1)])
- self.assert_timestamp_changed('undefined', TIMESTAMP1)
- self.run_success(['get', 'queue=test', 'requeue=true'])
- self.assert_timestamp(TIMESTAMP1)
-
- # ---------------------------------------------------------------------------
-
- def run_success(self, args, **kwargs):
- (stdout, ret) = self.admin(args, **kwargs)
- if ret != 0:
- self.fail(stdout)
-
- def assert_timestamp_changed(self, from_expected, to_expected):
- stats_wait_start = clock()
- while ((clock() - stats_wait_start) < 10 and
- self.get_queue_timestamp('test') == from_expected):
- sleep(0.1)
- return self.assert_timestamp(to_expected)
-
- def assert_timestamp(self, expected):
- self.assertEqual(expected, self.get_queue_timestamp('test'))
-
- def get_queue_timestamp(self, queue_name):
- for queue_timestamp in self.get_timestamp_stats():
- if queue_timestamp['name'] == 'test':
- return queue_timestamp['head_msg_timestamp']
- return None
-
- def get_timestamp_stats(self):
- return self.get_stats(['list', 'queues', 'name,head_msg_timestamp'])
-
- def get_stats(self, args0):
- args = ['-f', 'pretty_json', '-q']
- args.extend(args0)
- return json.loads(self.admin(args)[0])
-
- def admin(self, args, stdin=None):
- return run('../../../rabbitmq-management/bin/rabbitmqadmin', args, stdin)
-
-
-def run(cmd, args, stdin):
- path = os.path.normpath(os.path.join(os.getcwd(), cmd))
- cmdline = [path]
- cmdline.extend(args)
- proc = subprocess.Popen(cmdline, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
- (stdout, stderr) = proc.communicate(stdin)
- returncode = proc.returncode
- return (stdout + stderr, returncode)
-
-if __name__ == '__main__':
- print "\nrabbit head msg timestamp tests\n===============================\n"
- suite = unittest.TestLoader().loadTestsFromTestCase(RabbitHeadMsgTimestampTestCase)
- results = unittest.TextTestRunner(verbosity=2).run(suite)
- if not results.wasSuccessful():
- sys.exit(1)
-
diff --git a/test/src/rabbit_tests.erl b/test/src/rabbit_tests.erl
index 0774dd9ab5..a39aa81e79 100644
--- a/test/src/rabbit_tests.erl
+++ b/test/src/rabbit_tests.erl
@@ -13,7 +13,6 @@
%% The Initial Developer of the Original Code is GoPivotal, Inc.
%% Copyright (c) 2007-2014 GoPivotal, Inc. All rights reserved.
%%
-
-module(rabbit_tests).
-compile([export_all]).
@@ -64,7 +63,8 @@ all_tests0() ->
passed = test_log_management(),
passed = test_app_management(),
passed = test_log_management_during_startup(),
- passed = test_statistics(),
+ passed = test_ch_statistics(),
+ passed = test_head_message_timestamp_statistic(),
passed = test_arguments_parser(),
passed = test_dynamic_mirroring(),
passed = test_user_management(),
@@ -89,7 +89,6 @@ all_tests0() ->
passed = vm_memory_monitor_tests:all_tests(),
passed.
-
do_if_secondary_node(Up, Down) ->
SecondaryNode = rabbit_nodes:make("hare"),
@@ -1440,21 +1439,21 @@ test_statistics_event_receiver(Pid) ->
Foo -> Pid ! Foo, test_statistics_event_receiver(Pid)
end.
-test_statistics_receive_event(Ch, Matcher) ->
+test_ch_statistics_receive_event(Ch, Matcher) ->
rabbit_channel:flush(Ch),
Ch ! emit_stats,
- test_statistics_receive_event1(Ch, Matcher).
+ test_ch_statistics_receive_event1(Ch, Matcher).
-test_statistics_receive_event1(Ch, Matcher) ->
+test_ch_statistics_receive_event1(Ch, Matcher) ->
receive #event{type = channel_stats, props = Props} ->
case Matcher(Props) of
true -> Props;
- _ -> test_statistics_receive_event1(Ch, Matcher)
+ _ -> test_ch_statistics_receive_event1(Ch, Matcher)
end
after ?TIMEOUT -> throw(failed_to_receive_event)
end.
-test_statistics() ->
+test_ch_statistics() ->
application:set_env(rabbit, collect_statistics, fine),
%% ATM this just tests the queue / exchange stats in channels. That's
@@ -1472,7 +1471,7 @@ test_statistics() ->
rabbit_tests_event_receiver:start(self(), [node()], [channel_stats]),
%% Check stats empty
- Event = test_statistics_receive_event(Ch, fun (_) -> true end),
+ Event = test_ch_statistics_receive_event(Ch, fun (_) -> true end),
[] = proplists:get_value(channel_queue_stats, Event),
[] = proplists:get_value(channel_exchange_stats, Event),
[] = proplists:get_value(channel_queue_exchange_stats, Event),
@@ -1484,7 +1483,7 @@ test_statistics() ->
rabbit_channel:do(Ch, #'basic.get'{queue = QName}),
%% Check the stats reflect that
- Event2 = test_statistics_receive_event(
+ Event2 = test_ch_statistics_receive_event(
Ch,
fun (E) ->
length(proplists:get_value(
@@ -1497,7 +1496,7 @@ test_statistics() ->
%% Check the stats remove stuff on queue deletion
rabbit_channel:do(Ch, #'queue.delete'{queue = QName}),
- Event3 = test_statistics_receive_event(
+ Event3 = test_ch_statistics_receive_event(
Ch,
fun (E) ->
length(proplists:get_value(
@@ -1512,6 +1511,70 @@ test_statistics() ->
rabbit_tests_event_receiver:stop(),
passed.
+test_queue_statistics_receive_event(Q, Matcher) ->
+ %% Q ! emit_stats,
+ test_queue_statistics_receive_event1(Q, Matcher).
+
+test_queue_statistics_receive_event1(Q, Matcher) ->
+ receive #event{type = queue_stats, props = Props} ->
+ case Matcher(Props) of
+ true -> Props;
+ _ -> test_queue_statistics_receive_event1(Q, Matcher)
+ end
+ after ?TIMEOUT -> throw(failed_to_receive_event)
+ end.
+
+test_head_message_timestamp_statistic() ->
+ %% Can't find a way to receive the ack here so can't test pending acks status
+
+ application:set_env(rabbit, collect_statistics, fine),
+
+ %% Set up a channel and queue
+ {_Writer, Ch} = test_spawn(),
+ rabbit_channel:do(Ch, #'queue.declare'{}),
+ QName = receive #'queue.declare_ok'{queue = Q0} -> Q0
+ after ?TIMEOUT -> throw(failed_to_receive_queue_declare_ok)
+ end,
+ QRes = rabbit_misc:r(<<"/">>, queue, QName),
+ X = rabbit_misc:r(<<"/">>, exchange, <<"">>),
+
+ {ok, Q1} = rabbit_amqqueue:lookup(rabbit_misc:r(<<"/">>, queue, QName)),
+ QPid = Q1#amqqueue.pid,
+
+ %% Set up event receiver for queue
+ rabbit_tests_event_receiver:start(self(), [node()], [queue_stats]),
+
+ %% Check timestamp is empty when queue is empty
+ Event1 = test_queue_statistics_receive_event(QPid, fun (E) -> proplists:get_value(name, E) == QRes end),
+ '' = proplists:get_value(head_message_timestamp, Event1),
+
+ %% Publish two messages and check timestamp is that of first message
+ rabbit_channel:do(Ch, #'basic.publish'{exchange = <<"">>,
+ routing_key = QName},
+ rabbit_basic:build_content(#'P_basic'{timestamp = 1}, <<"">>)),
+ rabbit_channel:do(Ch, #'basic.publish'{exchange = <<"">>,
+ routing_key = QName},
+ rabbit_basic:build_content(#'P_basic'{timestamp = 2}, <<"">>)),
+ Event2 = test_queue_statistics_receive_event(QPid, fun (E) -> proplists:get_value(name, E) == QRes end),
+ 1 = proplists:get_value(head_message_timestamp, Event2),
+
+ %% Get first message and check timestamp is that of second message
+ rabbit_channel:do(Ch, #'basic.get'{queue = QName, no_ack = true}),
+ Event3 = test_queue_statistics_receive_event(QPid, fun (E) -> proplists:get_value(name, E) == QRes end),
+ 2 = proplists:get_value(head_message_timestamp, Event3),
+
+ %% Get second message and check timestamp is empty again
+ rabbit_channel:do(Ch, #'basic.get'{queue = QName, no_ack = true}),
+ Event4 = test_queue_statistics_receive_event(QPid, fun (E) -> proplists:get_value(name, E) == QRes end),
+ '' = proplists:get_value(head_message_timestamp, Event1),
+
+ %% Teardown
+ rabbit_channel:do(Ch, #'queue.delete'{queue = QName}),
+ rabbit_channel:shutdown(Ch),
+ rabbit_tests_event_receiver:stop(),
+
+ passed.
+
test_refresh_events(SecondaryNode) ->
rabbit_tests_event_receiver:start(self(), [node(), SecondaryNode],
[channel_created, queue_created]),