diff options
| author | Jonathan Lange <jml@mumak.net> | 2012-04-10 13:56:05 +0100 |
|---|---|---|
| committer | Jonathan Lange <jml@mumak.net> | 2012-04-10 13:56:05 +0100 |
| commit | 229151e1f399cefadf843a41ee08791fa1aa451a (patch) | |
| tree | a22e5369eca4da2af97d5d7702338159d9f485bd /python | |
| parent | eff5dc6d9da686ebbb2de9444c5bdc46e5a0539f (diff) | |
| download | subunit-git-229151e1f399cefadf843a41ee08791fa1aa451a.tar.gz | |
Extract out a filter base class that just deals with predicates.
Diffstat (limited to 'python')
| -rw-r--r-- | python/subunit/test_results.py | 164 |
1 files changed, 96 insertions, 68 deletions
diff --git a/python/subunit/test_results.py b/python/subunit/test_results.py index fb7affd..32dff6a 100644 --- a/python/subunit/test_results.py +++ b/python/subunit/test_results.py @@ -286,7 +286,100 @@ def all_true(bools): return True -class TestResultFilter(TestResultDecorator): +class _PredicateFilter(TestResultDecorator): + + def __init__(self, result, predicate): + super(_PredicateFilter, self).__init__(result) + self.decorated = TimeCollapsingDecorator( + TagCollapsingDecorator(self.decorated)) + self.filter_predicate = predicate + # The current test (for filtering tags) + self._current_test = None + # Has the current test been filtered (for outputting test tags) + self._current_test_filtered = None + # Calls to this result that we don't know whether to forward on yet. + self._buffered_calls = [] + + def addError(self, test, err=None, details=None): + if (self.filter_predicate(test, 'error', err, details)): + self._buffered_calls.append( + ('addError', [test, err], {'details': details})) + else: + self._filtered() + + def addFailure(self, test, err=None, details=None): + if (self.filter_predicate(test, 'failure', err, details)): + self._buffered_calls.append( + ('addFailure', [test, err], {'details': details})) + else: + self._filtered() + + def addSkip(self, test, reason=None, details=None): + if (self.filter_predicate(test, 'skip', reason, details)): + self._buffered_calls.append( + ('addSkip', [test, reason], {'details': details})) + else: + self._filtered() + + def addExpectedFailure(self, test, err=None, details=None): + if self.filter_predicate(test, 'expectedfailure', err, details): + self._buffered_calls.append( + ('addExpectedFailure', [test, err], {'details': details})) + else: + self._filtered() + + def addUnexpectedSuccess(self, test, details=None): + self._buffered_calls.append( + ('addUnexpectedSuccess', [test], {'details': details})) + + def addSuccess(self, test, details=None): + if (self.filter_predicate(test, 'success', None, details)): + self._buffered_calls.append( + ('addSuccess', [test], {'details': details})) + else: + self._filtered() + + def _filtered(self): + self._current_test_filtered = True + + def startTest(self, test): + """Start a test. + + Not directly passed to the client, but used for handling of tags + correctly. + """ + self._current_test = test + self._current_test_filtered = False + self._buffered_calls.append(('startTest', [test], {})) + + def stopTest(self, test): + """Stop a test. + + Not directly passed to the client, but used for handling of tags + correctly. + """ + if not self._current_test_filtered: + # Tags to output for this test. + for method, args, kwargs in self._buffered_calls: + getattr(self.decorated, method)(*args, **kwargs) + self.decorated.stopTest(test) + self._current_test = None + self._current_test_filtered = None + self._buffered_calls = [] + + def time(self, a_time): + if self._current_test is not None: + self._buffered_calls.append(('time', [a_time], {})) + else: + return self.decorated.time(a_time) + + def id_to_orig_id(self, id): + if id.startswith("subunit.RemotedTestCase."): + return id[len("subunit.RemotedTestCase."):] + return id + + +class TestResultFilter(_PredicateFilter): """A pyunit TestResult interface implementation which filters tests. Tests that pass the filter are handed on to another TestResult instance @@ -316,9 +409,6 @@ class TestResultFilter(TestResultDecorator): :param fixup_expected_failures: Set of test ids to consider known failing. """ - super(TestResultFilter, self).__init__(result) - self.decorated = TimeCollapsingDecorator( - TagCollapsingDecorator(self.decorated)) predicates = [] if filter_error: predicates.append(lambda t, outcome, e, d: outcome != 'error') @@ -332,15 +422,10 @@ class TestResultFilter(TestResultDecorator): predicates.append(lambda t, outcome, e, d: outcome != 'expectedfailure') if filter_predicate is not None: predicates.append(filter_predicate) - self.filter_predicate = ( + predicate = ( lambda test, outcome, err, details: all_true(p(test, outcome, err, details) for p in predicates)) - # The current test (for filtering tags) - self._current_test = None - # Has the current test been filtered (for outputting test tags) - self._current_test_filtered = None - # Calls to this result that we don't know whether to forward on yet. - self._buffered_calls = [] + super(TestResultFilter, self).__init__(result, predicate) if fixup_expected_failures is None: self._fixup_expected_failures = frozenset() else: @@ -368,13 +453,6 @@ class TestResultFilter(TestResultDecorator): else: self._filtered() - def addSkip(self, test, reason=None, details=None): - if (self.filter_predicate(test, 'skip', reason, details)): - self._buffered_calls.append( - ('addSkip', [test, reason], {'details': details})) - else: - self._filtered() - def addSuccess(self, test, details=None): if (self.filter_predicate(test, 'success', None, details)): if self._failure_expected(test): @@ -386,59 +464,9 @@ class TestResultFilter(TestResultDecorator): else: self._filtered() - def addExpectedFailure(self, test, err=None, details=None): - if self.filter_predicate(test, 'expectedfailure', err, details): - self._buffered_calls.append( - ('addExpectedFailure', [test, err], {'details': details})) - else: - self._filtered() - - def addUnexpectedSuccess(self, test, details=None): - self._buffered_calls.append( - ('addUnexpectedSuccess', [test], {'details': details})) - - def _filtered(self): - self._current_test_filtered = True - def _failure_expected(self, test): return (test.id() in self._fixup_expected_failures) - def startTest(self, test): - """Start a test. - - Not directly passed to the client, but used for handling of tags - correctly. - """ - self._current_test = test - self._current_test_filtered = False - self._buffered_calls.append(('startTest', [test], {})) - - def stopTest(self, test): - """Stop a test. - - Not directly passed to the client, but used for handling of tags - correctly. - """ - if not self._current_test_filtered: - # Tags to output for this test. - for method, args, kwargs in self._buffered_calls: - getattr(self.decorated, method)(*args, **kwargs) - self.decorated.stopTest(test) - self._current_test = None - self._current_test_filtered = None - self._buffered_calls = [] - - def time(self, a_time): - if self._current_test is not None: - self._buffered_calls.append(('time', [a_time], {})) - else: - return self.decorated.time(a_time) - - def id_to_orig_id(self, id): - if id.startswith("subunit.RemotedTestCase."): - return id[len("subunit.RemotedTestCase."):] - return id - class TestIdPrintingResult(testtools.TestResult): |
