diff options
Diffstat (limited to 'python/subunit/test_results.py')
| -rw-r--r-- | python/subunit/test_results.py | 149 |
1 files changed, 108 insertions, 41 deletions
diff --git a/python/subunit/test_results.py b/python/subunit/test_results.py index a6ad0c6..e7f9171 100644 --- a/python/subunit/test_results.py +++ b/python/subunit/test_results.py @@ -82,7 +82,7 @@ class TestResultDecorator(object): return self.decorated.stop() def tags(self, new_tags, gone_tags): - return self.decorated.time(new_tags, gone_tags) + return self.decorated.tags(new_tags, gone_tags) def time(self, a_datetime): return self.decorated.time(a_datetime) @@ -195,6 +195,91 @@ class AutoTimingTestResultDecorator(HookedTestResultDecorator): return self.decorated.time(a_datetime) +class TagCollapsingDecorator(TestResultDecorator): + """Collapses many 'tags' calls into one where possible.""" + + def __init__(self, result): + super(TagCollapsingDecorator, self).__init__(result) + # The current test (for filtering tags) + self._current_test = None + # The (new, gone) tags for the current test. + self._current_test_tags = None + + def startTest(self, test): + """Start a test. + + Not directly passed to the client, but used for handling of tags + correctly. + """ + self.decorated.startTest(test) + self._current_test = test + self._current_test_tags = set(), set() + + def stopTest(self, test): + """Stop a test. + + Not directly passed to the client, but used for handling of tags + correctly. + """ + # Tags to output for this test. + if self._current_test_tags[0] or self._current_test_tags[1]: + self.decorated.tags(*self._current_test_tags) + self.decorated.stopTest(test) + self._current_test = None + self._current_test_tags = None + + def tags(self, new_tags, gone_tags): + """Handle tag instructions. + + Adds and removes tags as appropriate. If a test is currently running, + tags are not affected for subsequent tests. + + :param new_tags: Tags to add, + :param gone_tags: Tags to remove. + """ + if self._current_test is not None: + # gather the tags until the test stops. + self._current_test_tags[0].update(new_tags) + self._current_test_tags[0].difference_update(gone_tags) + self._current_test_tags[1].update(gone_tags) + self._current_test_tags[1].difference_update(new_tags) + else: + return self.decorated.tags(new_tags, gone_tags) + + +class TimeCollapsingDecorator(HookedTestResultDecorator): + """Only pass on the first and last of a consecutive sequence of times.""" + + def __init__(self, decorated): + super(TimeCollapsingDecorator, self).__init__(decorated) + self._last_received_time = None + self._last_sent_time = None + + def _before_event(self): + if self._last_received_time is None: + return + if self._last_received_time != self._last_sent_time: + self.decorated.time(self._last_received_time) + self._last_sent_time = self._last_received_time + self._last_received_time = None + + def time(self, a_time): + # Don't upcall, because we don't want to call _before_event, it's only + # for non-time events. + if self._last_received_time is None: + self.decorated.time(a_time) + self._last_sent_time = a_time + self._last_received_time = a_time + + +def all_true(bools): + """Return True if all of 'bools' are True. False otherwise.""" + for b in bools: + if not b: + return False + return True + + class TestResultFilter(TestResultDecorator): """A pyunit TestResult interface implementation which filters tests. @@ -222,50 +307,53 @@ class TestResultFilter(TestResultDecorator): metadata is available. outcome is the name of the outcome such as 'success' or 'failure'. """ - TestResultDecorator.__init__(self, result) - self._filter_error = filter_error - self._filter_failure = filter_failure - self._filter_success = filter_success - self._filter_skip = filter_skip - if filter_predicate is None: - filter_predicate = lambda test, outcome, err, details: True - self.filter_predicate = filter_predicate + super(TestResultFilter, self).__init__(result) + self.decorated = TimeCollapsingDecorator( + TagCollapsingDecorator(self.decorated)) + predicates = [] + if filter_error: + predicates.append(lambda t, outcome, e, d: outcome != 'error') + if filter_failure: + predicates.append(lambda t, outcome, e, d: outcome != 'failure') + if filter_success: + predicates.append(lambda t, outcome, e, d: outcome != 'success') + if filter_skip: + predicates.append(lambda t, outcome, e, d: outcome != 'skip') + if filter_predicate is not None: + predicates.append(filter_predicate) + self.filter_predicate = ( + lambda test, outcome, err, details: + all_true(p(test, outcome, err, details) for p in predicates)) # The current test (for filtering tags) self._current_test = None # Has the current test been filtered (for outputting test tags) self._current_test_filtered = None - # The (new, gone) tags for the current test. - self._current_test_tags = None # Calls to this result that we don't know whether to forward on yet. self._buffered_calls = [] def addError(self, test, err=None, details=None): - if (not self._filter_error and - self.filter_predicate(test, 'error', err, details)): + if (self.filter_predicate(test, 'error', err, details)): self._buffered_calls.append( ('addError', [test, err], {'details': details})) else: self._filtered() def addFailure(self, test, err=None, details=None): - if (not self._filter_failure and - self.filter_predicate(test, 'failure', err, details)): + if (self.filter_predicate(test, 'failure', err, details)): self._buffered_calls.append( ('addFailure', [test, err], {'details': details})) else: self._filtered() def addSkip(self, test, reason=None, details=None): - if (not self._filter_skip and - self.filter_predicate(test, 'skip', reason, details)): + if (self.filter_predicate(test, 'skip', reason, details)): self._buffered_calls.append( ('addSkip', [reason], {'details': details})) else: self._filtered() def addSuccess(self, test, details=None): - if (not self._filter_success and - self.filter_predicate(test, 'success', None, details)): + if (self.filter_predicate(test, 'success', None, details)): self._buffered_calls.append( ('addSuccess', [test], {'details': details})) else: @@ -293,7 +381,6 @@ class TestResultFilter(TestResultDecorator): """ self._current_test = test self._current_test_filtered = False - self._current_test_tags = set(), set() self._buffered_calls.append(('startTest', [test], {})) def stopTest(self, test): @@ -306,31 +393,11 @@ class TestResultFilter(TestResultDecorator): # Tags to output for this test. for method, args, kwargs in self._buffered_calls: getattr(self.decorated, method)(*args, **kwargs) - if self._current_test_tags[0] or self._current_test_tags[1]: - self.decorated.tags(*self._current_test_tags) self.decorated.stopTest(test) self._current_test = None self._current_test_filtered = None - self._current_test_tags = None self._buffered_calls = [] - def tags(self, new_tags, gone_tags): - """Handle tag instructions. - - Adds and removes tags as appropriate. If a test is currently running, - tags are not affected for subsequent tests. - - :param new_tags: Tags to add, - :param gone_tags: Tags to remove. - """ - if self._current_test is not None: - # gather the tags until the test stops. - self._current_test_tags[0].update(new_tags) - self._current_test_tags[0].difference_update(gone_tags) - self._current_test_tags[1].update(gone_tags) - self._current_test_tags[1].difference_update(new_tags) - return self.decorated.tags(new_tags, gone_tags) - def time(self, a_time): if self._current_test is not None: self._buffered_calls.append(('time', [a_time], {})) @@ -347,7 +414,7 @@ class TestIdPrintingResult(testtools.TestResult): def __init__(self, stream, show_times=False): """Create a FilterResult object outputting to stream.""" - testtools.TestResult.__init__(self) + super(TestIdPrintingResult, self).__init__() self._stream = stream self.failed_tests = 0 self.__time = 0 |
