summaryrefslogtreecommitdiff
path: root/python/subunit
diff options
context:
space:
mode:
authorRobert Collins <robertc@robertcollins.net>2009-10-25 16:49:09 +1100
committerRobert Collins <robertc@robertcollins.net>2009-10-25 16:49:09 +1100
commitf6b397940103643565955391e73099bf54de6dfb (patch)
treec8f117d8a63487d916b76b24703150437486e8b7 /python/subunit
parent761e21ba507fa0d40509301b1f470944c8a4deb0 (diff)
downloadsubunit-git-f6b397940103643565955391e73099bf54de6dfb.tar.gz
Support the extended TestResult details API on TestResultFilter (but not yet on predicates).
Diffstat (limited to 'python/subunit')
-rw-r--r--python/subunit/__init__.py120
-rw-r--r--python/subunit/test_results.py118
-rw-r--r--python/subunit/tests/test_subunit_filter.py42
3 files changed, 138 insertions, 142 deletions
diff --git a/python/subunit/__init__.py b/python/subunit/__init__.py
index c79ee0c..dd3c0f1 100644
--- a/python/subunit/__init__.py
+++ b/python/subunit/__init__.py
@@ -1095,123 +1095,3 @@ class TestResultStats(unittest.TestResult):
def wasSuccessful(self):
"""Tells whether or not this result was a success"""
return self.failed_tests == 0
-
-
-class TestResultFilter(unittest.TestResult):
- """A pyunit TestResult interface implementation which filters tests.
-
- Tests that pass the filter are handed on to another TestResult instance
- for further processing/reporting. To obtain the filtered results,
- the other instance must be interrogated.
-
- :ivar result: The result that tests are passed to after filtering.
- :ivar filter_predicate: The callback run to decide whether to pass
- a result.
- """
-
- def __init__(self, result, filter_error=False, filter_failure=False,
- filter_success=True, filter_skip=False,
- filter_predicate=None):
- """Create a FilterResult object filtering to result.
-
- :param filter_error: Filter out errors.
- :param filter_failure: Filter out failures.
- :param filter_success: Filter out successful tests.
- :param filter_skip: Filter out skipped tests.
- :param filter_predicate: A callable taking (test, err) and
- returning True if the result should be passed through.
- err is None for success.
- """
- unittest.TestResult.__init__(self)
- self.result = result
- self._filter_error = filter_error
- self._filter_failure = filter_failure
- self._filter_success = filter_success
- self._filter_skip = filter_skip
- if filter_predicate is None:
- filter_predicate = lambda test, err: True
- self.filter_predicate = filter_predicate
- # The current test (for filtering tags)
- self._current_test = None
- # Has the current test been filtered (for outputting test tags)
- self._current_test_filtered = None
- # The (new, gone) tags for the current test.
- self._current_test_tags = None
-
- def addError(self, test, err):
- if not self._filter_error and self.filter_predicate(test, err):
- self.result.startTest(test)
- self.result.addError(test, err)
-
- def addFailure(self, test, err):
- if not self._filter_failure and self.filter_predicate(test, err):
- self.result.startTest(test)
- self.result.addFailure(test, err)
-
- def addSkip(self, test, reason):
- if not self._filter_skip and self.filter_predicate(test, reason):
- self.result.startTest(test)
- # This is duplicated, it would be nice to have on a 'calls
- # TestResults' mixin perhaps.
- addSkip = getattr(self.result, 'addSkip', None)
- if not callable(addSkip):
- self.result.addError(test, RemoteError(reason))
- else:
- self.result.addSkip(test, reason)
-
- def addSuccess(self, test):
- if not self._filter_success and self.filter_predicate(test, None):
- self.result.startTest(test)
- self.result.addSuccess(test)
-
- def startTest(self, test):
- """Start a test.
-
- Not directly passed to the client, but used for handling of tags
- correctly.
- """
- self._current_test = test
- self._current_test_filtered = False
- self._current_test_tags = set(), set()
-
- def stopTest(self, test):
- """Stop a test.
-
- Not directly passed to the client, but used for handling of tags
- correctly.
- """
- if not self._current_test_filtered:
- # Tags to output for this test.
- if self._current_test_tags[0] or self._current_test_tags[1]:
- tags_method = getattr(self.result, 'tags', None)
- if callable(tags_method):
- self.result.tags(*self._current_test_tags)
- self.result.stopTest(test)
- self._current_test = None
- self._current_test_filtered = None
- self._current_test_tags = None
-
- def tags(self, new_tags, gone_tags):
- """Handle tag instructions.
-
- Adds and removes tags as appropriate. If a test is currently running,
- tags are not affected for subsequent tests.
-
- :param new_tags: Tags to add,
- :param gone_tags: Tags to remove.
- """
- if self._current_test is not None:
- # gather the tags until the test stops.
- self._current_test_tags[0].update(new_tags)
- self._current_test_tags[0].difference_update(gone_tags)
- self._current_test_tags[1].update(gone_tags)
- self._current_test_tags[1].difference_update(new_tags)
- tags_method = getattr(self.result, 'tags', None)
- if tags_method is None:
- return
- return tags_method(new_tags, gone_tags)
-
- def id_to_orig_id(self, id):
- if id.startswith("subunit.RemotedTestCase."):
- return id[len("subunit.RemotedTestCase."):]
- return id
diff --git a/python/subunit/test_results.py b/python/subunit/test_results.py
index e0891a3..5401c32 100644
--- a/python/subunit/test_results.py
+++ b/python/subunit/test_results.py
@@ -196,6 +196,124 @@ class AutoTimingTestResultDecorator(HookedTestResultDecorator):
return self.decorated.time(a_datetime)
+class TestResultFilter(TestResultDecorator):
+ """A pyunit TestResult interface implementation which filters tests.
+
+ Tests that pass the filter are handed on to another TestResult instance
+ for further processing/reporting. To obtain the filtered results,
+ the other instance must be interrogated.
+
+ :ivar result: The result that tests are passed to after filtering.
+ :ivar filter_predicate: The callback run to decide whether to pass
+ a result.
+ """
+
+ def __init__(self, result, filter_error=False, filter_failure=False,
+ filter_success=True, filter_skip=False,
+ filter_predicate=None):
+ """Create a FilterResult object filtering to result.
+
+ :param filter_error: Filter out errors.
+ :param filter_failure: Filter out failures.
+ :param filter_success: Filter out successful tests.
+ :param filter_skip: Filter out skipped tests.
+ :param filter_predicate: A callable taking (test, err) and
+ returning True if the result should be passed through.
+ err is None for success.
+ """
+ TestResultDecorator.__init__(self, result)
+ self._filter_error = filter_error
+ self._filter_failure = filter_failure
+ self._filter_success = filter_success
+ self._filter_skip = filter_skip
+ if filter_predicate is None:
+ filter_predicate = lambda test, err: True
+ self.filter_predicate = filter_predicate
+ # The current test (for filtering tags)
+ self._current_test = None
+ # Has the current test been filtered (for outputting test tags)
+ self._current_test_filtered = None
+ # The (new, gone) tags for the current test.
+ self._current_test_tags = None
+
+ def addError(self, test, err, details=None):
+ if not self._filter_error and self.filter_predicate(test, err):
+ self.decorated.startTest(test)
+ self.decorated.addError(test, err, details=details)
+
+ def addFailure(self, test, err, details=None):
+ if not self._filter_failure and self.filter_predicate(test, err):
+ self.decorated.startTest(test)
+ self.decorated.addFailure(test, err, details=details)
+
+ def addSkip(self, test, reason, details=None):
+ if not self._filter_skip and self.filter_predicate(test, reason):
+ self.decorated.startTest(test)
+ self.decorated.addSkip(test, reason, details=details)
+
+ def addSuccess(self, test, details=None):
+ if not self._filter_success and self.filter_predicate(test, None):
+ self.decorated.startTest(test)
+ self.decorated.addSuccess(test, details=details)
+
+ def addExpectedFailure(self, test, err, details=None):
+ if self.filter_predicate(test, err):
+ self.decorated.startTest(test)
+ return self.decorated.addExpectedFailure(test, err,
+ details=details)
+
+ def addUnexpectedSuccess(self, test, details=None):
+ self.decorated.startTest(test)
+ return self.decorated.addUnexpectedSuccess(test, details=details)
+
+ def startTest(self, test):
+ """Start a test.
+
+ Not directly passed to the client, but used for handling of tags
+ correctly.
+ """
+ self._current_test = test
+ self._current_test_filtered = False
+ self._current_test_tags = set(), set()
+
+ def stopTest(self, test):
+ """Stop a test.
+
+ Not directly passed to the client, but used for handling of tags
+ correctly.
+ """
+ if not self._current_test_filtered:
+ # Tags to output for this test.
+ if self._current_test_tags[0] or self._current_test_tags[1]:
+ self.decorated.tags(*self._current_test_tags)
+ self.decorated.stopTest(test)
+ self._current_test = None
+ self._current_test_filtered = None
+ self._current_test_tags = None
+
+ def tags(self, new_tags, gone_tags):
+ """Handle tag instructions.
+
+ Adds and removes tags as appropriate. If a test is currently running,
+ tags are not affected for subsequent tests.
+
+ :param new_tags: Tags to add,
+ :param gone_tags: Tags to remove.
+ """
+ if self._current_test is not None:
+ # gather the tags until the test stops.
+ self._current_test_tags[0].update(new_tags)
+ self._current_test_tags[0].difference_update(gone_tags)
+ self._current_test_tags[1].update(gone_tags)
+ self._current_test_tags[1].difference_update(new_tags)
+ return self.decorated.tags(new_tags, gone_tags)
+
+ def id_to_orig_id(self, id):
+ if id.startswith("subunit.RemotedTestCase."):
+ return id[len("subunit.RemotedTestCase."):]
+ return id
+
+
class ExtendedToOriginalDecorator(object):
"""Permit new TestResult API code to degrade gracefully with old results.
diff --git a/python/subunit/tests/test_subunit_filter.py b/python/subunit/tests/test_subunit_filter.py
index cc13b6c..9baff24 100644
--- a/python/subunit/tests/test_subunit_filter.py
+++ b/python/subunit/tests/test_subunit_filter.py
@@ -20,6 +20,7 @@ import unittest
from StringIO import StringIO
import subunit
+from subunit.test_results import TestResultFilter
class TestTestResultFilter(unittest.TestCase):
@@ -31,57 +32,56 @@ class TestTestResultFilter(unittest.TestCase):
def test_default(self):
"""The default is to exclude success and include everything else."""
self.filtered_result = unittest.TestResult()
- self.filter = subunit.TestResultFilter(self.filtered_result)
+ self.filter = TestResultFilter(self.filtered_result)
self.run_tests()
- # skips are seen as errors by default python TestResult.
- self.assertEqual(['error', 'skipped'],
+ # skips are seen as success by default python TestResult.
+ self.assertEqual(['error'],
[error[0].id() for error in self.filtered_result.errors])
self.assertEqual(['failed'],
[failure[0].id() for failure in
self.filtered_result.failures])
- self.assertEqual(3, self.filtered_result.testsRun)
+ self.assertEqual(4, self.filtered_result.testsRun)
def test_exclude_errors(self):
self.filtered_result = unittest.TestResult()
- self.filter = subunit.TestResultFilter(self.filtered_result,
+ self.filter = TestResultFilter(self.filtered_result,
filter_error=True)
self.run_tests()
# skips are seen as errors by default python TestResult.
- self.assertEqual(['skipped'],
- [error[0].id() for error in self.filtered_result.errors])
+ self.assertEqual([], self.filtered_result.errors)
self.assertEqual(['failed'],
[failure[0].id() for failure in
self.filtered_result.failures])
- self.assertEqual(2, self.filtered_result.testsRun)
+ self.assertEqual(3, self.filtered_result.testsRun)
def test_exclude_failure(self):
self.filtered_result = unittest.TestResult()
- self.filter = subunit.TestResultFilter(self.filtered_result,
+ self.filter = TestResultFilter(self.filtered_result,
filter_failure=True)
self.run_tests()
- self.assertEqual(['error', 'skipped'],
+ self.assertEqual(['error'],
[error[0].id() for error in self.filtered_result.errors])
self.assertEqual([],
[failure[0].id() for failure in
self.filtered_result.failures])
- self.assertEqual(2, self.filtered_result.testsRun)
+ self.assertEqual(3, self.filtered_result.testsRun)
def test_exclude_skips(self):
self.filtered_result = subunit.TestResultStats(None)
- self.filter = subunit.TestResultFilter(self.filtered_result,
+ self.filter = TestResultFilter(self.filtered_result,
filter_skip=True)
self.run_tests()
self.assertEqual(0, self.filtered_result.skipped_tests)
self.assertEqual(2, self.filtered_result.failed_tests)
- self.assertEqual(2, self.filtered_result.testsRun)
+ self.assertEqual(3, self.filtered_result.testsRun)
def test_include_success(self):
"""Success's can be included if requested."""
self.filtered_result = unittest.TestResult()
- self.filter = subunit.TestResultFilter(self.filtered_result,
+ self.filter = TestResultFilter(self.filtered_result,
filter_success=False)
self.run_tests()
- self.assertEqual(['error', 'skipped'],
+ self.assertEqual(['error'],
[error[0].id() for error in self.filtered_result.errors])
self.assertEqual(['failed'],
[failure[0].id() for failure in
@@ -92,14 +92,11 @@ class TestTestResultFilter(unittest.TestCase):
"""You can filter by predicate callbacks"""
self.filtered_result = unittest.TestResult()
filter_cb = lambda test, err: str(err).find('error details') != -1
- self.filter = subunit.TestResultFilter(self.filtered_result,
+ self.filter = TestResultFilter(self.filtered_result,
filter_predicate=filter_cb,
filter_success=False)
self.run_tests()
- self.assertEqual(1,
- self.filtered_result.testsRun)
- # I'd like to test filtering the xfail but it's blocked by
- # https://bugs.edge.launchpad.net/subunit/+bug/409193 -- mbp 20090805
+ self.assertEqual(1, self.filtered_result.testsRun)
def run_tests(self):
self.setUpTestStream()
@@ -109,8 +106,9 @@ class TestTestResultFilter(unittest.TestCase):
def setUpTestStream(self):
# While TestResultFilter works on python objects, using a subunit
# stream is an easy pithy way of getting a series of test objects to
- # call into the TestResult, and as TestResultFilter is intended for use
- # with subunit also has the benefit of detecting any interface skew issues.
+ # call into the TestResult, and as TestResultFilter is intended for
+ # use with subunit also has the benefit of detecting any interface
+ # skew issues.
self.input_stream = StringIO()
self.input_stream.write("""tags: global
test passed