summaryrefslogtreecommitdiff
path: root/python
diff options
context:
space:
mode:
authorJonathan Lange <jml@canonical.com>2011-03-18 12:58:09 +0000
committerJonathan Lange <jml@canonical.com>2011-03-18 12:58:09 +0000
commit32b4cbe9c436a63ca330b30bbe41566d6335e637 (patch)
tree248f0a29a4eb066c3bfa2615f29866386544a268 /python
parentaa2d6df69c425888524124575dd4c5d59e6e8eb8 (diff)
parent6301e2918b5f2c880b5084f63201b2ee7de4be93 (diff)
downloadsubunit-git-32b4cbe9c436a63ca330b30bbe41566d6335e637.tar.gz
Refactor TestResultFilter, making it collapse time. Make TestResultDecorator.tags()
actually work.
Diffstat (limited to 'python')
-rw-r--r--python/subunit/test_results.py149
-rw-r--r--python/subunit/tests/test_test_results.py107
2 files changed, 215 insertions, 41 deletions
diff --git a/python/subunit/test_results.py b/python/subunit/test_results.py
index a6ad0c6..e7f9171 100644
--- a/python/subunit/test_results.py
+++ b/python/subunit/test_results.py
@@ -82,7 +82,7 @@ class TestResultDecorator(object):
return self.decorated.stop()
def tags(self, new_tags, gone_tags):
- return self.decorated.time(new_tags, gone_tags)
+ return self.decorated.tags(new_tags, gone_tags)
def time(self, a_datetime):
return self.decorated.time(a_datetime)
@@ -195,6 +195,91 @@ class AutoTimingTestResultDecorator(HookedTestResultDecorator):
return self.decorated.time(a_datetime)
+class TagCollapsingDecorator(TestResultDecorator):
+ """Collapses many 'tags' calls into one where possible."""
+
+ def __init__(self, result):
+ super(TagCollapsingDecorator, self).__init__(result)
+ # The current test (for filtering tags)
+ self._current_test = None
+ # The (new, gone) tags for the current test.
+ self._current_test_tags = None
+
+ def startTest(self, test):
+ """Start a test.
+
+ Not directly passed to the client, but used for handling of tags
+ correctly.
+ """
+ self.decorated.startTest(test)
+ self._current_test = test
+ self._current_test_tags = set(), set()
+
+ def stopTest(self, test):
+ """Stop a test.
+
+ Not directly passed to the client, but used for handling of tags
+ correctly.
+ """
+ # Tags to output for this test.
+ if self._current_test_tags[0] or self._current_test_tags[1]:
+ self.decorated.tags(*self._current_test_tags)
+ self.decorated.stopTest(test)
+ self._current_test = None
+ self._current_test_tags = None
+
+ def tags(self, new_tags, gone_tags):
+ """Handle tag instructions.
+
+ Adds and removes tags as appropriate. If a test is currently running,
+ tags are not affected for subsequent tests.
+
+ :param new_tags: Tags to add,
+ :param gone_tags: Tags to remove.
+ """
+ if self._current_test is not None:
+ # gather the tags until the test stops.
+ self._current_test_tags[0].update(new_tags)
+ self._current_test_tags[0].difference_update(gone_tags)
+ self._current_test_tags[1].update(gone_tags)
+ self._current_test_tags[1].difference_update(new_tags)
+ else:
+ return self.decorated.tags(new_tags, gone_tags)
+
+
+class TimeCollapsingDecorator(HookedTestResultDecorator):
+ """Only pass on the first and last of a consecutive sequence of times."""
+
+ def __init__(self, decorated):
+ super(TimeCollapsingDecorator, self).__init__(decorated)
+ self._last_received_time = None
+ self._last_sent_time = None
+
+ def _before_event(self):
+ if self._last_received_time is None:
+ return
+ if self._last_received_time != self._last_sent_time:
+ self.decorated.time(self._last_received_time)
+ self._last_sent_time = self._last_received_time
+ self._last_received_time = None
+
+ def time(self, a_time):
+ # Don't upcall, because we don't want to call _before_event, it's only
+ # for non-time events.
+ if self._last_received_time is None:
+ self.decorated.time(a_time)
+ self._last_sent_time = a_time
+ self._last_received_time = a_time
+
+
+def all_true(bools):
+ """Return True if all of 'bools' are True. False otherwise."""
+ for b in bools:
+ if not b:
+ return False
+ return True
+
+
class TestResultFilter(TestResultDecorator):
"""A pyunit TestResult interface implementation which filters tests.
@@ -222,50 +307,53 @@ class TestResultFilter(TestResultDecorator):
metadata is available. outcome is the name of the outcome such
as 'success' or 'failure'.
"""
- TestResultDecorator.__init__(self, result)
- self._filter_error = filter_error
- self._filter_failure = filter_failure
- self._filter_success = filter_success
- self._filter_skip = filter_skip
- if filter_predicate is None:
- filter_predicate = lambda test, outcome, err, details: True
- self.filter_predicate = filter_predicate
+ super(TestResultFilter, self).__init__(result)
+ self.decorated = TimeCollapsingDecorator(
+ TagCollapsingDecorator(self.decorated))
+ predicates = []
+ if filter_error:
+ predicates.append(lambda t, outcome, e, d: outcome != 'error')
+ if filter_failure:
+ predicates.append(lambda t, outcome, e, d: outcome != 'failure')
+ if filter_success:
+ predicates.append(lambda t, outcome, e, d: outcome != 'success')
+ if filter_skip:
+ predicates.append(lambda t, outcome, e, d: outcome != 'skip')
+ if filter_predicate is not None:
+ predicates.append(filter_predicate)
+ self.filter_predicate = (
+ lambda test, outcome, err, details:
+ all_true(p(test, outcome, err, details) for p in predicates))
# The current test (for filtering tags)
self._current_test = None
# Has the current test been filtered (for outputting test tags)
self._current_test_filtered = None
- # The (new, gone) tags for the current test.
- self._current_test_tags = None
# Calls to this result that we don't know whether to forward on yet.
self._buffered_calls = []
def addError(self, test, err=None, details=None):
- if (not self._filter_error and
- self.filter_predicate(test, 'error', err, details)):
+ if (self.filter_predicate(test, 'error', err, details)):
self._buffered_calls.append(
('addError', [test, err], {'details': details}))
else:
self._filtered()
def addFailure(self, test, err=None, details=None):
- if (not self._filter_failure and
- self.filter_predicate(test, 'failure', err, details)):
+ if (self.filter_predicate(test, 'failure', err, details)):
self._buffered_calls.append(
('addFailure', [test, err], {'details': details}))
else:
self._filtered()
def addSkip(self, test, reason=None, details=None):
- if (not self._filter_skip and
- self.filter_predicate(test, 'skip', reason, details)):
+ if (self.filter_predicate(test, 'skip', reason, details)):
self._buffered_calls.append(
('addSkip', [reason], {'details': details}))
else:
self._filtered()
def addSuccess(self, test, details=None):
- if (not self._filter_success and
- self.filter_predicate(test, 'success', None, details)):
+ if (self.filter_predicate(test, 'success', None, details)):
self._buffered_calls.append(
('addSuccess', [test], {'details': details}))
else:
@@ -293,7 +381,6 @@ class TestResultFilter(TestResultDecorator):
"""
self._current_test = test
self._current_test_filtered = False
- self._current_test_tags = set(), set()
self._buffered_calls.append(('startTest', [test], {}))
def stopTest(self, test):
@@ -306,31 +393,11 @@ class TestResultFilter(TestResultDecorator):
# Tags to output for this test.
for method, args, kwargs in self._buffered_calls:
getattr(self.decorated, method)(*args, **kwargs)
- if self._current_test_tags[0] or self._current_test_tags[1]:
- self.decorated.tags(*self._current_test_tags)
self.decorated.stopTest(test)
self._current_test = None
self._current_test_filtered = None
- self._current_test_tags = None
self._buffered_calls = []
- def tags(self, new_tags, gone_tags):
- """Handle tag instructions.
-
- Adds and removes tags as appropriate. If a test is currently running,
- tags are not affected for subsequent tests.
-
- :param new_tags: Tags to add,
- :param gone_tags: Tags to remove.
- """
- if self._current_test is not None:
- # gather the tags until the test stops.
- self._current_test_tags[0].update(new_tags)
- self._current_test_tags[0].difference_update(gone_tags)
- self._current_test_tags[1].update(gone_tags)
- self._current_test_tags[1].difference_update(new_tags)
- return self.decorated.tags(new_tags, gone_tags)
-
def time(self, a_time):
if self._current_test is not None:
self._buffered_calls.append(('time', [a_time], {}))
@@ -347,7 +414,7 @@ class TestIdPrintingResult(testtools.TestResult):
def __init__(self, stream, show_times=False):
"""Create a FilterResult object outputting to stream."""
- testtools.TestResult.__init__(self)
+ super(TestIdPrintingResult, self).__init__()
self._stream = stream
self.failed_tests = 0
self.__time = 0
diff --git a/python/subunit/tests/test_test_results.py b/python/subunit/tests/test_test_results.py
index a2dad96..94d2274 100644
--- a/python/subunit/tests/test_test_results.py
+++ b/python/subunit/tests/test_test_results.py
@@ -17,6 +17,9 @@
import datetime
import unittest
+from testtools import TestCase
+from testtools.testresult.doubles import ExtendedTestResult
+
import subunit
import subunit.iso8601 as iso8601
import subunit.test_results
@@ -187,6 +190,110 @@ class TestAutoTimingTestResultDecorator(unittest.TestCase):
self.assertNotEqual(None, self.decorated._calls[2])
+class TestTagCollapsingDecorator(TestCase):
+
+ def test_tags_forwarded_outside_of_tests(self):
+ result = ExtendedTestResult()
+ tag_collapser = subunit.test_results.TagCollapsingDecorator(result)
+ tag_collapser.tags(set(['a', 'b']), set())
+ self.assertEquals(
+ [('tags', set(['a', 'b']), set([]))], result._events)
+
+ def test_tags_collapsed_inside_of_tests(self):
+ result = ExtendedTestResult()
+ tag_collapser = subunit.test_results.TagCollapsingDecorator(result)
+ test = subunit.RemotedTestCase('foo')
+ tag_collapser.startTest(test)
+ tag_collapser.tags(set(['a']), set())
+ tag_collapser.tags(set(['b']), set(['a']))
+ tag_collapser.tags(set(['c']), set())
+ tag_collapser.stopTest(test)
+ self.assertEquals(
+ [('startTest', test),
+ ('tags', set(['b', 'c']), set(['a'])),
+ ('stopTest', test)],
+ result._events)
+
+ def test_tags_collapsed_inside_of_tests_different_ordering(self):
+ result = ExtendedTestResult()
+ tag_collapser = subunit.test_results.TagCollapsingDecorator(result)
+ test = subunit.RemotedTestCase('foo')
+ tag_collapser.startTest(test)
+ tag_collapser.tags(set(), set(['a']))
+ tag_collapser.tags(set(['a', 'b']), set())
+ tag_collapser.tags(set(['c']), set())
+ tag_collapser.stopTest(test)
+ self.assertEquals(
+ [('startTest', test),
+ ('tags', set(['a', 'b', 'c']), set()),
+ ('stopTest', test)],
+ result._events)
+
+
+class TestTimeCollapsingDecorator(TestCase):
+
+ def make_time(self):
+ # Heh heh.
+ return datetime.datetime(
+ 2000, 1, self.getUniqueInteger(), tzinfo=iso8601.UTC)
+
+ def test_initial_time_forwarded(self):
+ # We always forward the first time event we see.
+ result = ExtendedTestResult()
+ tag_collapser = subunit.test_results.TimeCollapsingDecorator(result)
+ a_time = self.make_time()
+ tag_collapser.time(a_time)
+ self.assertEquals([('time', a_time)], result._events)
+
+ def test_time_collapsed_to_first_and_last(self):
+ # If there are many consecutive time events, only the first and last
+ # are sent through.
+ result = ExtendedTestResult()
+ tag_collapser = subunit.test_results.TimeCollapsingDecorator(result)
+ times = [self.make_time() for i in range(5)]
+ for a_time in times:
+ tag_collapser.time(a_time)
+ tag_collapser.startTest(subunit.RemotedTestCase('foo'))
+ self.assertEquals(
+ [('time', times[0]), ('time', times[-1])], result._events[:-1])
+
+ def test_only_one_time_sent(self):
+ # If we receive a single time event followed by a non-time event, we
+ # send exactly one time event.
+ result = ExtendedTestResult()
+ tag_collapser = subunit.test_results.TimeCollapsingDecorator(result)
+ a_time = self.make_time()
+ tag_collapser.time(a_time)
+ tag_collapser.startTest(subunit.RemotedTestCase('foo'))
+ self.assertEquals([('time', a_time)], result._events[:-1])
+
+ def test_duplicate_times_not_sent(self):
+ # Many time events with the exact same time are collapsed into one
+ # time event.
+ result = ExtendedTestResult()
+ tag_collapser = subunit.test_results.TimeCollapsingDecorator(result)
+ a_time = self.make_time()
+ for i in range(5):
+ tag_collapser.time(a_time)
+ tag_collapser.startTest(subunit.RemotedTestCase('foo'))
+ self.assertEquals([('time', a_time)], result._events[:-1])
+
+ def test_no_times_inserted(self):
+ result = ExtendedTestResult()
+ tag_collapser = subunit.test_results.TimeCollapsingDecorator(result)
+ a_time = self.make_time()
+ tag_collapser.time(a_time)
+ foo = subunit.RemotedTestCase('foo')
+ tag_collapser.startTest(foo)
+ tag_collapser.addSuccess(foo)
+ tag_collapser.stopTest(foo)
+ self.assertEquals(
+ [('time', a_time),
+ ('startTest', foo),
+ ('addSuccess', foo),
+ ('stopTest', foo)], result._events)
+
+
def test_suite():
loader = subunit.tests.TestUtil.TestLoader()
result = loader.loadTestsFromName(__name__)