diff options
| author | Jonathan Lange <jml@canonical.com> | 2011-03-18 12:58:09 +0000 |
|---|---|---|
| committer | Jonathan Lange <jml@canonical.com> | 2011-03-18 12:58:09 +0000 |
| commit | 32b4cbe9c436a63ca330b30bbe41566d6335e637 (patch) | |
| tree | 248f0a29a4eb066c3bfa2615f29866386544a268 /python | |
| parent | aa2d6df69c425888524124575dd4c5d59e6e8eb8 (diff) | |
| parent | 6301e2918b5f2c880b5084f63201b2ee7de4be93 (diff) | |
| download | subunit-git-32b4cbe9c436a63ca330b30bbe41566d6335e637.tar.gz | |
Refactor TestResultFilter, making it collapse time. Make TestResultDecorator.tags()
actually work.
Diffstat (limited to 'python')
| -rw-r--r-- | python/subunit/test_results.py | 149 | ||||
| -rw-r--r-- | python/subunit/tests/test_test_results.py | 107 |
2 files changed, 215 insertions, 41 deletions
diff --git a/python/subunit/test_results.py b/python/subunit/test_results.py index a6ad0c6..e7f9171 100644 --- a/python/subunit/test_results.py +++ b/python/subunit/test_results.py @@ -82,7 +82,7 @@ class TestResultDecorator(object): return self.decorated.stop() def tags(self, new_tags, gone_tags): - return self.decorated.time(new_tags, gone_tags) + return self.decorated.tags(new_tags, gone_tags) def time(self, a_datetime): return self.decorated.time(a_datetime) @@ -195,6 +195,91 @@ class AutoTimingTestResultDecorator(HookedTestResultDecorator): return self.decorated.time(a_datetime) +class TagCollapsingDecorator(TestResultDecorator): + """Collapses many 'tags' calls into one where possible.""" + + def __init__(self, result): + super(TagCollapsingDecorator, self).__init__(result) + # The current test (for filtering tags) + self._current_test = None + # The (new, gone) tags for the current test. + self._current_test_tags = None + + def startTest(self, test): + """Start a test. + + Not directly passed to the client, but used for handling of tags + correctly. + """ + self.decorated.startTest(test) + self._current_test = test + self._current_test_tags = set(), set() + + def stopTest(self, test): + """Stop a test. + + Not directly passed to the client, but used for handling of tags + correctly. + """ + # Tags to output for this test. + if self._current_test_tags[0] or self._current_test_tags[1]: + self.decorated.tags(*self._current_test_tags) + self.decorated.stopTest(test) + self._current_test = None + self._current_test_tags = None + + def tags(self, new_tags, gone_tags): + """Handle tag instructions. + + Adds and removes tags as appropriate. If a test is currently running, + tags are not affected for subsequent tests. + + :param new_tags: Tags to add, + :param gone_tags: Tags to remove. + """ + if self._current_test is not None: + # gather the tags until the test stops. + self._current_test_tags[0].update(new_tags) + self._current_test_tags[0].difference_update(gone_tags) + self._current_test_tags[1].update(gone_tags) + self._current_test_tags[1].difference_update(new_tags) + else: + return self.decorated.tags(new_tags, gone_tags) + + +class TimeCollapsingDecorator(HookedTestResultDecorator): + """Only pass on the first and last of a consecutive sequence of times.""" + + def __init__(self, decorated): + super(TimeCollapsingDecorator, self).__init__(decorated) + self._last_received_time = None + self._last_sent_time = None + + def _before_event(self): + if self._last_received_time is None: + return + if self._last_received_time != self._last_sent_time: + self.decorated.time(self._last_received_time) + self._last_sent_time = self._last_received_time + self._last_received_time = None + + def time(self, a_time): + # Don't upcall, because we don't want to call _before_event, it's only + # for non-time events. + if self._last_received_time is None: + self.decorated.time(a_time) + self._last_sent_time = a_time + self._last_received_time = a_time + + +def all_true(bools): + """Return True if all of 'bools' are True. False otherwise.""" + for b in bools: + if not b: + return False + return True + + class TestResultFilter(TestResultDecorator): """A pyunit TestResult interface implementation which filters tests. @@ -222,50 +307,53 @@ class TestResultFilter(TestResultDecorator): metadata is available. outcome is the name of the outcome such as 'success' or 'failure'. """ - TestResultDecorator.__init__(self, result) - self._filter_error = filter_error - self._filter_failure = filter_failure - self._filter_success = filter_success - self._filter_skip = filter_skip - if filter_predicate is None: - filter_predicate = lambda test, outcome, err, details: True - self.filter_predicate = filter_predicate + super(TestResultFilter, self).__init__(result) + self.decorated = TimeCollapsingDecorator( + TagCollapsingDecorator(self.decorated)) + predicates = [] + if filter_error: + predicates.append(lambda t, outcome, e, d: outcome != 'error') + if filter_failure: + predicates.append(lambda t, outcome, e, d: outcome != 'failure') + if filter_success: + predicates.append(lambda t, outcome, e, d: outcome != 'success') + if filter_skip: + predicates.append(lambda t, outcome, e, d: outcome != 'skip') + if filter_predicate is not None: + predicates.append(filter_predicate) + self.filter_predicate = ( + lambda test, outcome, err, details: + all_true(p(test, outcome, err, details) for p in predicates)) # The current test (for filtering tags) self._current_test = None # Has the current test been filtered (for outputting test tags) self._current_test_filtered = None - # The (new, gone) tags for the current test. - self._current_test_tags = None # Calls to this result that we don't know whether to forward on yet. self._buffered_calls = [] def addError(self, test, err=None, details=None): - if (not self._filter_error and - self.filter_predicate(test, 'error', err, details)): + if (self.filter_predicate(test, 'error', err, details)): self._buffered_calls.append( ('addError', [test, err], {'details': details})) else: self._filtered() def addFailure(self, test, err=None, details=None): - if (not self._filter_failure and - self.filter_predicate(test, 'failure', err, details)): + if (self.filter_predicate(test, 'failure', err, details)): self._buffered_calls.append( ('addFailure', [test, err], {'details': details})) else: self._filtered() def addSkip(self, test, reason=None, details=None): - if (not self._filter_skip and - self.filter_predicate(test, 'skip', reason, details)): + if (self.filter_predicate(test, 'skip', reason, details)): self._buffered_calls.append( ('addSkip', [reason], {'details': details})) else: self._filtered() def addSuccess(self, test, details=None): - if (not self._filter_success and - self.filter_predicate(test, 'success', None, details)): + if (self.filter_predicate(test, 'success', None, details)): self._buffered_calls.append( ('addSuccess', [test], {'details': details})) else: @@ -293,7 +381,6 @@ class TestResultFilter(TestResultDecorator): """ self._current_test = test self._current_test_filtered = False - self._current_test_tags = set(), set() self._buffered_calls.append(('startTest', [test], {})) def stopTest(self, test): @@ -306,31 +393,11 @@ class TestResultFilter(TestResultDecorator): # Tags to output for this test. for method, args, kwargs in self._buffered_calls: getattr(self.decorated, method)(*args, **kwargs) - if self._current_test_tags[0] or self._current_test_tags[1]: - self.decorated.tags(*self._current_test_tags) self.decorated.stopTest(test) self._current_test = None self._current_test_filtered = None - self._current_test_tags = None self._buffered_calls = [] - def tags(self, new_tags, gone_tags): - """Handle tag instructions. - - Adds and removes tags as appropriate. If a test is currently running, - tags are not affected for subsequent tests. - - :param new_tags: Tags to add, - :param gone_tags: Tags to remove. - """ - if self._current_test is not None: - # gather the tags until the test stops. - self._current_test_tags[0].update(new_tags) - self._current_test_tags[0].difference_update(gone_tags) - self._current_test_tags[1].update(gone_tags) - self._current_test_tags[1].difference_update(new_tags) - return self.decorated.tags(new_tags, gone_tags) - def time(self, a_time): if self._current_test is not None: self._buffered_calls.append(('time', [a_time], {})) @@ -347,7 +414,7 @@ class TestIdPrintingResult(testtools.TestResult): def __init__(self, stream, show_times=False): """Create a FilterResult object outputting to stream.""" - testtools.TestResult.__init__(self) + super(TestIdPrintingResult, self).__init__() self._stream = stream self.failed_tests = 0 self.__time = 0 diff --git a/python/subunit/tests/test_test_results.py b/python/subunit/tests/test_test_results.py index a2dad96..94d2274 100644 --- a/python/subunit/tests/test_test_results.py +++ b/python/subunit/tests/test_test_results.py @@ -17,6 +17,9 @@ import datetime import unittest +from testtools import TestCase +from testtools.testresult.doubles import ExtendedTestResult + import subunit import subunit.iso8601 as iso8601 import subunit.test_results @@ -187,6 +190,110 @@ class TestAutoTimingTestResultDecorator(unittest.TestCase): self.assertNotEqual(None, self.decorated._calls[2]) +class TestTagCollapsingDecorator(TestCase): + + def test_tags_forwarded_outside_of_tests(self): + result = ExtendedTestResult() + tag_collapser = subunit.test_results.TagCollapsingDecorator(result) + tag_collapser.tags(set(['a', 'b']), set()) + self.assertEquals( + [('tags', set(['a', 'b']), set([]))], result._events) + + def test_tags_collapsed_inside_of_tests(self): + result = ExtendedTestResult() + tag_collapser = subunit.test_results.TagCollapsingDecorator(result) + test = subunit.RemotedTestCase('foo') + tag_collapser.startTest(test) + tag_collapser.tags(set(['a']), set()) + tag_collapser.tags(set(['b']), set(['a'])) + tag_collapser.tags(set(['c']), set()) + tag_collapser.stopTest(test) + self.assertEquals( + [('startTest', test), + ('tags', set(['b', 'c']), set(['a'])), + ('stopTest', test)], + result._events) + + def test_tags_collapsed_inside_of_tests_different_ordering(self): + result = ExtendedTestResult() + tag_collapser = subunit.test_results.TagCollapsingDecorator(result) + test = subunit.RemotedTestCase('foo') + tag_collapser.startTest(test) + tag_collapser.tags(set(), set(['a'])) + tag_collapser.tags(set(['a', 'b']), set()) + tag_collapser.tags(set(['c']), set()) + tag_collapser.stopTest(test) + self.assertEquals( + [('startTest', test), + ('tags', set(['a', 'b', 'c']), set()), + ('stopTest', test)], + result._events) + + +class TestTimeCollapsingDecorator(TestCase): + + def make_time(self): + # Heh heh. + return datetime.datetime( + 2000, 1, self.getUniqueInteger(), tzinfo=iso8601.UTC) + + def test_initial_time_forwarded(self): + # We always forward the first time event we see. + result = ExtendedTestResult() + tag_collapser = subunit.test_results.TimeCollapsingDecorator(result) + a_time = self.make_time() + tag_collapser.time(a_time) + self.assertEquals([('time', a_time)], result._events) + + def test_time_collapsed_to_first_and_last(self): + # If there are many consecutive time events, only the first and last + # are sent through. + result = ExtendedTestResult() + tag_collapser = subunit.test_results.TimeCollapsingDecorator(result) + times = [self.make_time() for i in range(5)] + for a_time in times: + tag_collapser.time(a_time) + tag_collapser.startTest(subunit.RemotedTestCase('foo')) + self.assertEquals( + [('time', times[0]), ('time', times[-1])], result._events[:-1]) + + def test_only_one_time_sent(self): + # If we receive a single time event followed by a non-time event, we + # send exactly one time event. + result = ExtendedTestResult() + tag_collapser = subunit.test_results.TimeCollapsingDecorator(result) + a_time = self.make_time() + tag_collapser.time(a_time) + tag_collapser.startTest(subunit.RemotedTestCase('foo')) + self.assertEquals([('time', a_time)], result._events[:-1]) + + def test_duplicate_times_not_sent(self): + # Many time events with the exact same time are collapsed into one + # time event. + result = ExtendedTestResult() + tag_collapser = subunit.test_results.TimeCollapsingDecorator(result) + a_time = self.make_time() + for i in range(5): + tag_collapser.time(a_time) + tag_collapser.startTest(subunit.RemotedTestCase('foo')) + self.assertEquals([('time', a_time)], result._events[:-1]) + + def test_no_times_inserted(self): + result = ExtendedTestResult() + tag_collapser = subunit.test_results.TimeCollapsingDecorator(result) + a_time = self.make_time() + tag_collapser.time(a_time) + foo = subunit.RemotedTestCase('foo') + tag_collapser.startTest(foo) + tag_collapser.addSuccess(foo) + tag_collapser.stopTest(foo) + self.assertEquals( + [('time', a_time), + ('startTest', foo), + ('addSuccess', foo), + ('stopTest', foo)], result._events) + + def test_suite(): loader = subunit.tests.TestUtil.TestLoader() result = loader.loadTestsFromName(__name__) |
