diff options
| author | Robert Collins <robertc@robertcollins.net> | 2009-02-22 08:55:14 +1100 |
|---|---|---|
| committer | Robert Collins <robertc@robertcollins.net> | 2009-02-22 08:55:14 +1100 |
| commit | 0c7bb6072d34094bc07cacd140f7efdd3b0d062d (patch) | |
| tree | 4129307512de6ee14240e112702485833e5d27a3 /python/subunit | |
| parent | 6bd993aaf9d5e9bea1295125f2a503731df5e1c3 (diff) | |
| parent | f05660cc320295024d7db8758bde540f319314c3 (diff) | |
| download | subunit-git-0c7bb6072d34094bc07cacd140f7efdd3b0d062d.tar.gz | |
Major new features:
- command line filters for subunit streams.
- tap2subunit
- subunit-stats
- subunit-tags
- subunit2pyunit
- SKIP and XFAIL test support in the protocol.
- Successful tests can have comment text too.
- Timestamps in the protocol can provide timing data for analysis.
Diffstat (limited to 'python/subunit')
| -rw-r--r-- | python/subunit/__init__.py | 339 | ||||
| -rw-r--r-- | python/subunit/tests/__init__.py | 11 | ||||
| -rw-r--r-- | python/subunit/tests/test_subunit_stats.py | 84 | ||||
| -rw-r--r-- | python/subunit/tests/test_subunit_tags.py | 70 | ||||
| -rw-r--r-- | python/subunit/tests/test_tap2subunit.py | 435 | ||||
| -rw-r--r-- | python/subunit/tests/test_test_protocol.py | 192 |
6 files changed, 1099 insertions, 32 deletions
diff --git a/python/subunit/__init__.py b/python/subunit/__init__.py index 05e4297..f63a3a0 100644 --- a/python/subunit/__init__.py +++ b/python/subunit/__init__.py @@ -21,6 +21,7 @@ import os from StringIO import StringIO import subprocess import sys +import re import unittest def test_suite(): @@ -42,6 +43,18 @@ def join_dir(base_path, path): return os.path.join(os.path.dirname(os.path.abspath(base_path)), path) +def tags_to_new_gone(tags): + """Split a list of tags into a new_set and a gone_set.""" + new_tags = set() + gone_tags = set() + for tag in tags: + if tag[0] == '-': + gone_tags.add(tag[1:]) + else: + new_tags.add(tag) + return new_tags, gone_tags + + class TestProtocolServer(object): """A class for receiving results from a TestProtocol client. @@ -52,6 +65,9 @@ class TestProtocolServer(object): TEST_STARTED = 1 READING_FAILURE = 2 READING_ERROR = 3 + READING_SKIP = 4 + READING_XFAIL = 5 + READING_SUCCESS = 6 def __init__(self, client, stream=sys.stdout): """Create a TestProtocol server instance. @@ -84,6 +100,20 @@ class TestProtocolServer(object): else: self.stdOutLineReceived(line) + def _addExpectedFail(self, offset, line): + if (self.state == TestProtocolServer.TEST_STARTED and + self.current_test_description == line[offset:-1]): + self.state = TestProtocolServer.OUTSIDE_TEST + self.current_test_description = None + self.client.addSuccess(self._current_test) + self.client.stopTest(self._current_test) + elif (self.state == TestProtocolServer.TEST_STARTED and + self.current_test_description + " [" == line[offset:-1]): + self.state = TestProtocolServer.READING_XFAIL + self._message = "" + else: + self.stdOutLineReceived(line) + def _addFailure(self, offset, line): if (self.state == TestProtocolServer.TEST_STARTED and self.current_test_description == line[offset:-1]): @@ -98,14 +128,28 @@ class TestProtocolServer(object): else: self.stdOutLineReceived(line) - def _addSuccess(self, offset, line): + def _addSkip(self, offset, line): if (self.state == TestProtocolServer.TEST_STARTED and self.current_test_description == line[offset:-1]): + self.state = TestProtocolServer.OUTSIDE_TEST + self.current_test_description = None self.client.addSuccess(self._current_test) self.client.stopTest(self._current_test) - self.current_test_description = None - self._current_test = None - self.state = TestProtocolServer.OUTSIDE_TEST + elif (self.state == TestProtocolServer.TEST_STARTED and + self.current_test_description + " [" == line[offset:-1]): + self.state = TestProtocolServer.READING_SKIP + self._message = "" + else: + self.stdOutLineReceived(line) + + def _addSuccess(self, offset, line): + if (self.state == TestProtocolServer.TEST_STARTED and + self.current_test_description == line[offset:-1]): + self._succeedTest() + elif (self.state == TestProtocolServer.TEST_STARTED and + self.current_test_description + " [" == line[offset:-1]): + self.state = TestProtocolServer.READING_SUCCESS + self._message = "" else: self.stdOutLineReceived(line) @@ -129,19 +173,19 @@ class TestProtocolServer(object): self.client.addError(self._current_test, RemoteError(self._message)) self.client.stopTest(self._current_test) + elif self.state in ( + TestProtocolServer.READING_SKIP, + TestProtocolServer.READING_SUCCESS, + TestProtocolServer.READING_XFAIL, + ): + self._succeedTest() else: self.stdOutLineReceived(line) def _handleTags(self, offset, line): """Process a tags command.""" tags = line[offset:].split() - new_tags = set() - gone_tags = set() - for tag in tags: - if tag[0] == '-': - gone_tags.add(tag[1:]) - else: - new_tags.add(tag) + new_tags, gone_tags = tags_to_new_gone(tags) if self.state == TestProtocolServer.OUTSIDE_TEST: update_tags = self.tags else: @@ -153,8 +197,11 @@ class TestProtocolServer(object): """Call the appropriate local method for the received line.""" if line == "]\n": self.endQuote(line) - elif (self.state == TestProtocolServer.READING_FAILURE or - self.state == TestProtocolServer.READING_ERROR): + elif self.state in (TestProtocolServer.READING_FAILURE, + TestProtocolServer.READING_ERROR, TestProtocolServer.READING_SKIP, + TestProtocolServer.READING_SUCCESS, + TestProtocolServer.READING_XFAIL + ): self._appendMessage(line) else: parts = line.split(None, 1) @@ -168,36 +215,46 @@ class TestProtocolServer(object): self._addError(offset, line) elif cmd == 'failure': self._addFailure(offset, line) + elif cmd == 'skip': + self._addSkip(offset, line) elif cmd in ('success', 'successful'): self._addSuccess(offset, line) - elif cmd in ('tags'): + elif cmd in ('tags',): self._handleTags(offset, line) + elif cmd in ('time',): + # Accept it, but do not do anything with it yet. + pass + elif cmd == 'xfail': + self._addExpectedFail(offset, line) else: self.stdOutLineReceived(line) else: self.stdOutLineReceived(line) + def _lostConnectionInTest(self, state_string): + error_string = "lost connection during %stest '%s'" % ( + state_string, self.current_test_description) + self.client.addError(self._current_test, RemoteError(error_string)) + self.client.stopTest(self._current_test) + def lostConnection(self): """The input connection has finished.""" + if self.state == TestProtocolServer.OUTSIDE_TEST: + return if self.state == TestProtocolServer.TEST_STARTED: - self.client.addError(self._current_test, - RemoteError("lost connection during test '%s'" - % self.current_test_description)) - self.client.stopTest(self._current_test) + self._lostConnectionInTest('') elif self.state == TestProtocolServer.READING_ERROR: - self.client.addError(self._current_test, - RemoteError("lost connection during " - "error report of test " - "'%s'" % - self.current_test_description)) - self.client.stopTest(self._current_test) + self._lostConnectionInTest('error report of ') elif self.state == TestProtocolServer.READING_FAILURE: - self.client.addError(self._current_test, - RemoteError("lost connection during " - "failure report of test " - "'%s'" % - self.current_test_description)) - self.client.stopTest(self._current_test) + self._lostConnectionInTest('failure report of ') + elif self.state == TestProtocolServer.READING_SUCCESS: + self._lostConnectionInTest('success report of ') + elif self.state == TestProtocolServer.READING_SKIP: + self._lostConnectionInTest('skip report of ') + elif self.state == TestProtocolServer.READING_XFAIL: + self._lostConnectionInTest('xfail report of ') + else: + self._lostConnectionInTest('unknown state of ') def readFrom(self, pipe): for line in pipe.readlines(): @@ -218,6 +275,13 @@ class TestProtocolServer(object): def stdOutLineReceived(self, line): self._stream.write(line) + def _succeedTest(self): + self.client.addSuccess(self._current_test) + self.client.stopTest(self._current_test) + self.current_test_description = None + self._current_test = None + self.state = TestProtocolServer.OUTSIDE_TEST + class RemoteException(Exception): """An exception that occured remotely to Python.""" @@ -402,3 +466,218 @@ def run_isolated(klass, self, result): os.waitpid(pid, 0) # TODO return code evaluation. return result + + +def TAP2SubUnit(tap, subunit): + """Filter a TAP pipe into a subunit pipe. + + :param tap: A tap pipe/stream/file object. + :param subunit: A pipe/stream/file object to write subunit results to. + :return: The exit code to exit with. + """ + BEFORE_PLAN = 0 + AFTER_PLAN = 1 + SKIP_STREAM = 2 + client = TestProtocolClient(subunit) + state = BEFORE_PLAN + plan_start = 1 + plan_stop = 0 + def _skipped_test(subunit, plan_start): + # Some tests were skipped. + subunit.write('test test %d\n' % plan_start) + subunit.write('error test %d [\n' % plan_start) + subunit.write('test missing from TAP output\n') + subunit.write(']\n') + return plan_start + 1 + # Test data for the next test to emit + test_name = None + log = [] + result = None + def _emit_test(): + "write out a test" + if test_name is None: + return + subunit.write("test %s\n" % test_name) + if not log: + subunit.write("%s %s\n" % (result, test_name)) + else: + subunit.write("%s %s [\n" % (result, test_name)) + if log: + for line in log: + subunit.write("%s\n" % line) + subunit.write("]\n") + del log[:] + for line in tap: + if state == BEFORE_PLAN: + match = re.match("(\d+)\.\.(\d+)\s*(?:\#\s+(.*))?\n", line) + if match: + state = AFTER_PLAN + _, plan_stop, comment = match.groups() + plan_stop = int(plan_stop) + if plan_start > plan_stop and plan_stop == 0: + # skipped file + state = SKIP_STREAM + subunit.write("test file skip\n") + subunit.write("skip file skip [\n") + subunit.write("%s\n" % comment) + subunit.write("]\n") + continue + # not a plan line, or have seen one before + match = re.match("(ok|not ok)(?:\s+(\d+)?)?(?:\s+([^#]*[^#\s]+)\s*)?(?:\s+#\s+(TODO|SKIP)(?:\s+(.*))?)?\n", line) + if match: + # new test, emit current one. + _emit_test() + status, number, description, directive, directive_comment = match.groups() + if status == 'ok': + result = 'success' + else: + result = "failure" + if description is None: + description = '' + else: + description = ' ' + description + if directive is not None: + if directive == 'TODO': + result = 'xfail' + elif directive == 'SKIP': + result = 'skip' + if directive_comment is not None: + log.append(directive_comment) + if number is not None: + number = int(number) + while plan_start < number: + plan_start = _skipped_test(subunit, plan_start) + test_name = "test %d%s" % (plan_start, description) + plan_start += 1 + continue + match = re.match("Bail out\!(?:\s*(.*))?\n", line) + if match: + reason, = match.groups() + if reason is None: + extra = '' + else: + extra = ' %s' % reason + _emit_test() + test_name = "Bail out!%s" % extra + result = "error" + state = SKIP_STREAM + continue + match = re.match("\#.*\n", line) + if match: + log.append(line[:-1]) + continue + subunit.write(line) + _emit_test() + while plan_start <= plan_stop: + # record missed tests + plan_start = _skipped_test(subunit, plan_start) + return 0 + + +def tag_stream(original, filtered, tags): + """Alter tags on a stream. + + :param original: The input stream. + :param filtered: The output stream. + :param tags: The tags to apply. As in a normal stream - a list of 'TAG' or + '-TAG' commands. + + A 'TAG' command will add the tag to the output stream, + and override any existing '-TAG' command in that stream. + Specifically: + * A global 'tags: TAG' will be added to the start of the stream. + * Any tags commands with -TAG will have the -TAG removed. + + A '-TAG' command will remove the TAG command from the stream. + Specifically: + * A 'tags: -TAG' command will be added to the start of the stream. + * Any 'tags: TAG' command will have 'TAG' removed from it. + Additionally, any redundant tagging commands (adding a tag globally + present, or removing a tag globally removed) are stripped as a + by-product of the filtering. + :return: 0 + """ + new_tags, gone_tags = tags_to_new_gone(tags) + def write_tags(new_tags, gone_tags): + if new_tags or gone_tags: + filtered.write("tags: " + ' '.join(new_tags)) + if gone_tags: + for tag in gone_tags: + filtered.write("-" + tag) + filtered.write("\n") + write_tags(new_tags, gone_tags) + # TODO: use the protocol parser and thus don't mangle test comments. + for line in original: + if line.startswith("tags:"): + line_tags = line[5:].split() + line_new, line_gone = tags_to_new_gone(line_tags) + line_new = line_new - gone_tags + line_gone = line_gone - new_tags + write_tags(line_new, line_gone) + else: + filtered.write(line) + return 0 + + +class ProtocolTestCase(object): + """A test case which reports a subunit stream.""" + + def __init__(self, stream): + self._stream = stream + + def __call__(self, result=None): + return self.run(result) + + def run(self, result=None): + if result is None: + result = self.defaultTestResult() + protocol = TestProtocolServer(result) + for line in self._stream: + protocol.lineReceived(line) + protocol.lostConnection() + + +class TestResultStats(unittest.TestResult): + """A pyunit TestResult interface implementation for making statistics. + + :ivar total_tests: The total tests seen. + :ivar passed_tests: The tests that passed. + :ivar failed_tests: The tests that failed. + :ivar tags: The tags seen across all tests. + """ + + def __init__(self, stream): + """Create a TestResultStats which outputs to stream.""" + unittest.TestResult.__init__(self) + self._stream = stream + self.failed_tests = 0 + self.tags = set() + + @property + def total_tests(self): + return self.testsRun + + def addError(self, test, err): + self.failed_tests += 1 + + def addFailure(self, test, err): + self.failed_tests += 1 + + def formatStats(self): + self._stream.write("Total tests: %5d\n" % self.total_tests) + self._stream.write("Passed tests: %5d\n" % self.passed_tests) + self._stream.write("Failed tests: %5d\n" % self.failed_tests) + tags = sorted(self.tags) + self._stream.write("Tags: %s\n" % (", ".join(tags))) + + @property + def passed_tests(self): + return self.total_tests - self.failed_tests + + def stopTest(self, test): + unittest.TestResult.stopTest(self, test) + self.tags.update(test.tags) + + def wasSuccessful(self): + """Tells whether or not this result was a success""" + return self.failed_tests == 0 diff --git a/python/subunit/tests/__init__.py b/python/subunit/tests/__init__.py index 544d0e7..a506281 100644 --- a/python/subunit/tests/__init__.py +++ b/python/subunit/tests/__init__.py @@ -17,9 +17,18 @@ # Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA # -from subunit.tests import TestUtil, test_test_protocol +from subunit.tests import ( + TestUtil, + test_subunit_stats, + test_subunit_tags, + test_tap2subunit, + test_test_protocol, + ) def test_suite(): result = TestUtil.TestSuite() result.addTest(test_test_protocol.test_suite()) + result.addTest(test_tap2subunit.test_suite()) + result.addTest(test_subunit_tags.test_suite()) + result.addTest(test_subunit_stats.test_suite()) return result diff --git a/python/subunit/tests/test_subunit_stats.py b/python/subunit/tests/test_subunit_stats.py new file mode 100644 index 0000000..b789089 --- /dev/null +++ b/python/subunit/tests/test_subunit_stats.py @@ -0,0 +1,84 @@ +# +# subunit: extensions to python unittest to get test results from subprocesses. +# Copyright (C) 2005 Robert Collins <robertc@robertcollins.net> +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +# + +"""Tests for subunit.TestResultStats.""" + +import unittest +from StringIO import StringIO + +import subunit + + +class TestTestResultStats(unittest.TestCase): + """Test for TestResultStats, a TestResult object that generates stats.""" + + def setUp(self): + self.output = StringIO() + self.result = subunit.TestResultStats(self.output) + self.input_stream = StringIO() + self.test = subunit.ProtocolTestCase(self.input_stream) + + def test_stats_empty(self): + self.test.run(self.result) + self.assertEqual(0, self.result.total_tests) + self.assertEqual(0, self.result.passed_tests) + self.assertEqual(0, self.result.failed_tests) + self.assertEqual(set(), self.result.tags) + + def setUpUsedStream(self): + self.input_stream.write("""tags: global +test passed +success passed +test failed +tags: local +failure failed +test error +error error +test skipped +skip skipped +test todo +xfail todo +""") + self.input_stream.seek(0) + self.test.run(self.result) + + def test_stats_smoke_everything(self): + # Statistics are calculated usefully. + self.setUpUsedStream() + self.assertEqual(5, self.result.total_tests) + self.assertEqual(3, self.result.passed_tests) + self.assertEqual(2, self.result.failed_tests) + self.assertEqual(set(["global", "local"]), self.result.tags) + + def test_stat_formatting(self): + expected = (""" +Total tests: 5 +Passed tests: 3 +Failed tests: 2 +Tags: global, local +""")[1:] + self.setUpUsedStream() + self.result.formatStats() + self.assertEqual(expected, self.output.getvalue()) + + +def test_suite(): + loader = subunit.tests.TestUtil.TestLoader() + result = loader.loadTestsFromName(__name__) + return result diff --git a/python/subunit/tests/test_subunit_tags.py b/python/subunit/tests/test_subunit_tags.py new file mode 100644 index 0000000..66cff68 --- /dev/null +++ b/python/subunit/tests/test_subunit_tags.py @@ -0,0 +1,70 @@ +# +# subunit: extensions to python unittest to get test results from subprocesses. +# Copyright (C) 2005 Robert Collins <robertc@robertcollins.net> +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +# + +"""Tests for subunit.tag_stream.""" + +import unittest +from StringIO import StringIO + +import subunit + + +class TestSubUnitTags(unittest.TestCase): + + def setUp(self): + self.original = StringIO() + self.filtered = StringIO() + + def test_add_tag(self): + self.original.write("tags: foo\n") + self.original.write("test: test\n") + self.original.write("tags: bar -quux\n") + self.original.write("success: test\n") + self.original.seek(0) + result = subunit.tag_stream(self.original, self.filtered, ["quux"]) + self.assertEqual([ + "tags: quux", + "tags: foo", + "test: test", + "tags: bar", + "success: test", + ], + self.filtered.getvalue().splitlines()) + + def test_remove_tag(self): + self.original.write("tags: foo\n") + self.original.write("test: test\n") + self.original.write("tags: bar -quux\n") + self.original.write("success: test\n") + self.original.seek(0) + result = subunit.tag_stream(self.original, self.filtered, ["-bar"]) + self.assertEqual([ + "tags: -bar", + "tags: foo", + "test: test", + "tags: -quux", + "success: test", + ], + self.filtered.getvalue().splitlines()) + + +def test_suite(): + loader = subunit.tests.TestUtil.TestLoader() + result = loader.loadTestsFromName(__name__) + return result diff --git a/python/subunit/tests/test_tap2subunit.py b/python/subunit/tests/test_tap2subunit.py new file mode 100644 index 0000000..bc6bf14 --- /dev/null +++ b/python/subunit/tests/test_tap2subunit.py @@ -0,0 +1,435 @@ +# +# subunit: extensions to python unittest to get test results from subprocesses. +# Copyright (C) 2005 Robert Collins <robertc@robertcollins.net> +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +# + +"""Tests for TAP2SubUnit.""" + +import unittest +from StringIO import StringIO +import os +import subunit +import sys + + +class TestTAP2SubUnit(unittest.TestCase): + """Tests for TAP2SubUnit. + + These tests test TAP string data in, and subunit string data out. + This is ok because the subunit protocol is intended to be stable, + but it might be easier/pithier to write tests against TAP string in, + parsed subunit objects out (by hooking the subunit stream to a subunit + protocol server. + """ + + def setUp(self): + self.tap = StringIO() + self.subunit = StringIO() + + def test_skip_entire_file(self): + # A file + # 1..- # Skipped: comment + # results in a single skipped test. + self.tap.write("1..0 # Skipped: entire file skipped\n") + self.tap.seek(0) + result = subunit.TAP2SubUnit(self.tap, self.subunit) + self.assertEqual(0, result) + self.assertEqual([ + "test file skip", + "skip file skip [", + "Skipped: entire file skipped", + "]", + ], + self.subunit.getvalue().splitlines()) + + def test_ok_test_pass(self): + # A file + # ok + # results in a passed test with name 'test 1' (a synthetic name as tap + # does not require named fixtures - it is the first test in the tap + # stream). + self.tap.write("ok\n") + self.tap.seek(0) + result = subunit.TAP2SubUnit(self.tap, self.subunit) + self.assertEqual(0, result) + self.assertEqual([ + "test test 1", + "success test 1", + ], + self.subunit.getvalue().splitlines()) + + def test_ok_test_number_pass(self): + # A file + # ok 1 + # results in a passed test with name 'test 1' + self.tap.write("ok 1\n") + self.tap.seek(0) + result = subunit.TAP2SubUnit(self.tap, self.subunit) + self.assertEqual(0, result) + self.assertEqual([ + "test test 1", + "success test 1", + ], + self.subunit.getvalue().splitlines()) + + def test_ok_test_number_description_pass(self): + # A file + # ok 1 - There is a description + # results in a passed test with name 'test 1 - There is a description' + self.tap.write("ok 1 - There is a description\n") + self.tap.seek(0) + result = subunit.TAP2SubUnit(self.tap, self.subunit) + self.assertEqual(0, result) + self.assertEqual([ + "test test 1 - There is a description", + "success test 1 - There is a description", + ], + self.subunit.getvalue().splitlines()) + + def test_ok_test_description_pass(self): + # A file + # ok There is a description + # results in a passed test with name 'test 1 There is a description' + self.tap.write("ok There is a description\n") + self.tap.seek(0) + result = subunit.TAP2SubUnit(self.tap, self.subunit) + self.assertEqual(0, result) + self.assertEqual([ + "test test 1 There is a description", + "success test 1 There is a description", + ], + self.subunit.getvalue().splitlines()) + + def test_ok_SKIP_skip(self): + # A file + # ok # SKIP + # results in a skkip test with name 'test 1' + self.tap.write("ok # SKIP\n") + self.tap.seek(0) + result = subunit.TAP2SubUnit(self.tap, self.subunit) + self.assertEqual(0, result) + self.assertEqual([ + "test test 1", + "skip test 1", + ], + self.subunit.getvalue().splitlines()) + + def test_ok_number_description_SKIP_skip_comment(self): + # A file + # ok 1 foo # SKIP Not done yet + # results in a skip test with name 'test 1 foo' and a log of + # Not done yet + self.tap.write("ok 1 foo # SKIP Not done yet\n") + self.tap.seek(0) + result = subunit.TAP2SubUnit(self.tap, self.subunit) + self.assertEqual(0, result) + self.assertEqual([ + "test test 1 foo", + "skip test 1 foo [", + "Not done yet", + "]", + ], + self.subunit.getvalue().splitlines()) + + def test_ok_SKIP_skip_comment(self): + # A file + # ok # SKIP Not done yet + # results in a skip test with name 'test 1' and a log of Not done yet + self.tap.write("ok # SKIP Not done yet\n") + self.tap.seek(0) + result = subunit.TAP2SubUnit(self.tap, self.subunit) + self.assertEqual(0, result) + self.assertEqual([ + "test test 1", + "skip test 1 [", + "Not done yet", + "]", + ], + self.subunit.getvalue().splitlines()) + + def test_ok_TODO_xfail(self): + # A file + # ok # TODO + # results in a xfail test with name 'test 1' + self.tap.write("ok # TODO\n") + self.tap.seek(0) + result = subunit.TAP2SubUnit(self.tap, self.subunit) + self.assertEqual(0, result) + self.assertEqual([ + "test test 1", + "xfail test 1", + ], + self.subunit.getvalue().splitlines()) + + def test_ok_TODO_xfail_comment(self): + # A file + # ok # TODO Not done yet + # results in a xfail test with name 'test 1' and a log of Not done yet + self.tap.write("ok # TODO Not done yet\n") + self.tap.seek(0) + result = subunit.TAP2SubUnit(self.tap, self.subunit) + self.assertEqual(0, result) + self.assertEqual([ + "test test 1", + "xfail test 1 [", + "Not done yet", + "]", + ], + self.subunit.getvalue().splitlines()) + + def test_bail_out_errors(self): + # A file with line in it + # Bail out! COMMENT + # is treated as an error + self.tap.write("ok 1 foo\n") + self.tap.write("Bail out! Lifejacket engaged\n") + self.tap.seek(0) + result = subunit.TAP2SubUnit(self.tap, self.subunit) + self.assertEqual(0, result) + self.assertEqual([ + "test test 1 foo", + "success test 1 foo", + "test Bail out! Lifejacket engaged", + "error Bail out! Lifejacket engaged", + ], + self.subunit.getvalue().splitlines()) + + def test_missing_test_at_end_with_plan_adds_error(self): + # A file + # 1..3 + # ok first test + # not ok third test + # results in three tests, with the third being created + self.tap.write('1..3\n') + self.tap.write('ok first test\n') + self.tap.write('not ok second test\n') + self.tap.seek(0) + result = subunit.TAP2SubUnit(self.tap, self.subunit) + self.assertEqual(0, result) + self.assertEqual([ + 'test test 1 first test', + 'success test 1 first test', + 'test test 2 second test', + 'failure test 2 second test', + 'test test 3', + 'error test 3 [', + 'test missing from TAP output', + ']', + ], + self.subunit.getvalue().splitlines()) + + def test_missing_test_with_plan_adds_error(self): + # A file + # 1..3 + # ok first test + # not ok 3 third test + # results in three tests, with the second being created + self.tap.write('1..3\n') + self.tap.write('ok first test\n') + self.tap.write('not ok 3 third test\n') + self.tap.seek(0) + result = subunit.TAP2SubUnit(self.tap, self.subunit) + self.assertEqual(0, result) + self.assertEqual([ + 'test test 1 first test', + 'success test 1 first test', + 'test test 2', + 'error test 2 [', + 'test missing from TAP output', + ']', + 'test test 3 third test', + 'failure test 3 third test', + ], + self.subunit.getvalue().splitlines()) + + def test_missing_test_no_plan_adds_error(self): + # A file + # ok first test + # not ok 3 third test + # results in three tests, with the second being created + self.tap.write('ok first test\n') + self.tap.write('not ok 3 third test\n') + self.tap.seek(0) + result = subunit.TAP2SubUnit(self.tap, self.subunit) + self.assertEqual(0, result) + self.assertEqual([ + 'test test 1 first test', + 'success test 1 first test', + 'test test 2', + 'error test 2 [', + 'test missing from TAP output', + ']', + 'test test 3 third test', + 'failure test 3 third test', + ], + self.subunit.getvalue().splitlines()) + + def test_four_tests_in_a_row_trailing_plan(self): + # A file + # ok 1 - first test in a script with no plan at all + # not ok 2 - second + # ok 3 - third + # not ok 4 - fourth + # 1..4 + # results in four tests numbered and named + self.tap.write('ok 1 - first test in a script with trailing plan\n') + self.tap.write('not ok 2 - second\n') + self.tap.write('ok 3 - third\n') + self.tap.write('not ok 4 - fourth\n') + self.tap.write('1..4\n') + self.tap.seek(0) + result = subunit.TAP2SubUnit(self.tap, self.subunit) + self.assertEqual(0, result) + self.assertEqual([ + 'test test 1 - first test in a script with trailing plan', + 'success test 1 - first test in a script with trailing plan', + 'test test 2 - second', + 'failure test 2 - second', + 'test test 3 - third', + 'success test 3 - third', + 'test test 4 - fourth', + 'failure test 4 - fourth' + ], + self.subunit.getvalue().splitlines()) + + def test_four_tests_in_a_row_with_plan(self): + # A file + # 1..4 + # ok 1 - first test in a script with no plan at all + # not ok 2 - second + # ok 3 - third + # not ok 4 - fourth + # results in four tests numbered and named + self.tap.write('1..4\n') + self.tap.write('ok 1 - first test in a script with a plan\n') + self.tap.write('not ok 2 - second\n') + self.tap.write('ok 3 - third\n') + self.tap.write('not ok 4 - fourth\n') + self.tap.seek(0) + result = subunit.TAP2SubUnit(self.tap, self.subunit) + self.assertEqual(0, result) + self.assertEqual([ + 'test test 1 - first test in a script with a plan', + 'success test 1 - first test in a script with a plan', + 'test test 2 - second', + 'failure test 2 - second', + 'test test 3 - third', + 'success test 3 - third', + 'test test 4 - fourth', + 'failure test 4 - fourth' + ], + self.subunit.getvalue().splitlines()) + + def test_four_tests_in_a_row_no_plan(self): + # A file + # ok 1 - first test in a script with no plan at all + # not ok 2 - second + # ok 3 - third + # not ok 4 - fourth + # results in four tests numbered and named + self.tap.write('ok 1 - first test in a script with no plan at all\n') + self.tap.write('not ok 2 - second\n') + self.tap.write('ok 3 - third\n') + self.tap.write('not ok 4 - fourth\n') + self.tap.seek(0) + result = subunit.TAP2SubUnit(self.tap, self.subunit) + self.assertEqual(0, result) + self.assertEqual([ + 'test test 1 - first test in a script with no plan at all', + 'success test 1 - first test in a script with no plan at all', + 'test test 2 - second', + 'failure test 2 - second', + 'test test 3 - third', + 'success test 3 - third', + 'test test 4 - fourth', + 'failure test 4 - fourth' + ], + self.subunit.getvalue().splitlines()) + + def test_todo_and_skip(self): + # A file + # not ok 1 - a fail but # TODO but is TODO + # not ok 2 - another fail # SKIP instead + # results in two tests, numbered and commented. + self.tap.write("not ok 1 - a fail but # TODO but is TODO\n") + self.tap.write("not ok 2 - another fail # SKIP instead\n") + self.tap.seek(0) + result = subunit.TAP2SubUnit(self.tap, self.subunit) + self.assertEqual(0, result) + self.assertEqual([ + 'test test 1 - a fail but', + 'xfail test 1 - a fail but [', + 'but is TODO', + ']', + 'test test 2 - another fail', + 'skip test 2 - another fail [', + 'instead', + ']', + ], + self.subunit.getvalue().splitlines()) + + def test_leading_comments_add_to_next_test_log(self): + # A file + # # comment + # ok + # ok + # results in a single test with the comment included + # in the first test and not the second. + self.tap.write("# comment\n") + self.tap.write("ok\n") + self.tap.write("ok\n") + self.tap.seek(0) + result = subunit.TAP2SubUnit(self.tap, self.subunit) + self.assertEqual(0, result) + self.assertEqual([ + 'test test 1', + 'success test 1 [', + '# comment', + ']', + 'test test 2', + 'success test 2', + ], + self.subunit.getvalue().splitlines()) + + def test_trailing_comments_are_included_in_last_test_log(self): + # A file + # ok foo + # ok foo + # # comment + # results in a two tests, with the second having the comment + # attached to its log. + self.tap.write("ok\n") + self.tap.write("ok\n") + self.tap.write("# comment\n") + self.tap.seek(0) + result = subunit.TAP2SubUnit(self.tap, self.subunit) + self.assertEqual(0, result) + self.assertEqual([ + 'test test 1', + 'success test 1', + 'test test 2', + 'success test 2 [', + '# comment', + ']', + ], + self.subunit.getvalue().splitlines()) + + +def test_suite(): + loader = subunit.tests.TestUtil.TestLoader() + result = loader.loadTestsFromName(__name__) + return result diff --git a/python/subunit/tests/test_test_protocol.py b/python/subunit/tests/test_test_protocol.py index 52b33c8..1f9e3a9 100644 --- a/python/subunit/tests/test_test_protocol.py +++ b/python/subunit/tests/test_test_protocol.py @@ -1,5 +1,5 @@ # -# subunit: extensions to python unittest to get test results from subprocesses. +# subunit: extensions to Python unittest to get test results from subprocesses. # Copyright (C) 2005 Robert Collins <robertc@robertcollins.net> # # This program is free software; you can redistribute it and/or modify @@ -22,6 +22,7 @@ from StringIO import StringIO import os import subunit import sys +import time try: class MockTestProtocolServerClient(object): @@ -396,6 +397,42 @@ class TestTestProtocolServerLostConnection(unittest.TestCase): self.assertEqual(self.client.failure_calls, []) self.assertEqual(self.client.success_calls, [self.test]) + def test_lost_connection_during_skip(self): + self.protocol.lineReceived("test old mcdonald\n") + self.protocol.lineReceived("skip old mcdonald [\n") + self.protocol.lostConnection() + self.assertEqual(self.client.start_calls, [self.test]) + self.assertEqual(self.client.end_calls, [self.test]) + self.assertEqual(self.client.error_calls, [ + (self.test, subunit.RemoteError("lost connection during skip " + "report of test 'old mcdonald'"))]) + self.assertEqual(self.client.failure_calls, []) + self.assertEqual(self.client.success_calls, []) + + def test_lost_connection_during_xfail(self): + self.protocol.lineReceived("test old mcdonald\n") + self.protocol.lineReceived("xfail old mcdonald [\n") + self.protocol.lostConnection() + self.assertEqual(self.client.start_calls, [self.test]) + self.assertEqual(self.client.end_calls, [self.test]) + self.assertEqual(self.client.error_calls, [ + (self.test, subunit.RemoteError("lost connection during xfail " + "report of test 'old mcdonald'"))]) + self.assertEqual(self.client.failure_calls, []) + self.assertEqual(self.client.success_calls, []) + + def test_lost_connection_during_success(self): + self.protocol.lineReceived("test old mcdonald\n") + self.protocol.lineReceived("success old mcdonald [\n") + self.protocol.lostConnection() + self.assertEqual(self.client.start_calls, [self.test]) + self.assertEqual(self.client.end_calls, [self.test]) + self.assertEqual(self.client.error_calls, [ + (self.test, subunit.RemoteError("lost connection during success " + "report of test 'old mcdonald'"))]) + self.assertEqual(self.client.failure_calls, []) + self.assertEqual(self.client.success_calls, []) + class TestTestProtocolServerAddError(unittest.TestCase): @@ -493,6 +530,118 @@ class TestTestProtocolServerAddFailure(unittest.TestCase): self.failure_quoted_bracket("failure:") +class TestTestProtocolServerAddxFail(unittest.TestCase): + """Tests for the xfail keyword. + + In Python this thunks through to Success due to stdlib limitations (see + README). + """ + + def setUp(self): + """Setup a test object ready to be xfailed.""" + self.client = MockTestProtocolServerClient() + self.protocol = subunit.TestProtocolServer(self.client) + self.protocol.lineReceived("test mcdonalds farm\n") + self.test = self.client.start_calls[-1] + + def simple_xfail_keyword(self, keyword): + self.protocol.lineReceived("%s mcdonalds farm\n" % keyword) + self.assertEqual(self.client.start_calls, [self.test]) + self.assertEqual(self.client.end_calls, [self.test]) + self.assertEqual(self.client.error_calls, []) + self.assertEqual(self.client.failure_calls, []) + self.assertEqual(self.client.success_calls, [self.test]) + + def test_simple_xfail(self): + self.simple_xfail_keyword("xfail") + + def test_simple_xfail_colon(self): + self.simple_xfail_keyword("xfail:") + + def test_xfail_empty_message(self): + self.protocol.lineReceived("xfail mcdonalds farm [\n") + self.protocol.lineReceived("]\n") + self.assertEqual(self.client.start_calls, [self.test]) + self.assertEqual(self.client.end_calls, [self.test]) + self.assertEqual(self.client.error_calls, []) + self.assertEqual(self.client.failure_calls, []) + self.assertEqual(self.client.success_calls, [self.test]) + + def xfail_quoted_bracket(self, keyword): + # This tests it is accepted, but cannot test it is used today, because + # of not having a way to expose it in Python so far. + self.protocol.lineReceived("%s mcdonalds farm [\n" % keyword) + self.protocol.lineReceived(" ]\n") + self.protocol.lineReceived("]\n") + self.assertEqual(self.client.start_calls, [self.test]) + self.assertEqual(self.client.end_calls, [self.test]) + self.assertEqual(self.client.error_calls, []) + self.assertEqual(self.client.failure_calls, []) + self.assertEqual(self.client.success_calls, [self.test]) + + def test_xfail_quoted_bracket(self): + self.xfail_quoted_bracket("xfail") + + def test_xfail_colon_quoted_bracket(self): + self.xfail_quoted_bracket("xfail:") + + +class TestTestProtocolServerAddSkip(unittest.TestCase): + """Tests for the skip keyword. + + In Python this thunks through to Success due to stdlib limitations. (See + README). + """ + + def setUp(self): + """Setup a test object ready to be skipped.""" + self.client = MockTestProtocolServerClient() + self.protocol = subunit.TestProtocolServer(self.client) + self.protocol.lineReceived("test mcdonalds farm\n") + self.test = self.client.start_calls[-1] + + def simple_skip_keyword(self, keyword): + self.protocol.lineReceived("%s mcdonalds farm\n" % keyword) + self.assertEqual(self.client.start_calls, [self.test]) + self.assertEqual(self.client.end_calls, [self.test]) + self.assertEqual(self.client.error_calls, []) + self.assertEqual(self.client.failure_calls, []) + self.assertEqual(self.client.success_calls, [self.test]) + + def test_simple_skip(self): + self.simple_skip_keyword("skip") + + def test_simple_skip_colon(self): + self.simple_skip_keyword("skip:") + + def test_skip_empty_message(self): + self.protocol.lineReceived("skip mcdonalds farm [\n") + self.protocol.lineReceived("]\n") + self.assertEqual(self.client.start_calls, [self.test]) + self.assertEqual(self.client.end_calls, [self.test]) + self.assertEqual(self.client.error_calls, []) + self.assertEqual(self.client.failure_calls, []) + self.assertEqual(self.client.success_calls, [self.test]) + + def skip_quoted_bracket(self, keyword): + # This tests it is accepted, but cannot test it is used today, because + # of not having a way to expose it in Python so far. + self.protocol.lineReceived("%s mcdonalds farm [\n" % keyword) + self.protocol.lineReceived(" ]\n") + self.protocol.lineReceived("]\n") + self.assertEqual(self.client.start_calls, [self.test]) + self.assertEqual(self.client.end_calls, [self.test]) + self.assertEqual(self.client.error_calls, []) + self.assertEqual(self.client.failure_calls, []) + self.assertEqual(self.client.success_calls, [self.test]) + + def test_skip_quoted_bracket(self): + self.skip_quoted_bracket("skip") + + def test_skip_colon_quoted_bracket(self): + self.skip_quoted_bracket("skip:") + + class TestTestProtocolServerAddSuccess(unittest.TestCase): def setUp(self): @@ -520,6 +669,33 @@ class TestTestProtocolServerAddSuccess(unittest.TestCase): def test_simple_success_colon(self): self.simple_success_keyword("successful:") + def test_success_empty_message(self): + self.protocol.lineReceived("success mcdonalds farm [\n") + self.protocol.lineReceived("]\n") + self.assertEqual(self.client.start_calls, [self.test]) + self.assertEqual(self.client.end_calls, [self.test]) + self.assertEqual(self.client.error_calls, []) + self.assertEqual(self.client.failure_calls, []) + self.assertEqual(self.client.success_calls, [self.test]) + + def success_quoted_bracket(self, keyword): + # This tests it is accepted, but cannot test it is used today, because + # of not having a way to expose it in Python so far. + self.protocol.lineReceived("%s mcdonalds farm [\n" % keyword) + self.protocol.lineReceived(" ]\n") + self.protocol.lineReceived("]\n") + self.assertEqual(self.client.start_calls, [self.test]) + self.assertEqual(self.client.end_calls, [self.test]) + self.assertEqual(self.client.error_calls, []) + self.assertEqual(self.client.failure_calls, []) + self.assertEqual(self.client.success_calls, [self.test]) + + def test_success_quoted_bracket(self): + self.success_quoted_bracket("success") + + def test_success_colon_quoted_bracket(self): + self.success_quoted_bracket("success:") + class TestTestProtocolServerStreamTags(unittest.TestCase): """Test managing tags on the protocol level.""" @@ -574,6 +750,20 @@ class TestTestProtocolServerStreamTags(unittest.TestCase): self.assertEqual(set(["foo", "bar"]), self.protocol.tags) +class TestTestProtocolServerStreamTime(unittest.TestCase): + """Test managing time information at the protocol level.""" + + def setUp(self): + self.client = MockTestProtocolServerClient() + self.stream = StringIO() + self.protocol = subunit.TestProtocolServer(self.client, + stream=self.stream) + + def test_time_accepted(self): + self.protocol.lineReceived("time: 2001-12-12 12:59:59Z\n") + self.assertEqual("", self.stream.getvalue()) + + class TestRemotedTestCase(unittest.TestCase): def test_simple(self): |
