summaryrefslogtreecommitdiff
path: root/python
diff options
context:
space:
mode:
authorJonathan Lange <jml@mumak.net>2012-04-10 13:56:05 +0100
committerJonathan Lange <jml@mumak.net>2012-04-10 13:56:05 +0100
commit229151e1f399cefadf843a41ee08791fa1aa451a (patch)
treea22e5369eca4da2af97d5d7702338159d9f485bd /python
parenteff5dc6d9da686ebbb2de9444c5bdc46e5a0539f (diff)
downloadsubunit-git-229151e1f399cefadf843a41ee08791fa1aa451a.tar.gz
Extract out a filter base class that just deals with predicates.
Diffstat (limited to 'python')
-rw-r--r--python/subunit/test_results.py164
1 files changed, 96 insertions, 68 deletions
diff --git a/python/subunit/test_results.py b/python/subunit/test_results.py
index fb7affd..32dff6a 100644
--- a/python/subunit/test_results.py
+++ b/python/subunit/test_results.py
@@ -286,7 +286,100 @@ def all_true(bools):
return True
-class TestResultFilter(TestResultDecorator):
+class _PredicateFilter(TestResultDecorator):
+
+ def __init__(self, result, predicate):
+ super(_PredicateFilter, self).__init__(result)
+ self.decorated = TimeCollapsingDecorator(
+ TagCollapsingDecorator(self.decorated))
+ self.filter_predicate = predicate
+ # The current test (for filtering tags)
+ self._current_test = None
+ # Has the current test been filtered (for outputting test tags)
+ self._current_test_filtered = None
+ # Calls to this result that we don't know whether to forward on yet.
+ self._buffered_calls = []
+
+ def addError(self, test, err=None, details=None):
+ if (self.filter_predicate(test, 'error', err, details)):
+ self._buffered_calls.append(
+ ('addError', [test, err], {'details': details}))
+ else:
+ self._filtered()
+
+ def addFailure(self, test, err=None, details=None):
+ if (self.filter_predicate(test, 'failure', err, details)):
+ self._buffered_calls.append(
+ ('addFailure', [test, err], {'details': details}))
+ else:
+ self._filtered()
+
+ def addSkip(self, test, reason=None, details=None):
+ if (self.filter_predicate(test, 'skip', reason, details)):
+ self._buffered_calls.append(
+ ('addSkip', [test, reason], {'details': details}))
+ else:
+ self._filtered()
+
+ def addExpectedFailure(self, test, err=None, details=None):
+ if self.filter_predicate(test, 'expectedfailure', err, details):
+ self._buffered_calls.append(
+ ('addExpectedFailure', [test, err], {'details': details}))
+ else:
+ self._filtered()
+
+ def addUnexpectedSuccess(self, test, details=None):
+ self._buffered_calls.append(
+ ('addUnexpectedSuccess', [test], {'details': details}))
+
+ def addSuccess(self, test, details=None):
+ if (self.filter_predicate(test, 'success', None, details)):
+ self._buffered_calls.append(
+ ('addSuccess', [test], {'details': details}))
+ else:
+ self._filtered()
+
+ def _filtered(self):
+ self._current_test_filtered = True
+
+ def startTest(self, test):
+ """Start a test.
+
+ Not directly passed to the client, but used for handling of tags
+ correctly.
+ """
+ self._current_test = test
+ self._current_test_filtered = False
+ self._buffered_calls.append(('startTest', [test], {}))
+
+ def stopTest(self, test):
+ """Stop a test.
+
+ Not directly passed to the client, but used for handling of tags
+ correctly.
+ """
+ if not self._current_test_filtered:
+ # Tags to output for this test.
+ for method, args, kwargs in self._buffered_calls:
+ getattr(self.decorated, method)(*args, **kwargs)
+ self.decorated.stopTest(test)
+ self._current_test = None
+ self._current_test_filtered = None
+ self._buffered_calls = []
+
+ def time(self, a_time):
+ if self._current_test is not None:
+ self._buffered_calls.append(('time', [a_time], {}))
+ else:
+ return self.decorated.time(a_time)
+
+ def id_to_orig_id(self, id):
+ if id.startswith("subunit.RemotedTestCase."):
+ return id[len("subunit.RemotedTestCase."):]
+ return id
+
+
+class TestResultFilter(_PredicateFilter):
"""A pyunit TestResult interface implementation which filters tests.
Tests that pass the filter are handed on to another TestResult instance
@@ -316,9 +409,6 @@ class TestResultFilter(TestResultDecorator):
:param fixup_expected_failures: Set of test ids to consider known
failing.
"""
- super(TestResultFilter, self).__init__(result)
- self.decorated = TimeCollapsingDecorator(
- TagCollapsingDecorator(self.decorated))
predicates = []
if filter_error:
predicates.append(lambda t, outcome, e, d: outcome != 'error')
@@ -332,15 +422,10 @@ class TestResultFilter(TestResultDecorator):
predicates.append(lambda t, outcome, e, d: outcome != 'expectedfailure')
if filter_predicate is not None:
predicates.append(filter_predicate)
- self.filter_predicate = (
+ predicate = (
lambda test, outcome, err, details:
all_true(p(test, outcome, err, details) for p in predicates))
- # The current test (for filtering tags)
- self._current_test = None
- # Has the current test been filtered (for outputting test tags)
- self._current_test_filtered = None
- # Calls to this result that we don't know whether to forward on yet.
- self._buffered_calls = []
+ super(TestResultFilter, self).__init__(result, predicate)
if fixup_expected_failures is None:
self._fixup_expected_failures = frozenset()
else:
@@ -368,13 +453,6 @@ class TestResultFilter(TestResultDecorator):
else:
self._filtered()
- def addSkip(self, test, reason=None, details=None):
- if (self.filter_predicate(test, 'skip', reason, details)):
- self._buffered_calls.append(
- ('addSkip', [test, reason], {'details': details}))
- else:
- self._filtered()
-
def addSuccess(self, test, details=None):
if (self.filter_predicate(test, 'success', None, details)):
if self._failure_expected(test):
@@ -386,59 +464,9 @@ class TestResultFilter(TestResultDecorator):
else:
self._filtered()
- def addExpectedFailure(self, test, err=None, details=None):
- if self.filter_predicate(test, 'expectedfailure', err, details):
- self._buffered_calls.append(
- ('addExpectedFailure', [test, err], {'details': details}))
- else:
- self._filtered()
-
- def addUnexpectedSuccess(self, test, details=None):
- self._buffered_calls.append(
- ('addUnexpectedSuccess', [test], {'details': details}))
-
- def _filtered(self):
- self._current_test_filtered = True
-
def _failure_expected(self, test):
return (test.id() in self._fixup_expected_failures)
- def startTest(self, test):
- """Start a test.
-
- Not directly passed to the client, but used for handling of tags
- correctly.
- """
- self._current_test = test
- self._current_test_filtered = False
- self._buffered_calls.append(('startTest', [test], {}))
-
- def stopTest(self, test):
- """Stop a test.
-
- Not directly passed to the client, but used for handling of tags
- correctly.
- """
- if not self._current_test_filtered:
- # Tags to output for this test.
- for method, args, kwargs in self._buffered_calls:
- getattr(self.decorated, method)(*args, **kwargs)
- self.decorated.stopTest(test)
- self._current_test = None
- self._current_test_filtered = None
- self._buffered_calls = []
-
- def time(self, a_time):
- if self._current_test is not None:
- self._buffered_calls.append(('time', [a_time], {}))
- else:
- return self.decorated.time(a_time)
-
- def id_to_orig_id(self, id):
- if id.startswith("subunit.RemotedTestCase."):
- return id[len("subunit.RemotedTestCase."):]
- return id
-
class TestIdPrintingResult(testtools.TestResult):