summaryrefslogtreecommitdiff
path: root/python/subunit/tests
diff options
context:
space:
mode:
Diffstat (limited to 'python/subunit/tests')
-rw-r--r--python/subunit/tests/test_subunit_filter.py137
1 files changed, 134 insertions, 3 deletions
diff --git a/python/subunit/tests/test_subunit_filter.py b/python/subunit/tests/test_subunit_filter.py
index 123be08..664fcb3 100644
--- a/python/subunit/tests/test_subunit_filter.py
+++ b/python/subunit/tests/test_subunit_filter.py
@@ -17,15 +17,18 @@
"""Tests for subunit.TestResultFilter."""
from datetime import datetime
+import os
+import subprocess
+import sys
from subunit import iso8601
import unittest
from testtools import TestCase
-from testtools.compat import _b, BytesIO, StringIO
+from testtools.compat import _b, BytesIO
from testtools.testresult.doubles import ExtendedTestResult
import subunit
-from subunit.test_results import TestResultFilter
+from subunit.test_results import _make_tag_filter, TestResultFilter
class TestTestResultFilter(TestCase):
@@ -77,6 +80,40 @@ xfail todo
filtered_result.failures])
self.assertEqual(4, filtered_result.testsRun)
+ def test_tag_filter(self):
+ tag_filter = _make_tag_filter(['global'], ['local'])
+ result = ExtendedTestResult()
+ result_filter = TestResultFilter(
+ result, filter_success=False, filter_predicate=tag_filter)
+ self.run_tests(result_filter)
+ tests_included = [
+ event[1] for event in result._events if event[0] == 'startTest']
+ tests_expected = map(
+ subunit.RemotedTestCase,
+ ['passed', 'error', 'skipped', 'todo'])
+ self.assertEquals(tests_expected, tests_included)
+
+ def test_tags_tracked_correctly(self):
+ tag_filter = _make_tag_filter(['a'], [])
+ result = ExtendedTestResult()
+ result_filter = TestResultFilter(
+ result, filter_success=False, filter_predicate=tag_filter)
+ input_stream = (
+ "test: foo\n"
+ "tags: a\n"
+ "successful: foo\n"
+ "test: bar\n"
+ "successful: bar\n")
+ self.run_tests(result_filter, input_stream)
+ foo = subunit.RemotedTestCase('foo')
+ self.assertEquals(
+ [('startTest', foo),
+ ('tags', set(['a']), set()),
+ ('addSuccess', foo),
+ ('stopTest', foo),
+ ],
+ result._events)
+
def test_exclude_errors(self):
filtered_result = unittest.TestResult()
result_filter = TestResultFilter(filtered_result, filter_error=True)
@@ -151,6 +188,8 @@ xfail todo
def test_filter_predicate(self):
"""You can filter by predicate callbacks"""
+ # 0.0.7 and earlier did not support the 'tags' parameter, so we need
+ # to test that we still support behaviour without it.
filtered_result = unittest.TestResult()
def filter_cb(test, outcome, err, details):
return outcome == 'success'
@@ -161,6 +200,18 @@ xfail todo
# Only success should pass
self.assertEqual(1, filtered_result.testsRun)
+ def test_filter_predicate_with_tags(self):
+ """You can filter by predicate callbacks that accept tags"""
+ filtered_result = unittest.TestResult()
+ def filter_cb(test, outcome, err, details, tags):
+ return outcome == 'success'
+ result_filter = TestResultFilter(filtered_result,
+ filter_predicate=filter_cb,
+ filter_success=False)
+ self.run_tests(result_filter)
+ # Only success should pass
+ self.assertEqual(1, filtered_result.testsRun)
+
def test_time_ordering_preserved(self):
# Passing a subunit stream through TestResultFilter preserves the
# relative ordering of 'time' directives and any other subunit
@@ -182,8 +233,8 @@ xfail todo
self.maxDiff = None
self.assertSequenceEqual(
[('time', date_a),
- ('time', date_b),
('startTest', foo),
+ ('time', date_b),
('addError', foo, {}),
('stopTest', foo),
('time', date_c)], result._events)
@@ -203,6 +254,86 @@ xfail todo
('stopTest', foo), ], result._events)
+class TestFilterCommand(TestCase):
+
+ example_subunit_stream = _b("""\
+tags: global
+test passed
+success passed
+test failed
+tags: local
+failure failed
+test error
+error error [
+error details
+]
+test skipped
+skip skipped
+test todo
+xfail todo
+""")
+
+ def run_command(self, args, stream):
+ root = os.path.dirname(
+ os.path.dirname(os.path.dirname(os.path.dirname(__file__))))
+ script_path = os.path.join(root, 'filters', 'subunit-filter')
+ command = [sys.executable, script_path] + list(args)
+ ps = subprocess.Popen(
+ command, stdin=subprocess.PIPE, stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE)
+ out, err = ps.communicate(stream)
+ if ps.returncode != 0:
+ raise RuntimeError("%s failed: %s" % (command, err))
+ return out
+
+ def to_events(self, stream):
+ test = subunit.ProtocolTestCase(BytesIO(stream))
+ result = ExtendedTestResult()
+ test.run(result)
+ return result._events
+
+ def test_default(self):
+ output = self.run_command([], (
+ "test: foo\n"
+ "skip: foo\n"
+ ))
+ events = self.to_events(output)
+ foo = subunit.RemotedTestCase('foo')
+ self.assertEqual(
+ [('startTest', foo),
+ ('addSkip', foo, {}),
+ ('stopTest', foo)],
+ events)
+
+ def test_tags(self):
+ output = self.run_command(['-s', '--with-tag', 'a'], (
+ "tags: a\n"
+ "test: foo\n"
+ "success: foo\n"
+ "tags: -a\n"
+ "test: bar\n"
+ "success: bar\n"
+ "test: baz\n"
+ "tags: a\n"
+ "success: baz\n"
+ ))
+ events = self.to_events(output)
+ foo = subunit.RemotedTestCase('foo')
+ baz = subunit.RemotedTestCase('baz')
+ self.assertEqual(
+ [('tags', set(['a']), set()),
+ ('startTest', foo),
+ ('addSuccess', foo),
+ ('stopTest', foo),
+ ('tags', set(), set(['a'])),
+ ('startTest', baz),
+ ('tags', set(['a']), set()),
+ ('addSuccess', baz),
+ ('stopTest', baz),
+ ],
+ events)
+
+
def test_suite():
loader = subunit.tests.TestUtil.TestLoader()
result = loader.loadTestsFromName(__name__)