diff options
| author | Robert Collins <robertc@robertcollins.net> | 2009-10-25 16:49:09 +1100 |
|---|---|---|
| committer | Robert Collins <robertc@robertcollins.net> | 2009-10-25 16:49:09 +1100 |
| commit | f6b397940103643565955391e73099bf54de6dfb (patch) | |
| tree | c8f117d8a63487d916b76b24703150437486e8b7 /python/subunit | |
| parent | 761e21ba507fa0d40509301b1f470944c8a4deb0 (diff) | |
| download | subunit-git-f6b397940103643565955391e73099bf54de6dfb.tar.gz | |
Support the extended TestResult details API on TestResultFilter (but not yet on predicates).
Diffstat (limited to 'python/subunit')
| -rw-r--r-- | python/subunit/__init__.py | 120 | ||||
| -rw-r--r-- | python/subunit/test_results.py | 118 | ||||
| -rw-r--r-- | python/subunit/tests/test_subunit_filter.py | 42 |
3 files changed, 138 insertions, 142 deletions
diff --git a/python/subunit/__init__.py b/python/subunit/__init__.py index c79ee0c..dd3c0f1 100644 --- a/python/subunit/__init__.py +++ b/python/subunit/__init__.py @@ -1095,123 +1095,3 @@ class TestResultStats(unittest.TestResult): def wasSuccessful(self): """Tells whether or not this result was a success""" return self.failed_tests == 0 - - -class TestResultFilter(unittest.TestResult): - """A pyunit TestResult interface implementation which filters tests. - - Tests that pass the filter are handed on to another TestResult instance - for further processing/reporting. To obtain the filtered results, - the other instance must be interrogated. - - :ivar result: The result that tests are passed to after filtering. - :ivar filter_predicate: The callback run to decide whether to pass - a result. - """ - - def __init__(self, result, filter_error=False, filter_failure=False, - filter_success=True, filter_skip=False, - filter_predicate=None): - """Create a FilterResult object filtering to result. - - :param filter_error: Filter out errors. - :param filter_failure: Filter out failures. - :param filter_success: Filter out successful tests. - :param filter_skip: Filter out skipped tests. - :param filter_predicate: A callable taking (test, err) and - returning True if the result should be passed through. - err is None for success. - """ - unittest.TestResult.__init__(self) - self.result = result - self._filter_error = filter_error - self._filter_failure = filter_failure - self._filter_success = filter_success - self._filter_skip = filter_skip - if filter_predicate is None: - filter_predicate = lambda test, err: True - self.filter_predicate = filter_predicate - # The current test (for filtering tags) - self._current_test = None - # Has the current test been filtered (for outputting test tags) - self._current_test_filtered = None - # The (new, gone) tags for the current test. - self._current_test_tags = None - - def addError(self, test, err): - if not self._filter_error and self.filter_predicate(test, err): - self.result.startTest(test) - self.result.addError(test, err) - - def addFailure(self, test, err): - if not self._filter_failure and self.filter_predicate(test, err): - self.result.startTest(test) - self.result.addFailure(test, err) - - def addSkip(self, test, reason): - if not self._filter_skip and self.filter_predicate(test, reason): - self.result.startTest(test) - # This is duplicated, it would be nice to have on a 'calls - # TestResults' mixin perhaps. - addSkip = getattr(self.result, 'addSkip', None) - if not callable(addSkip): - self.result.addError(test, RemoteError(reason)) - else: - self.result.addSkip(test, reason) - - def addSuccess(self, test): - if not self._filter_success and self.filter_predicate(test, None): - self.result.startTest(test) - self.result.addSuccess(test) - - def startTest(self, test): - """Start a test. - - Not directly passed to the client, but used for handling of tags - correctly. - """ - self._current_test = test - self._current_test_filtered = False - self._current_test_tags = set(), set() - - def stopTest(self, test): - """Stop a test. - - Not directly passed to the client, but used for handling of tags - correctly. - """ - if not self._current_test_filtered: - # Tags to output for this test. - if self._current_test_tags[0] or self._current_test_tags[1]: - tags_method = getattr(self.result, 'tags', None) - if callable(tags_method): - self.result.tags(*self._current_test_tags) - self.result.stopTest(test) - self._current_test = None - self._current_test_filtered = None - self._current_test_tags = None - - def tags(self, new_tags, gone_tags): - """Handle tag instructions. - - Adds and removes tags as appropriate. If a test is currently running, - tags are not affected for subsequent tests. - - :param new_tags: Tags to add, - :param gone_tags: Tags to remove. - """ - if self._current_test is not None: - # gather the tags until the test stops. - self._current_test_tags[0].update(new_tags) - self._current_test_tags[0].difference_update(gone_tags) - self._current_test_tags[1].update(gone_tags) - self._current_test_tags[1].difference_update(new_tags) - tags_method = getattr(self.result, 'tags', None) - if tags_method is None: - return - return tags_method(new_tags, gone_tags) - - def id_to_orig_id(self, id): - if id.startswith("subunit.RemotedTestCase."): - return id[len("subunit.RemotedTestCase."):] - return id diff --git a/python/subunit/test_results.py b/python/subunit/test_results.py index e0891a3..5401c32 100644 --- a/python/subunit/test_results.py +++ b/python/subunit/test_results.py @@ -196,6 +196,124 @@ class AutoTimingTestResultDecorator(HookedTestResultDecorator): return self.decorated.time(a_datetime) +class TestResultFilter(TestResultDecorator): + """A pyunit TestResult interface implementation which filters tests. + + Tests that pass the filter are handed on to another TestResult instance + for further processing/reporting. To obtain the filtered results, + the other instance must be interrogated. + + :ivar result: The result that tests are passed to after filtering. + :ivar filter_predicate: The callback run to decide whether to pass + a result. + """ + + def __init__(self, result, filter_error=False, filter_failure=False, + filter_success=True, filter_skip=False, + filter_predicate=None): + """Create a FilterResult object filtering to result. + + :param filter_error: Filter out errors. + :param filter_failure: Filter out failures. + :param filter_success: Filter out successful tests. + :param filter_skip: Filter out skipped tests. + :param filter_predicate: A callable taking (test, err) and + returning True if the result should be passed through. + err is None for success. + """ + TestResultDecorator.__init__(self, result) + self._filter_error = filter_error + self._filter_failure = filter_failure + self._filter_success = filter_success + self._filter_skip = filter_skip + if filter_predicate is None: + filter_predicate = lambda test, err: True + self.filter_predicate = filter_predicate + # The current test (for filtering tags) + self._current_test = None + # Has the current test been filtered (for outputting test tags) + self._current_test_filtered = None + # The (new, gone) tags for the current test. + self._current_test_tags = None + + def addError(self, test, err, details=None): + if not self._filter_error and self.filter_predicate(test, err): + self.decorated.startTest(test) + self.decorated.addError(test, err, details=details) + + def addFailure(self, test, err, details=None): + if not self._filter_failure and self.filter_predicate(test, err): + self.decorated.startTest(test) + self.decorated.addFailure(test, err, details=details) + + def addSkip(self, test, reason, details=None): + if not self._filter_skip and self.filter_predicate(test, reason): + self.decorated.startTest(test) + self.decorated.addSkip(test, reason, details=details) + + def addSuccess(self, test, details=None): + if not self._filter_success and self.filter_predicate(test, None): + self.decorated.startTest(test) + self.decorated.addSuccess(test, details=details) + + def addExpectedFailure(self, test, err, details=None): + if self.filter_predicate(test, err): + self.decorated.startTest(test) + return self.decorated.addExpectedFailure(test, err, + details=details) + + def addUnexpectedSuccess(self, test, details=None): + self.decorated.startTest(test) + return self.decorated.addUnexpectedSuccess(test, details=details) + + def startTest(self, test): + """Start a test. + + Not directly passed to the client, but used for handling of tags + correctly. + """ + self._current_test = test + self._current_test_filtered = False + self._current_test_tags = set(), set() + + def stopTest(self, test): + """Stop a test. + + Not directly passed to the client, but used for handling of tags + correctly. + """ + if not self._current_test_filtered: + # Tags to output for this test. + if self._current_test_tags[0] or self._current_test_tags[1]: + self.decorated.tags(*self._current_test_tags) + self.decorated.stopTest(test) + self._current_test = None + self._current_test_filtered = None + self._current_test_tags = None + + def tags(self, new_tags, gone_tags): + """Handle tag instructions. + + Adds and removes tags as appropriate. If a test is currently running, + tags are not affected for subsequent tests. + + :param new_tags: Tags to add, + :param gone_tags: Tags to remove. + """ + if self._current_test is not None: + # gather the tags until the test stops. + self._current_test_tags[0].update(new_tags) + self._current_test_tags[0].difference_update(gone_tags) + self._current_test_tags[1].update(gone_tags) + self._current_test_tags[1].difference_update(new_tags) + return self.decorated.tags(new_tags, gone_tags) + + def id_to_orig_id(self, id): + if id.startswith("subunit.RemotedTestCase."): + return id[len("subunit.RemotedTestCase."):] + return id + + class ExtendedToOriginalDecorator(object): """Permit new TestResult API code to degrade gracefully with old results. diff --git a/python/subunit/tests/test_subunit_filter.py b/python/subunit/tests/test_subunit_filter.py index cc13b6c..9baff24 100644 --- a/python/subunit/tests/test_subunit_filter.py +++ b/python/subunit/tests/test_subunit_filter.py @@ -20,6 +20,7 @@ import unittest from StringIO import StringIO import subunit +from subunit.test_results import TestResultFilter class TestTestResultFilter(unittest.TestCase): @@ -31,57 +32,56 @@ class TestTestResultFilter(unittest.TestCase): def test_default(self): """The default is to exclude success and include everything else.""" self.filtered_result = unittest.TestResult() - self.filter = subunit.TestResultFilter(self.filtered_result) + self.filter = TestResultFilter(self.filtered_result) self.run_tests() - # skips are seen as errors by default python TestResult. - self.assertEqual(['error', 'skipped'], + # skips are seen as success by default python TestResult. + self.assertEqual(['error'], [error[0].id() for error in self.filtered_result.errors]) self.assertEqual(['failed'], [failure[0].id() for failure in self.filtered_result.failures]) - self.assertEqual(3, self.filtered_result.testsRun) + self.assertEqual(4, self.filtered_result.testsRun) def test_exclude_errors(self): self.filtered_result = unittest.TestResult() - self.filter = subunit.TestResultFilter(self.filtered_result, + self.filter = TestResultFilter(self.filtered_result, filter_error=True) self.run_tests() # skips are seen as errors by default python TestResult. - self.assertEqual(['skipped'], - [error[0].id() for error in self.filtered_result.errors]) + self.assertEqual([], self.filtered_result.errors) self.assertEqual(['failed'], [failure[0].id() for failure in self.filtered_result.failures]) - self.assertEqual(2, self.filtered_result.testsRun) + self.assertEqual(3, self.filtered_result.testsRun) def test_exclude_failure(self): self.filtered_result = unittest.TestResult() - self.filter = subunit.TestResultFilter(self.filtered_result, + self.filter = TestResultFilter(self.filtered_result, filter_failure=True) self.run_tests() - self.assertEqual(['error', 'skipped'], + self.assertEqual(['error'], [error[0].id() for error in self.filtered_result.errors]) self.assertEqual([], [failure[0].id() for failure in self.filtered_result.failures]) - self.assertEqual(2, self.filtered_result.testsRun) + self.assertEqual(3, self.filtered_result.testsRun) def test_exclude_skips(self): self.filtered_result = subunit.TestResultStats(None) - self.filter = subunit.TestResultFilter(self.filtered_result, + self.filter = TestResultFilter(self.filtered_result, filter_skip=True) self.run_tests() self.assertEqual(0, self.filtered_result.skipped_tests) self.assertEqual(2, self.filtered_result.failed_tests) - self.assertEqual(2, self.filtered_result.testsRun) + self.assertEqual(3, self.filtered_result.testsRun) def test_include_success(self): """Success's can be included if requested.""" self.filtered_result = unittest.TestResult() - self.filter = subunit.TestResultFilter(self.filtered_result, + self.filter = TestResultFilter(self.filtered_result, filter_success=False) self.run_tests() - self.assertEqual(['error', 'skipped'], + self.assertEqual(['error'], [error[0].id() for error in self.filtered_result.errors]) self.assertEqual(['failed'], [failure[0].id() for failure in @@ -92,14 +92,11 @@ class TestTestResultFilter(unittest.TestCase): """You can filter by predicate callbacks""" self.filtered_result = unittest.TestResult() filter_cb = lambda test, err: str(err).find('error details') != -1 - self.filter = subunit.TestResultFilter(self.filtered_result, + self.filter = TestResultFilter(self.filtered_result, filter_predicate=filter_cb, filter_success=False) self.run_tests() - self.assertEqual(1, - self.filtered_result.testsRun) - # I'd like to test filtering the xfail but it's blocked by - # https://bugs.edge.launchpad.net/subunit/+bug/409193 -- mbp 20090805 + self.assertEqual(1, self.filtered_result.testsRun) def run_tests(self): self.setUpTestStream() @@ -109,8 +106,9 @@ class TestTestResultFilter(unittest.TestCase): def setUpTestStream(self): # While TestResultFilter works on python objects, using a subunit # stream is an easy pithy way of getting a series of test objects to - # call into the TestResult, and as TestResultFilter is intended for use - # with subunit also has the benefit of detecting any interface skew issues. + # call into the TestResult, and as TestResultFilter is intended for + # use with subunit also has the benefit of detecting any interface + # skew issues. self.input_stream = StringIO() self.input_stream.write("""tags: global test passed |
