summaryrefslogtreecommitdiff
path: root/python/subunit/test_results.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/subunit/test_results.py')
-rw-r--r--python/subunit/test_results.py149
1 files changed, 108 insertions, 41 deletions
diff --git a/python/subunit/test_results.py b/python/subunit/test_results.py
index a6ad0c6..e7f9171 100644
--- a/python/subunit/test_results.py
+++ b/python/subunit/test_results.py
@@ -82,7 +82,7 @@ class TestResultDecorator(object):
return self.decorated.stop()
def tags(self, new_tags, gone_tags):
- return self.decorated.time(new_tags, gone_tags)
+ return self.decorated.tags(new_tags, gone_tags)
def time(self, a_datetime):
return self.decorated.time(a_datetime)
@@ -195,6 +195,91 @@ class AutoTimingTestResultDecorator(HookedTestResultDecorator):
return self.decorated.time(a_datetime)
+class TagCollapsingDecorator(TestResultDecorator):
+ """Collapses many 'tags' calls into one where possible."""
+
+ def __init__(self, result):
+ super(TagCollapsingDecorator, self).__init__(result)
+ # The current test (for filtering tags)
+ self._current_test = None
+ # The (new, gone) tags for the current test.
+ self._current_test_tags = None
+
+ def startTest(self, test):
+ """Start a test.
+
+ Not directly passed to the client, but used for handling of tags
+ correctly.
+ """
+ self.decorated.startTest(test)
+ self._current_test = test
+ self._current_test_tags = set(), set()
+
+ def stopTest(self, test):
+ """Stop a test.
+
+ Not directly passed to the client, but used for handling of tags
+ correctly.
+ """
+ # Tags to output for this test.
+ if self._current_test_tags[0] or self._current_test_tags[1]:
+ self.decorated.tags(*self._current_test_tags)
+ self.decorated.stopTest(test)
+ self._current_test = None
+ self._current_test_tags = None
+
+ def tags(self, new_tags, gone_tags):
+ """Handle tag instructions.
+
+ Adds and removes tags as appropriate. If a test is currently running,
+ tags are not affected for subsequent tests.
+
+ :param new_tags: Tags to add,
+ :param gone_tags: Tags to remove.
+ """
+ if self._current_test is not None:
+ # gather the tags until the test stops.
+ self._current_test_tags[0].update(new_tags)
+ self._current_test_tags[0].difference_update(gone_tags)
+ self._current_test_tags[1].update(gone_tags)
+ self._current_test_tags[1].difference_update(new_tags)
+ else:
+ return self.decorated.tags(new_tags, gone_tags)
+
+
+class TimeCollapsingDecorator(HookedTestResultDecorator):
+ """Only pass on the first and last of a consecutive sequence of times."""
+
+ def __init__(self, decorated):
+ super(TimeCollapsingDecorator, self).__init__(decorated)
+ self._last_received_time = None
+ self._last_sent_time = None
+
+ def _before_event(self):
+ if self._last_received_time is None:
+ return
+ if self._last_received_time != self._last_sent_time:
+ self.decorated.time(self._last_received_time)
+ self._last_sent_time = self._last_received_time
+ self._last_received_time = None
+
+ def time(self, a_time):
+ # Don't upcall, because we don't want to call _before_event, it's only
+ # for non-time events.
+ if self._last_received_time is None:
+ self.decorated.time(a_time)
+ self._last_sent_time = a_time
+ self._last_received_time = a_time
+
+
+def all_true(bools):
+ """Return True if all of 'bools' are True. False otherwise."""
+ for b in bools:
+ if not b:
+ return False
+ return True
+
+
class TestResultFilter(TestResultDecorator):
"""A pyunit TestResult interface implementation which filters tests.
@@ -222,50 +307,53 @@ class TestResultFilter(TestResultDecorator):
metadata is available. outcome is the name of the outcome such
as 'success' or 'failure'.
"""
- TestResultDecorator.__init__(self, result)
- self._filter_error = filter_error
- self._filter_failure = filter_failure
- self._filter_success = filter_success
- self._filter_skip = filter_skip
- if filter_predicate is None:
- filter_predicate = lambda test, outcome, err, details: True
- self.filter_predicate = filter_predicate
+ super(TestResultFilter, self).__init__(result)
+ self.decorated = TimeCollapsingDecorator(
+ TagCollapsingDecorator(self.decorated))
+ predicates = []
+ if filter_error:
+ predicates.append(lambda t, outcome, e, d: outcome != 'error')
+ if filter_failure:
+ predicates.append(lambda t, outcome, e, d: outcome != 'failure')
+ if filter_success:
+ predicates.append(lambda t, outcome, e, d: outcome != 'success')
+ if filter_skip:
+ predicates.append(lambda t, outcome, e, d: outcome != 'skip')
+ if filter_predicate is not None:
+ predicates.append(filter_predicate)
+ self.filter_predicate = (
+ lambda test, outcome, err, details:
+ all_true(p(test, outcome, err, details) for p in predicates))
# The current test (for filtering tags)
self._current_test = None
# Has the current test been filtered (for outputting test tags)
self._current_test_filtered = None
- # The (new, gone) tags for the current test.
- self._current_test_tags = None
# Calls to this result that we don't know whether to forward on yet.
self._buffered_calls = []
def addError(self, test, err=None, details=None):
- if (not self._filter_error and
- self.filter_predicate(test, 'error', err, details)):
+ if (self.filter_predicate(test, 'error', err, details)):
self._buffered_calls.append(
('addError', [test, err], {'details': details}))
else:
self._filtered()
def addFailure(self, test, err=None, details=None):
- if (not self._filter_failure and
- self.filter_predicate(test, 'failure', err, details)):
+ if (self.filter_predicate(test, 'failure', err, details)):
self._buffered_calls.append(
('addFailure', [test, err], {'details': details}))
else:
self._filtered()
def addSkip(self, test, reason=None, details=None):
- if (not self._filter_skip and
- self.filter_predicate(test, 'skip', reason, details)):
+ if (self.filter_predicate(test, 'skip', reason, details)):
self._buffered_calls.append(
('addSkip', [reason], {'details': details}))
else:
self._filtered()
def addSuccess(self, test, details=None):
- if (not self._filter_success and
- self.filter_predicate(test, 'success', None, details)):
+ if (self.filter_predicate(test, 'success', None, details)):
self._buffered_calls.append(
('addSuccess', [test], {'details': details}))
else:
@@ -293,7 +381,6 @@ class TestResultFilter(TestResultDecorator):
"""
self._current_test = test
self._current_test_filtered = False
- self._current_test_tags = set(), set()
self._buffered_calls.append(('startTest', [test], {}))
def stopTest(self, test):
@@ -306,31 +393,11 @@ class TestResultFilter(TestResultDecorator):
# Tags to output for this test.
for method, args, kwargs in self._buffered_calls:
getattr(self.decorated, method)(*args, **kwargs)
- if self._current_test_tags[0] or self._current_test_tags[1]:
- self.decorated.tags(*self._current_test_tags)
self.decorated.stopTest(test)
self._current_test = None
self._current_test_filtered = None
- self._current_test_tags = None
self._buffered_calls = []
- def tags(self, new_tags, gone_tags):
- """Handle tag instructions.
-
- Adds and removes tags as appropriate. If a test is currently running,
- tags are not affected for subsequent tests.
-
- :param new_tags: Tags to add,
- :param gone_tags: Tags to remove.
- """
- if self._current_test is not None:
- # gather the tags until the test stops.
- self._current_test_tags[0].update(new_tags)
- self._current_test_tags[0].difference_update(gone_tags)
- self._current_test_tags[1].update(gone_tags)
- self._current_test_tags[1].difference_update(new_tags)
- return self.decorated.tags(new_tags, gone_tags)
-
def time(self, a_time):
if self._current_test is not None:
self._buffered_calls.append(('time', [a_time], {}))
@@ -347,7 +414,7 @@ class TestIdPrintingResult(testtools.TestResult):
def __init__(self, stream, show_times=False):
"""Create a FilterResult object outputting to stream."""
- testtools.TestResult.__init__(self)
+ super(TestIdPrintingResult, self).__init__()
self._stream = stream
self.failed_tests = 0
self.__time = 0