summaryrefslogtreecommitdiff
path: root/python/subunit/test_results.py
diff options
context:
space:
mode:
authorJohn Arbash Meinel <john@arbash-meinel.com>2011-06-30 12:59:08 +0200
committerJohn Arbash Meinel <john@arbash-meinel.com>2011-06-30 12:59:08 +0200
commit3a41c806338121e7597c52b8e6e79302e67d8008 (patch)
treedf4154aad123123069b2730846aa6b907dbdb9d7 /python/subunit/test_results.py
parent6954432d06307b33e53bbeb11799b2f61148ef31 (diff)
parentfac837618f84531205f6491fb9d72bde8717fbb8 (diff)
downloadsubunit-git-3a41c806338121e7597c52b8e6e79302e67d8008.tar.gz
Merge trunk r142 and resolve conflicts.
Diffstat (limited to 'python/subunit/test_results.py')
-rw-r--r--python/subunit/test_results.py226
1 files changed, 168 insertions, 58 deletions
diff --git a/python/subunit/test_results.py b/python/subunit/test_results.py
index cc588d2..33fb50e 100644
--- a/python/subunit/test_results.py
+++ b/python/subunit/test_results.py
@@ -18,9 +18,10 @@
import datetime
-import iso8601
import testtools
+from subunit import iso8601
+
# NOT a TestResult, because we are implementing the interface, not inheriting
# it.
@@ -81,8 +82,12 @@ class TestResultDecorator(object):
def stop(self):
return self.decorated.stop()
+ @property
+ def testsRun(self):
+ return self.decorated.testsRun
+
def tags(self, new_tags, gone_tags):
- return self.decorated.time(new_tags, gone_tags)
+ return self.decorated.tags(new_tags, gone_tags)
def time(self, a_datetime):
return self.decorated.time(a_datetime)
@@ -195,6 +200,87 @@ class AutoTimingTestResultDecorator(HookedTestResultDecorator):
return self.decorated.time(a_datetime)
+class TagCollapsingDecorator(TestResultDecorator):
+ """Collapses many 'tags' calls into one where possible."""
+
+ def __init__(self, result):
+ super(TagCollapsingDecorator, self).__init__(result)
+ # The (new, gone) tags for the current test.
+ self._current_test_tags = None
+
+ def startTest(self, test):
+ """Start a test.
+
+ Not directly passed to the client, but used for handling of tags
+ correctly.
+ """
+ self.decorated.startTest(test)
+ self._current_test_tags = set(), set()
+
+ def stopTest(self, test):
+ """Stop a test.
+
+ Not directly passed to the client, but used for handling of tags
+ correctly.
+ """
+ # Tags to output for this test.
+ if self._current_test_tags[0] or self._current_test_tags[1]:
+ self.decorated.tags(*self._current_test_tags)
+ self.decorated.stopTest(test)
+ self._current_test_tags = None
+
+ def tags(self, new_tags, gone_tags):
+ """Handle tag instructions.
+
+ Adds and removes tags as appropriate. If a test is currently running,
+ tags are not affected for subsequent tests.
+
+ :param new_tags: Tags to add,
+ :param gone_tags: Tags to remove.
+ """
+ if self._current_test_tags is not None:
+ # gather the tags until the test stops.
+ self._current_test_tags[0].update(new_tags)
+ self._current_test_tags[0].difference_update(gone_tags)
+ self._current_test_tags[1].update(gone_tags)
+ self._current_test_tags[1].difference_update(new_tags)
+ else:
+ return self.decorated.tags(new_tags, gone_tags)
+
+
+class TimeCollapsingDecorator(HookedTestResultDecorator):
+ """Only pass on the first and last of a consecutive sequence of times."""
+
+ def __init__(self, decorated):
+ super(TimeCollapsingDecorator, self).__init__(decorated)
+ self._last_received_time = None
+ self._last_sent_time = None
+
+ def _before_event(self):
+ if self._last_received_time is None:
+ return
+ if self._last_received_time != self._last_sent_time:
+ self.decorated.time(self._last_received_time)
+ self._last_sent_time = self._last_received_time
+ self._last_received_time = None
+
+ def time(self, a_time):
+ # Don't upcall, because we don't want to call _before_event, it's only
+ # for non-time events.
+ if self._last_received_time is None:
+ self.decorated.time(a_time)
+ self._last_sent_time = a_time
+ self._last_received_time = a_time
+
+
+def all_true(bools):
+ """Return True if all of 'bools' are True. False otherwise."""
+ for b in bools:
+ if not b:
+ return False
+ return True
+
+
class TestResultFilter(TestResultDecorator):
"""A pyunit TestResult interface implementation which filters tests.
@@ -209,7 +295,7 @@ class TestResultFilter(TestResultDecorator):
def __init__(self, result, filter_error=False, filter_failure=False,
filter_success=True, filter_skip=False, filter_xfail=False,
- filter_predicate=None):
+ filter_predicate=None, fixup_expected_failures=None):
"""Create a FilterResult object filtering to result.
:param filter_error: Filter out errors.
@@ -222,71 +308,96 @@ class TestResultFilter(TestResultDecorator):
through. err and details may be none if no error or extra
metadata is available. outcome is the name of the outcome such
as 'success' or 'failure'.
+ :param fixup_expected_failures: Set of test ids to consider known
+ failing.
"""
- TestResultDecorator.__init__(self, result)
- self._filter_error = filter_error
- self._filter_failure = filter_failure
- self._filter_success = filter_success
- self._filter_skip = filter_skip
- self._filter_xfail = filter_xfail
- if filter_predicate is None:
- filter_predicate = lambda test, outcome, err, details: True
- self.filter_predicate = filter_predicate
+ super(TestResultFilter, self).__init__(result)
+ self.decorated = TimeCollapsingDecorator(
+ TagCollapsingDecorator(self.decorated))
+ predicates = []
+ if filter_error:
+ predicates.append(lambda t, outcome, e, d: outcome != 'error')
+ if filter_failure:
+ predicates.append(lambda t, outcome, e, d: outcome != 'failure')
+ if filter_success:
+ predicates.append(lambda t, outcome, e, d: outcome != 'success')
+ if filter_skip:
+ predicates.append(lambda t, outcome, e, d: outcome != 'skip')
+ if filter_xfail:
+ predicates.append(lambda t, outcome, e, d: outcome != 'expectedfailure')
+ if filter_predicate is not None:
+ predicates.append(filter_predicate)
+ self.filter_predicate = (
+ lambda test, outcome, err, details:
+ all_true(p(test, outcome, err, details) for p in predicates))
# The current test (for filtering tags)
self._current_test = None
# Has the current test been filtered (for outputting test tags)
self._current_test_filtered = None
- # The (new, gone) tags for the current test.
- self._current_test_tags = None
+ # Calls to this result that we don't know whether to forward on yet.
+ self._buffered_calls = []
+ if fixup_expected_failures is None:
+ self._fixup_expected_failures = frozenset()
+ else:
+ self._fixup_expected_failures = fixup_expected_failures
def addError(self, test, err=None, details=None):
- if (not self._filter_error and
- self.filter_predicate(test, 'error', err, details)):
- self.decorated.startTest(test)
- self.decorated.addError(test, err, details=details)
+ if (self.filter_predicate(test, 'error', err, details)):
+ if self._failure_expected(test):
+ self._buffered_calls.append(
+ ('addExpectedFailure', [test, err], {'details': details}))
+ else:
+ self._buffered_calls.append(
+ ('addError', [test, err], {'details': details}))
else:
self._filtered()
def addFailure(self, test, err=None, details=None):
- if (not self._filter_failure and
- self.filter_predicate(test, 'failure', err, details)):
- self.decorated.startTest(test)
- self.decorated.addFailure(test, err, details=details)
+ if (self.filter_predicate(test, 'failure', err, details)):
+ if self._failure_expected(test):
+ self._buffered_calls.append(
+ ('addExpectedFailure', [test, err], {'details': details}))
+ else:
+ self._buffered_calls.append(
+ ('addFailure', [test, err], {'details': details}))
else:
self._filtered()
def addSkip(self, test, reason=None, details=None):
- if (not self._filter_skip and
- self.filter_predicate(test, 'skip', reason, details)):
- self.decorated.startTest(test)
- self.decorated.addSkip(test, reason, details=details)
+ if (self.filter_predicate(test, 'skip', reason, details)):
+ self._buffered_calls.append(
+ ('addSkip', [test, reason], {'details': details}))
else:
self._filtered()
def addSuccess(self, test, details=None):
- if (not self._filter_success and
- self.filter_predicate(test, 'success', None, details)):
- self.decorated.startTest(test)
- self.decorated.addSuccess(test, details=details)
+ if (self.filter_predicate(test, 'success', None, details)):
+ if self._failure_expected(test):
+ self._buffered_calls.append(
+ ('addUnexpectedSuccess', [test], {'details': details}))
+ else:
+ self._buffered_calls.append(
+ ('addSuccess', [test], {'details': details}))
else:
self._filtered()
def addExpectedFailure(self, test, err=None, details=None):
- if (not self._filter_xfail and
- self.filter_predicate(test, 'expectedfailure', err, details)):
- self.decorated.startTest(test)
- return self.decorated.addExpectedFailure(test, err,
- details=details)
+ if self.filter_predicate(test, 'expectedfailure', err, details):
+ self._buffered_calls.append(
+ ('addExpectedFailure', [test, err], {'details': details}))
else:
self._filtered()
def addUnexpectedSuccess(self, test, details=None):
- self.decorated.startTest(test)
- return self.decorated.addUnexpectedSuccess(test, details=details)
+ self._buffered_calls.append(
+ ('addUnexpectedSuccess', [test], {'details': details}))
def _filtered(self):
self._current_test_filtered = True
+ def _failure_expected(self, test):
+ return (test.id() in self._fixup_expected_failures)
+
def startTest(self, test):
"""Start a test.
@@ -295,7 +406,7 @@ class TestResultFilter(TestResultDecorator):
"""
self._current_test = test
self._current_test_filtered = False
- self._current_test_tags = set(), set()
+ self._buffered_calls.append(('startTest', [test], {}))
def stopTest(self, test):
"""Stop a test.
@@ -305,29 +416,18 @@ class TestResultFilter(TestResultDecorator):
"""
if not self._current_test_filtered:
# Tags to output for this test.
- if self._current_test_tags[0] or self._current_test_tags[1]:
- self.decorated.tags(*self._current_test_tags)
+ for method, args, kwargs in self._buffered_calls:
+ getattr(self.decorated, method)(*args, **kwargs)
self.decorated.stopTest(test)
self._current_test = None
self._current_test_filtered = None
- self._current_test_tags = None
+ self._buffered_calls = []
- def tags(self, new_tags, gone_tags):
- """Handle tag instructions.
-
- Adds and removes tags as appropriate. If a test is currently running,
- tags are not affected for subsequent tests.
-
- :param new_tags: Tags to add,
- :param gone_tags: Tags to remove.
- """
+ def time(self, a_time):
if self._current_test is not None:
- # gather the tags until the test stops.
- self._current_test_tags[0].update(new_tags)
- self._current_test_tags[0].difference_update(gone_tags)
- self._current_test_tags[1].update(gone_tags)
- self._current_test_tags[1].difference_update(new_tags)
- return self.decorated.tags(new_tags, gone_tags)
+ self._buffered_calls.append(('time', [a_time], {}))
+ else:
+ return self.decorated.time(a_time)
def id_to_orig_id(self, id):
if id.startswith("subunit.RemotedTestCase."):
@@ -339,10 +439,10 @@ class TestIdPrintingResult(testtools.TestResult):
def __init__(self, stream, show_times=False):
"""Create a FilterResult object outputting to stream."""
- testtools.TestResult.__init__(self)
+ super(TestIdPrintingResult, self).__init__()
self._stream = stream
self.failed_tests = 0
- self.__time = 0
+ self.__time = None
self.show_times = show_times
self._test = None
self._test_duration = 0
@@ -358,6 +458,16 @@ class TestIdPrintingResult(testtools.TestResult):
def addSuccess(self, test):
self._test = test
+ def addSkip(self, test, reason=None, details=None):
+ self._test = test
+
+ def addUnexpectedSuccess(self, test, details=None):
+ self.failed_tests += 1
+ self._test = test
+
+ def addExpectedFailure(self, test, err=None, details=None):
+ self._test = test
+
def reportTest(self, test, duration):
if self.show_times:
seconds = duration.seconds