diff options
Diffstat (limited to 'python')
| -rw-r--r-- | python/subunit/filters.py | 125 | ||||
| -rw-r--r-- | python/subunit/test_results.py | 98 | ||||
| -rw-r--r-- | python/subunit/tests/test_test_results.py | 198 |
3 files changed, 421 insertions, 0 deletions
diff --git a/python/subunit/filters.py b/python/subunit/filters.py new file mode 100644 index 0000000..dc3fd8a --- /dev/null +++ b/python/subunit/filters.py @@ -0,0 +1,125 @@ +# subunit: extensions to python unittest to get test results from subprocesses. +# Copyright (C) 2009 Robert Collins <robertc@robertcollins.net> +# +# Licensed under either the Apache License, Version 2.0 or the BSD 3-clause +# license at the users choice. A copy of both licenses are available in the +# project source as Apache-2.0 and BSD. You may not use this file except in +# compliance with one of these two licences. +# +# Unless required by applicable law or agreed to in writing, software +# distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# license you chose for the specific language governing permissions and +# limitations under that license. +# + + +from optparse import OptionParser +import sys + +from subunit import DiscardStream, ProtocolTestCase + + +def make_options(description): + parser = OptionParser(description=description) + parser.add_option( + "--no-passthrough", action="store_true", + help="Hide all non subunit input.", default=False, + dest="no_passthrough") + parser.add_option( + "-o", "--output-to", + help="Send the output to this path rather than stdout.") + parser.add_option( + "-f", "--forward", action="store_true", default=False, + help="Forward subunit stream on stdout.") + return parser + + +def run_tests_from_stream(input_stream, result, passthrough_stream=None, + forward_stream=None): + """Run tests from a subunit input stream through 'result'. + + :param input_stream: A stream containing subunit input. + :param result: A TestResult that will receive the test events. + :param passthrough_stream: All non-subunit input received will be + sent to this stream. If not provided, uses the ``TestProtocolServer`` + default, which is ``sys.stdout``. + :param forward_stream: All subunit input received will be forwarded + to this stream. If not provided, uses the ``TestProtocolServer`` + default, which is to not forward any input. + """ + test = ProtocolTestCase( + input_stream, passthrough=passthrough_stream, + forward=forward_stream) + result.startTestRun() + test.run(result) + result.stopTestRun() + + +def filter_by_result(result_factory, output_path, passthrough, forward, + input_stream=sys.stdin): + """Filter an input stream using a test result. + + :param result_factory: A callable that when passed an output stream + returns a TestResult. It is expected that this result will output + to the given stream. + :param output_path: A path send output to. If None, output will be go + to ``sys.stdout``. + :param passthrough: If True, all non-subunit input will be sent to + ``sys.stdout``. If False, that input will be discarded. + :param forward: If True, all subunit input will be forwarded directly to + ``sys.stdout`` as well as to the ``TestResult``. + :param input_stream: The source of subunit input. Defaults to + ``sys.stdin``. + :return: A test result with the resultts of the run. + """ + if passthrough: + passthrough_stream = sys.stdout + else: + passthrough_stream = DiscardStream() + + if forward: + forward_stream = sys.stdout + else: + forward_stream = DiscardStream() + + if output_path is None: + output_to = sys.stdout + else: + output_to = file(output_path, 'wb') + + try: + result = result_factory(output_to) + run_tests_from_stream( + input_stream, result, passthrough_stream, forward_stream) + finally: + if output_path: + output_to.close() + return result + + +def run_filter_script(result_factory, description, post_run_hook=None): + """Main function for simple subunit filter scripts. + + Many subunit filter scripts take a stream of subunit input and use a + TestResult to handle the events generated by that stream. This function + wraps a lot of the boiler-plate around that by making a script with + options for handling passthrough information and stream forwarding, and + that will exit with a successful return code (i.e. 0) if the input stream + represents a successful test run. + + :param result_factory: A callable that takes an output stream and returns + a test result that outputs to that stream. + :param description: A description of the filter script. + """ + parser = make_options(description) + (options, args) = parser.parse_args() + result = filter_by_result( + result_factory, options.output_to, not options.no_passthrough, + options.forward) + if post_run_hook: + post_run_hook(result) + if result.wasSuccessful(): + sys.exit(0) + else: + sys.exit(1) diff --git a/python/subunit/test_results.py b/python/subunit/test_results.py index 33fb50e..fb7affd 100644 --- a/python/subunit/test_results.py +++ b/python/subunit/test_results.py @@ -16,9 +16,14 @@ """TestResult helper classes used to by subunit.""" +import csv import datetime import testtools +from testtools.content import ( + text_content, + TracebackContent, + ) from subunit import iso8601 @@ -493,3 +498,96 @@ class TestIdPrintingResult(testtools.TestResult): def wasSuccessful(self): "Tells whether or not this result was a success" return self.failed_tests == 0 + + +class TestByTestResult(testtools.TestResult): + """Call something every time a test completes.""" + + # XXX: Arguably belongs in testtools. + + def __init__(self, on_test): + """Construct a ``TestByTestResult``. + + :param on_test: A callable that take a test case, a status (one of + "success", "failure", "error", "skip", or "xfail"), a start time + (a ``datetime`` with timezone), a stop time, an iterable of tags, + and a details dict. Is called at the end of each test (i.e. on + ``stopTest``) with the accumulated values for that test. + """ + super(TestByTestResult, self).__init__() + self._on_test = on_test + + def startTest(self, test): + super(TestByTestResult, self).startTest(test) + self._start_time = self._now() + # There's no supported (i.e. tested) behaviour that relies on these + # being set, but it makes me more comfortable all the same. -- jml + self._status = None + self._details = None + self._stop_time = None + + def stopTest(self, test): + self._stop_time = self._now() + super(TestByTestResult, self).stopTest(test) + self._on_test( + test=test, + status=self._status, + start_time=self._start_time, + stop_time=self._stop_time, + # current_tags is new in testtools 0.9.13. + tags=getattr(self, 'current_tags', None), + details=self._details) + + def _err_to_details(self, test, err, details): + if details: + return details + return {'traceback': TracebackContent(err, test)} + + def addSuccess(self, test, details=None): + super(TestByTestResult, self).addSuccess(test) + self._status = 'success' + self._details = details + + def addFailure(self, test, err=None, details=None): + super(TestByTestResult, self).addFailure(test, err, details) + self._status = 'failure' + self._details = self._err_to_details(test, err, details) + + def addError(self, test, err=None, details=None): + super(TestByTestResult, self).addError(test, err, details) + self._status = 'error' + self._details = self._err_to_details(test, err, details) + + def addSkip(self, test, reason=None, details=None): + super(TestByTestResult, self).addSkip(test, reason, details) + self._status = 'skip' + if details is None: + details = {'reason': text_content(reason)} + elif reason: + # XXX: What if details already has 'reason' key? + details['reason'] = text_content(reason) + self._details = details + + def addExpectedFailure(self, test, err=None, details=None): + super(TestByTestResult, self).addExpectedFailure(test, err, details) + self._status = 'xfail' + self._details = self._err_to_details(test, err, details) + + def addUnexpectedSuccess(self, test, details=None): + super(TestByTestResult, self).addUnexpectedSuccess(test, details) + self._status = 'success' + self._details = details + + +class CsvResult(TestByTestResult): + + def __init__(self, stream): + super(CsvResult, self).__init__(self._on_test) + self._write_row = csv.writer(stream).writerow + + def _on_test(self, test, status, start_time, stop_time, tags, details): + self._write_row([test.id(), status, start_time, stop_time]) + + def startTestRun(self): + super(CsvResult, self).startTestRun() + self._write_row(['test', 'status', 'start_time', 'stop_time']) diff --git a/python/subunit/tests/test_test_results.py b/python/subunit/tests/test_test_results.py index 94d2274..6beb57a 100644 --- a/python/subunit/tests/test_test_results.py +++ b/python/subunit/tests/test_test_results.py @@ -14,16 +14,25 @@ # limitations under that license. # +import csv import datetime +import sys import unittest from testtools import TestCase +from testtools.compat import StringIO +from testtools.content import ( + text_content, + TracebackContent, + ) from testtools.testresult.doubles import ExtendedTestResult import subunit import subunit.iso8601 as iso8601 import subunit.test_results +import testtools + class LoggingDecorator(subunit.test_results.HookedTestResultDecorator): @@ -294,6 +303,195 @@ class TestTimeCollapsingDecorator(TestCase): ('stopTest', foo)], result._events) +class TestByTestResultTests(testtools.TestCase): + + def setUp(self): + super(TestByTestResultTests, self).setUp() + self.log = [] + self.result = subunit.test_results.TestByTestResult(self.on_test) + self.result._now = iter(range(5)).next + + def assertCalled(self, **kwargs): + defaults = { + 'test': self, + 'tags': set(), + 'details': None, + 'start_time': 0, + 'stop_time': 1, + } + defaults.update(kwargs) + self.assertEqual([defaults], self.log) + + def on_test(self, **kwargs): + self.log.append(kwargs) + + def test_no_tests_nothing_reported(self): + self.result.startTestRun() + self.result.stopTestRun() + self.assertEqual([], self.log) + + def test_add_success(self): + self.result.startTest(self) + self.result.addSuccess(self) + self.result.stopTest(self) + self.assertCalled(status='success') + + def test_add_success_details(self): + self.result.startTest(self) + details = {'foo': 'bar'} + self.result.addSuccess(self, details=details) + self.result.stopTest(self) + self.assertCalled(status='success', details=details) + + def test_tags(self): + if not getattr(self.result, 'tags', None): + self.skipTest("No tags in testtools") + self.result.tags(['foo'], []) + self.result.startTest(self) + self.result.addSuccess(self) + self.result.stopTest(self) + self.assertCalled(status='success', tags=set(['foo'])) + + def test_add_error(self): + self.result.startTest(self) + try: + 1/0 + except ZeroDivisionError: + error = sys.exc_info() + self.result.addError(self, error) + self.result.stopTest(self) + self.assertCalled( + status='error', + details={'traceback': TracebackContent(error, self)}) + + def test_add_error_details(self): + self.result.startTest(self) + details = {"foo": text_content("bar")} + self.result.addError(self, details=details) + self.result.stopTest(self) + self.assertCalled(status='error', details=details) + + def test_add_failure(self): + self.result.startTest(self) + try: + self.fail("intentional failure") + except self.failureException: + failure = sys.exc_info() + self.result.addFailure(self, failure) + self.result.stopTest(self) + self.assertCalled( + status='failure', + details={'traceback': TracebackContent(failure, self)}) + + def test_add_failure_details(self): + self.result.startTest(self) + details = {"foo": text_content("bar")} + self.result.addFailure(self, details=details) + self.result.stopTest(self) + self.assertCalled(status='failure', details=details) + + def test_add_xfail(self): + self.result.startTest(self) + try: + 1/0 + except ZeroDivisionError: + error = sys.exc_info() + self.result.addExpectedFailure(self, error) + self.result.stopTest(self) + self.assertCalled( + status='xfail', + details={'traceback': TracebackContent(error, self)}) + + def test_add_xfail_details(self): + self.result.startTest(self) + details = {"foo": text_content("bar")} + self.result.addExpectedFailure(self, details=details) + self.result.stopTest(self) + self.assertCalled(status='xfail', details=details) + + def test_add_unexpected_success(self): + self.result.startTest(self) + details = {'foo': 'bar'} + self.result.addUnexpectedSuccess(self, details=details) + self.result.stopTest(self) + self.assertCalled(status='success', details=details) + + def test_add_skip_reason(self): + self.result.startTest(self) + reason = self.getUniqueString() + self.result.addSkip(self, reason) + self.result.stopTest(self) + self.assertCalled( + status='skip', details={'reason': text_content(reason)}) + + def test_add_skip_details(self): + self.result.startTest(self) + details = {'foo': 'bar'} + self.result.addSkip(self, details=details) + self.result.stopTest(self) + self.assertCalled(status='skip', details=details) + + def test_twice(self): + self.result.startTest(self) + self.result.addSuccess(self, details={'foo': 'bar'}) + self.result.stopTest(self) + self.result.startTest(self) + self.result.addSuccess(self) + self.result.stopTest(self) + self.assertEqual( + [{'test': self, + 'status': 'success', + 'start_time': 0, + 'stop_time': 1, + 'tags': set(), + 'details': {'foo': 'bar'}}, + {'test': self, + 'status': 'success', + 'start_time': 2, + 'stop_time': 3, + 'tags': set(), + 'details': None}, + ], + self.log) + + +class TestCsvResult(testtools.TestCase): + + def parse_stream(self, stream): + stream.seek(0) + reader = csv.reader(stream) + return list(reader) + + def test_csv_output(self): + stream = StringIO() + result = subunit.test_results.CsvResult(stream) + result._now = iter(range(5)).next + result.startTestRun() + result.startTest(self) + result.addSuccess(self) + result.stopTest(self) + result.stopTestRun() + self.assertEqual( + [['test', 'status', 'start_time', 'stop_time'], + [self.id(), 'success', '0', '1'], + ], + self.parse_stream(stream)) + + def test_just_header_when_no_tests(self): + stream = StringIO() + result = subunit.test_results.CsvResult(stream) + result.startTestRun() + result.stopTestRun() + self.assertEqual( + [['test', 'status', 'start_time', 'stop_time']], + self.parse_stream(stream)) + + def test_no_output_before_events(self): + stream = StringIO() + subunit.test_results.CsvResult(stream) + self.assertEqual([], self.parse_stream(stream)) + + def test_suite(): loader = subunit.tests.TestUtil.TestLoader() result = loader.loadTestsFromName(__name__) |
