summaryrefslogtreecommitdiff
path: root/python/subunit
diff options
context:
space:
mode:
authorRobert Collins <robertc@robertcollins.net>2009-02-22 08:55:14 +1100
committerRobert Collins <robertc@robertcollins.net>2009-02-22 08:55:14 +1100
commit0c7bb6072d34094bc07cacd140f7efdd3b0d062d (patch)
tree4129307512de6ee14240e112702485833e5d27a3 /python/subunit
parent6bd993aaf9d5e9bea1295125f2a503731df5e1c3 (diff)
parentf05660cc320295024d7db8758bde540f319314c3 (diff)
downloadsubunit-git-0c7bb6072d34094bc07cacd140f7efdd3b0d062d.tar.gz
Major new features:
- command line filters for subunit streams. - tap2subunit - subunit-stats - subunit-tags - subunit2pyunit - SKIP and XFAIL test support in the protocol. - Successful tests can have comment text too. - Timestamps in the protocol can provide timing data for analysis.
Diffstat (limited to 'python/subunit')
-rw-r--r--python/subunit/__init__.py339
-rw-r--r--python/subunit/tests/__init__.py11
-rw-r--r--python/subunit/tests/test_subunit_stats.py84
-rw-r--r--python/subunit/tests/test_subunit_tags.py70
-rw-r--r--python/subunit/tests/test_tap2subunit.py435
-rw-r--r--python/subunit/tests/test_test_protocol.py192
6 files changed, 1099 insertions, 32 deletions
diff --git a/python/subunit/__init__.py b/python/subunit/__init__.py
index 05e4297..f63a3a0 100644
--- a/python/subunit/__init__.py
+++ b/python/subunit/__init__.py
@@ -21,6 +21,7 @@ import os
from StringIO import StringIO
import subprocess
import sys
+import re
import unittest
def test_suite():
@@ -42,6 +43,18 @@ def join_dir(base_path, path):
return os.path.join(os.path.dirname(os.path.abspath(base_path)), path)
+def tags_to_new_gone(tags):
+ """Split a list of tags into a new_set and a gone_set."""
+ new_tags = set()
+ gone_tags = set()
+ for tag in tags:
+ if tag[0] == '-':
+ gone_tags.add(tag[1:])
+ else:
+ new_tags.add(tag)
+ return new_tags, gone_tags
+
+
class TestProtocolServer(object):
"""A class for receiving results from a TestProtocol client.
@@ -52,6 +65,9 @@ class TestProtocolServer(object):
TEST_STARTED = 1
READING_FAILURE = 2
READING_ERROR = 3
+ READING_SKIP = 4
+ READING_XFAIL = 5
+ READING_SUCCESS = 6
def __init__(self, client, stream=sys.stdout):
"""Create a TestProtocol server instance.
@@ -84,6 +100,20 @@ class TestProtocolServer(object):
else:
self.stdOutLineReceived(line)
+ def _addExpectedFail(self, offset, line):
+ if (self.state == TestProtocolServer.TEST_STARTED and
+ self.current_test_description == line[offset:-1]):
+ self.state = TestProtocolServer.OUTSIDE_TEST
+ self.current_test_description = None
+ self.client.addSuccess(self._current_test)
+ self.client.stopTest(self._current_test)
+ elif (self.state == TestProtocolServer.TEST_STARTED and
+ self.current_test_description + " [" == line[offset:-1]):
+ self.state = TestProtocolServer.READING_XFAIL
+ self._message = ""
+ else:
+ self.stdOutLineReceived(line)
+
def _addFailure(self, offset, line):
if (self.state == TestProtocolServer.TEST_STARTED and
self.current_test_description == line[offset:-1]):
@@ -98,14 +128,28 @@ class TestProtocolServer(object):
else:
self.stdOutLineReceived(line)
- def _addSuccess(self, offset, line):
+ def _addSkip(self, offset, line):
if (self.state == TestProtocolServer.TEST_STARTED and
self.current_test_description == line[offset:-1]):
+ self.state = TestProtocolServer.OUTSIDE_TEST
+ self.current_test_description = None
self.client.addSuccess(self._current_test)
self.client.stopTest(self._current_test)
- self.current_test_description = None
- self._current_test = None
- self.state = TestProtocolServer.OUTSIDE_TEST
+ elif (self.state == TestProtocolServer.TEST_STARTED and
+ self.current_test_description + " [" == line[offset:-1]):
+ self.state = TestProtocolServer.READING_SKIP
+ self._message = ""
+ else:
+ self.stdOutLineReceived(line)
+
+ def _addSuccess(self, offset, line):
+ if (self.state == TestProtocolServer.TEST_STARTED and
+ self.current_test_description == line[offset:-1]):
+ self._succeedTest()
+ elif (self.state == TestProtocolServer.TEST_STARTED and
+ self.current_test_description + " [" == line[offset:-1]):
+ self.state = TestProtocolServer.READING_SUCCESS
+ self._message = ""
else:
self.stdOutLineReceived(line)
@@ -129,19 +173,19 @@ class TestProtocolServer(object):
self.client.addError(self._current_test,
RemoteError(self._message))
self.client.stopTest(self._current_test)
+ elif self.state in (
+ TestProtocolServer.READING_SKIP,
+ TestProtocolServer.READING_SUCCESS,
+ TestProtocolServer.READING_XFAIL,
+ ):
+ self._succeedTest()
else:
self.stdOutLineReceived(line)
def _handleTags(self, offset, line):
"""Process a tags command."""
tags = line[offset:].split()
- new_tags = set()
- gone_tags = set()
- for tag in tags:
- if tag[0] == '-':
- gone_tags.add(tag[1:])
- else:
- new_tags.add(tag)
+ new_tags, gone_tags = tags_to_new_gone(tags)
if self.state == TestProtocolServer.OUTSIDE_TEST:
update_tags = self.tags
else:
@@ -153,8 +197,11 @@ class TestProtocolServer(object):
"""Call the appropriate local method for the received line."""
if line == "]\n":
self.endQuote(line)
- elif (self.state == TestProtocolServer.READING_FAILURE or
- self.state == TestProtocolServer.READING_ERROR):
+ elif self.state in (TestProtocolServer.READING_FAILURE,
+ TestProtocolServer.READING_ERROR, TestProtocolServer.READING_SKIP,
+ TestProtocolServer.READING_SUCCESS,
+ TestProtocolServer.READING_XFAIL
+ ):
self._appendMessage(line)
else:
parts = line.split(None, 1)
@@ -168,36 +215,46 @@ class TestProtocolServer(object):
self._addError(offset, line)
elif cmd == 'failure':
self._addFailure(offset, line)
+ elif cmd == 'skip':
+ self._addSkip(offset, line)
elif cmd in ('success', 'successful'):
self._addSuccess(offset, line)
- elif cmd in ('tags'):
+ elif cmd in ('tags',):
self._handleTags(offset, line)
+ elif cmd in ('time',):
+ # Accept it, but do not do anything with it yet.
+ pass
+ elif cmd == 'xfail':
+ self._addExpectedFail(offset, line)
else:
self.stdOutLineReceived(line)
else:
self.stdOutLineReceived(line)
+ def _lostConnectionInTest(self, state_string):
+ error_string = "lost connection during %stest '%s'" % (
+ state_string, self.current_test_description)
+ self.client.addError(self._current_test, RemoteError(error_string))
+ self.client.stopTest(self._current_test)
+
def lostConnection(self):
"""The input connection has finished."""
+ if self.state == TestProtocolServer.OUTSIDE_TEST:
+ return
if self.state == TestProtocolServer.TEST_STARTED:
- self.client.addError(self._current_test,
- RemoteError("lost connection during test '%s'"
- % self.current_test_description))
- self.client.stopTest(self._current_test)
+ self._lostConnectionInTest('')
elif self.state == TestProtocolServer.READING_ERROR:
- self.client.addError(self._current_test,
- RemoteError("lost connection during "
- "error report of test "
- "'%s'" %
- self.current_test_description))
- self.client.stopTest(self._current_test)
+ self._lostConnectionInTest('error report of ')
elif self.state == TestProtocolServer.READING_FAILURE:
- self.client.addError(self._current_test,
- RemoteError("lost connection during "
- "failure report of test "
- "'%s'" %
- self.current_test_description))
- self.client.stopTest(self._current_test)
+ self._lostConnectionInTest('failure report of ')
+ elif self.state == TestProtocolServer.READING_SUCCESS:
+ self._lostConnectionInTest('success report of ')
+ elif self.state == TestProtocolServer.READING_SKIP:
+ self._lostConnectionInTest('skip report of ')
+ elif self.state == TestProtocolServer.READING_XFAIL:
+ self._lostConnectionInTest('xfail report of ')
+ else:
+ self._lostConnectionInTest('unknown state of ')
def readFrom(self, pipe):
for line in pipe.readlines():
@@ -218,6 +275,13 @@ class TestProtocolServer(object):
def stdOutLineReceived(self, line):
self._stream.write(line)
+ def _succeedTest(self):
+ self.client.addSuccess(self._current_test)
+ self.client.stopTest(self._current_test)
+ self.current_test_description = None
+ self._current_test = None
+ self.state = TestProtocolServer.OUTSIDE_TEST
+
class RemoteException(Exception):
"""An exception that occured remotely to Python."""
@@ -402,3 +466,218 @@ def run_isolated(klass, self, result):
os.waitpid(pid, 0)
# TODO return code evaluation.
return result
+
+
+def TAP2SubUnit(tap, subunit):
+ """Filter a TAP pipe into a subunit pipe.
+
+ :param tap: A tap pipe/stream/file object.
+ :param subunit: A pipe/stream/file object to write subunit results to.
+ :return: The exit code to exit with.
+ """
+ BEFORE_PLAN = 0
+ AFTER_PLAN = 1
+ SKIP_STREAM = 2
+ client = TestProtocolClient(subunit)
+ state = BEFORE_PLAN
+ plan_start = 1
+ plan_stop = 0
+ def _skipped_test(subunit, plan_start):
+ # Some tests were skipped.
+ subunit.write('test test %d\n' % plan_start)
+ subunit.write('error test %d [\n' % plan_start)
+ subunit.write('test missing from TAP output\n')
+ subunit.write(']\n')
+ return plan_start + 1
+ # Test data for the next test to emit
+ test_name = None
+ log = []
+ result = None
+ def _emit_test():
+ "write out a test"
+ if test_name is None:
+ return
+ subunit.write("test %s\n" % test_name)
+ if not log:
+ subunit.write("%s %s\n" % (result, test_name))
+ else:
+ subunit.write("%s %s [\n" % (result, test_name))
+ if log:
+ for line in log:
+ subunit.write("%s\n" % line)
+ subunit.write("]\n")
+ del log[:]
+ for line in tap:
+ if state == BEFORE_PLAN:
+ match = re.match("(\d+)\.\.(\d+)\s*(?:\#\s+(.*))?\n", line)
+ if match:
+ state = AFTER_PLAN
+ _, plan_stop, comment = match.groups()
+ plan_stop = int(plan_stop)
+ if plan_start > plan_stop and plan_stop == 0:
+ # skipped file
+ state = SKIP_STREAM
+ subunit.write("test file skip\n")
+ subunit.write("skip file skip [\n")
+ subunit.write("%s\n" % comment)
+ subunit.write("]\n")
+ continue
+ # not a plan line, or have seen one before
+ match = re.match("(ok|not ok)(?:\s+(\d+)?)?(?:\s+([^#]*[^#\s]+)\s*)?(?:\s+#\s+(TODO|SKIP)(?:\s+(.*))?)?\n", line)
+ if match:
+ # new test, emit current one.
+ _emit_test()
+ status, number, description, directive, directive_comment = match.groups()
+ if status == 'ok':
+ result = 'success'
+ else:
+ result = "failure"
+ if description is None:
+ description = ''
+ else:
+ description = ' ' + description
+ if directive is not None:
+ if directive == 'TODO':
+ result = 'xfail'
+ elif directive == 'SKIP':
+ result = 'skip'
+ if directive_comment is not None:
+ log.append(directive_comment)
+ if number is not None:
+ number = int(number)
+ while plan_start < number:
+ plan_start = _skipped_test(subunit, plan_start)
+ test_name = "test %d%s" % (plan_start, description)
+ plan_start += 1
+ continue
+ match = re.match("Bail out\!(?:\s*(.*))?\n", line)
+ if match:
+ reason, = match.groups()
+ if reason is None:
+ extra = ''
+ else:
+ extra = ' %s' % reason
+ _emit_test()
+ test_name = "Bail out!%s" % extra
+ result = "error"
+ state = SKIP_STREAM
+ continue
+ match = re.match("\#.*\n", line)
+ if match:
+ log.append(line[:-1])
+ continue
+ subunit.write(line)
+ _emit_test()
+ while plan_start <= plan_stop:
+ # record missed tests
+ plan_start = _skipped_test(subunit, plan_start)
+ return 0
+
+
+def tag_stream(original, filtered, tags):
+ """Alter tags on a stream.
+
+ :param original: The input stream.
+ :param filtered: The output stream.
+ :param tags: The tags to apply. As in a normal stream - a list of 'TAG' or
+ '-TAG' commands.
+
+ A 'TAG' command will add the tag to the output stream,
+ and override any existing '-TAG' command in that stream.
+ Specifically:
+ * A global 'tags: TAG' will be added to the start of the stream.
+ * Any tags commands with -TAG will have the -TAG removed.
+
+ A '-TAG' command will remove the TAG command from the stream.
+ Specifically:
+ * A 'tags: -TAG' command will be added to the start of the stream.
+ * Any 'tags: TAG' command will have 'TAG' removed from it.
+ Additionally, any redundant tagging commands (adding a tag globally
+ present, or removing a tag globally removed) are stripped as a
+ by-product of the filtering.
+ :return: 0
+ """
+ new_tags, gone_tags = tags_to_new_gone(tags)
+ def write_tags(new_tags, gone_tags):
+ if new_tags or gone_tags:
+ filtered.write("tags: " + ' '.join(new_tags))
+ if gone_tags:
+ for tag in gone_tags:
+ filtered.write("-" + tag)
+ filtered.write("\n")
+ write_tags(new_tags, gone_tags)
+ # TODO: use the protocol parser and thus don't mangle test comments.
+ for line in original:
+ if line.startswith("tags:"):
+ line_tags = line[5:].split()
+ line_new, line_gone = tags_to_new_gone(line_tags)
+ line_new = line_new - gone_tags
+ line_gone = line_gone - new_tags
+ write_tags(line_new, line_gone)
+ else:
+ filtered.write(line)
+ return 0
+
+
+class ProtocolTestCase(object):
+ """A test case which reports a subunit stream."""
+
+ def __init__(self, stream):
+ self._stream = stream
+
+ def __call__(self, result=None):
+ return self.run(result)
+
+ def run(self, result=None):
+ if result is None:
+ result = self.defaultTestResult()
+ protocol = TestProtocolServer(result)
+ for line in self._stream:
+ protocol.lineReceived(line)
+ protocol.lostConnection()
+
+
+class TestResultStats(unittest.TestResult):
+ """A pyunit TestResult interface implementation for making statistics.
+
+ :ivar total_tests: The total tests seen.
+ :ivar passed_tests: The tests that passed.
+ :ivar failed_tests: The tests that failed.
+ :ivar tags: The tags seen across all tests.
+ """
+
+ def __init__(self, stream):
+ """Create a TestResultStats which outputs to stream."""
+ unittest.TestResult.__init__(self)
+ self._stream = stream
+ self.failed_tests = 0
+ self.tags = set()
+
+ @property
+ def total_tests(self):
+ return self.testsRun
+
+ def addError(self, test, err):
+ self.failed_tests += 1
+
+ def addFailure(self, test, err):
+ self.failed_tests += 1
+
+ def formatStats(self):
+ self._stream.write("Total tests: %5d\n" % self.total_tests)
+ self._stream.write("Passed tests: %5d\n" % self.passed_tests)
+ self._stream.write("Failed tests: %5d\n" % self.failed_tests)
+ tags = sorted(self.tags)
+ self._stream.write("Tags: %s\n" % (", ".join(tags)))
+
+ @property
+ def passed_tests(self):
+ return self.total_tests - self.failed_tests
+
+ def stopTest(self, test):
+ unittest.TestResult.stopTest(self, test)
+ self.tags.update(test.tags)
+
+ def wasSuccessful(self):
+ """Tells whether or not this result was a success"""
+ return self.failed_tests == 0
diff --git a/python/subunit/tests/__init__.py b/python/subunit/tests/__init__.py
index 544d0e7..a506281 100644
--- a/python/subunit/tests/__init__.py
+++ b/python/subunit/tests/__init__.py
@@ -17,9 +17,18 @@
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
-from subunit.tests import TestUtil, test_test_protocol
+from subunit.tests import (
+ TestUtil,
+ test_subunit_stats,
+ test_subunit_tags,
+ test_tap2subunit,
+ test_test_protocol,
+ )
def test_suite():
result = TestUtil.TestSuite()
result.addTest(test_test_protocol.test_suite())
+ result.addTest(test_tap2subunit.test_suite())
+ result.addTest(test_subunit_tags.test_suite())
+ result.addTest(test_subunit_stats.test_suite())
return result
diff --git a/python/subunit/tests/test_subunit_stats.py b/python/subunit/tests/test_subunit_stats.py
new file mode 100644
index 0000000..b789089
--- /dev/null
+++ b/python/subunit/tests/test_subunit_stats.py
@@ -0,0 +1,84 @@
+#
+# subunit: extensions to python unittest to get test results from subprocesses.
+# Copyright (C) 2005 Robert Collins <robertc@robertcollins.net>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+#
+
+"""Tests for subunit.TestResultStats."""
+
+import unittest
+from StringIO import StringIO
+
+import subunit
+
+
+class TestTestResultStats(unittest.TestCase):
+ """Test for TestResultStats, a TestResult object that generates stats."""
+
+ def setUp(self):
+ self.output = StringIO()
+ self.result = subunit.TestResultStats(self.output)
+ self.input_stream = StringIO()
+ self.test = subunit.ProtocolTestCase(self.input_stream)
+
+ def test_stats_empty(self):
+ self.test.run(self.result)
+ self.assertEqual(0, self.result.total_tests)
+ self.assertEqual(0, self.result.passed_tests)
+ self.assertEqual(0, self.result.failed_tests)
+ self.assertEqual(set(), self.result.tags)
+
+ def setUpUsedStream(self):
+ self.input_stream.write("""tags: global
+test passed
+success passed
+test failed
+tags: local
+failure failed
+test error
+error error
+test skipped
+skip skipped
+test todo
+xfail todo
+""")
+ self.input_stream.seek(0)
+ self.test.run(self.result)
+
+ def test_stats_smoke_everything(self):
+ # Statistics are calculated usefully.
+ self.setUpUsedStream()
+ self.assertEqual(5, self.result.total_tests)
+ self.assertEqual(3, self.result.passed_tests)
+ self.assertEqual(2, self.result.failed_tests)
+ self.assertEqual(set(["global", "local"]), self.result.tags)
+
+ def test_stat_formatting(self):
+ expected = ("""
+Total tests: 5
+Passed tests: 3
+Failed tests: 2
+Tags: global, local
+""")[1:]
+ self.setUpUsedStream()
+ self.result.formatStats()
+ self.assertEqual(expected, self.output.getvalue())
+
+
+def test_suite():
+ loader = subunit.tests.TestUtil.TestLoader()
+ result = loader.loadTestsFromName(__name__)
+ return result
diff --git a/python/subunit/tests/test_subunit_tags.py b/python/subunit/tests/test_subunit_tags.py
new file mode 100644
index 0000000..66cff68
--- /dev/null
+++ b/python/subunit/tests/test_subunit_tags.py
@@ -0,0 +1,70 @@
+#
+# subunit: extensions to python unittest to get test results from subprocesses.
+# Copyright (C) 2005 Robert Collins <robertc@robertcollins.net>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+#
+
+"""Tests for subunit.tag_stream."""
+
+import unittest
+from StringIO import StringIO
+
+import subunit
+
+
+class TestSubUnitTags(unittest.TestCase):
+
+ def setUp(self):
+ self.original = StringIO()
+ self.filtered = StringIO()
+
+ def test_add_tag(self):
+ self.original.write("tags: foo\n")
+ self.original.write("test: test\n")
+ self.original.write("tags: bar -quux\n")
+ self.original.write("success: test\n")
+ self.original.seek(0)
+ result = subunit.tag_stream(self.original, self.filtered, ["quux"])
+ self.assertEqual([
+ "tags: quux",
+ "tags: foo",
+ "test: test",
+ "tags: bar",
+ "success: test",
+ ],
+ self.filtered.getvalue().splitlines())
+
+ def test_remove_tag(self):
+ self.original.write("tags: foo\n")
+ self.original.write("test: test\n")
+ self.original.write("tags: bar -quux\n")
+ self.original.write("success: test\n")
+ self.original.seek(0)
+ result = subunit.tag_stream(self.original, self.filtered, ["-bar"])
+ self.assertEqual([
+ "tags: -bar",
+ "tags: foo",
+ "test: test",
+ "tags: -quux",
+ "success: test",
+ ],
+ self.filtered.getvalue().splitlines())
+
+
+def test_suite():
+ loader = subunit.tests.TestUtil.TestLoader()
+ result = loader.loadTestsFromName(__name__)
+ return result
diff --git a/python/subunit/tests/test_tap2subunit.py b/python/subunit/tests/test_tap2subunit.py
new file mode 100644
index 0000000..bc6bf14
--- /dev/null
+++ b/python/subunit/tests/test_tap2subunit.py
@@ -0,0 +1,435 @@
+#
+# subunit: extensions to python unittest to get test results from subprocesses.
+# Copyright (C) 2005 Robert Collins <robertc@robertcollins.net>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+#
+
+"""Tests for TAP2SubUnit."""
+
+import unittest
+from StringIO import StringIO
+import os
+import subunit
+import sys
+
+
+class TestTAP2SubUnit(unittest.TestCase):
+ """Tests for TAP2SubUnit.
+
+ These tests test TAP string data in, and subunit string data out.
+ This is ok because the subunit protocol is intended to be stable,
+ but it might be easier/pithier to write tests against TAP string in,
+ parsed subunit objects out (by hooking the subunit stream to a subunit
+ protocol server.
+ """
+
+ def setUp(self):
+ self.tap = StringIO()
+ self.subunit = StringIO()
+
+ def test_skip_entire_file(self):
+ # A file
+ # 1..- # Skipped: comment
+ # results in a single skipped test.
+ self.tap.write("1..0 # Skipped: entire file skipped\n")
+ self.tap.seek(0)
+ result = subunit.TAP2SubUnit(self.tap, self.subunit)
+ self.assertEqual(0, result)
+ self.assertEqual([
+ "test file skip",
+ "skip file skip [",
+ "Skipped: entire file skipped",
+ "]",
+ ],
+ self.subunit.getvalue().splitlines())
+
+ def test_ok_test_pass(self):
+ # A file
+ # ok
+ # results in a passed test with name 'test 1' (a synthetic name as tap
+ # does not require named fixtures - it is the first test in the tap
+ # stream).
+ self.tap.write("ok\n")
+ self.tap.seek(0)
+ result = subunit.TAP2SubUnit(self.tap, self.subunit)
+ self.assertEqual(0, result)
+ self.assertEqual([
+ "test test 1",
+ "success test 1",
+ ],
+ self.subunit.getvalue().splitlines())
+
+ def test_ok_test_number_pass(self):
+ # A file
+ # ok 1
+ # results in a passed test with name 'test 1'
+ self.tap.write("ok 1\n")
+ self.tap.seek(0)
+ result = subunit.TAP2SubUnit(self.tap, self.subunit)
+ self.assertEqual(0, result)
+ self.assertEqual([
+ "test test 1",
+ "success test 1",
+ ],
+ self.subunit.getvalue().splitlines())
+
+ def test_ok_test_number_description_pass(self):
+ # A file
+ # ok 1 - There is a description
+ # results in a passed test with name 'test 1 - There is a description'
+ self.tap.write("ok 1 - There is a description\n")
+ self.tap.seek(0)
+ result = subunit.TAP2SubUnit(self.tap, self.subunit)
+ self.assertEqual(0, result)
+ self.assertEqual([
+ "test test 1 - There is a description",
+ "success test 1 - There is a description",
+ ],
+ self.subunit.getvalue().splitlines())
+
+ def test_ok_test_description_pass(self):
+ # A file
+ # ok There is a description
+ # results in a passed test with name 'test 1 There is a description'
+ self.tap.write("ok There is a description\n")
+ self.tap.seek(0)
+ result = subunit.TAP2SubUnit(self.tap, self.subunit)
+ self.assertEqual(0, result)
+ self.assertEqual([
+ "test test 1 There is a description",
+ "success test 1 There is a description",
+ ],
+ self.subunit.getvalue().splitlines())
+
+ def test_ok_SKIP_skip(self):
+ # A file
+ # ok # SKIP
+ # results in a skkip test with name 'test 1'
+ self.tap.write("ok # SKIP\n")
+ self.tap.seek(0)
+ result = subunit.TAP2SubUnit(self.tap, self.subunit)
+ self.assertEqual(0, result)
+ self.assertEqual([
+ "test test 1",
+ "skip test 1",
+ ],
+ self.subunit.getvalue().splitlines())
+
+ def test_ok_number_description_SKIP_skip_comment(self):
+ # A file
+ # ok 1 foo # SKIP Not done yet
+ # results in a skip test with name 'test 1 foo' and a log of
+ # Not done yet
+ self.tap.write("ok 1 foo # SKIP Not done yet\n")
+ self.tap.seek(0)
+ result = subunit.TAP2SubUnit(self.tap, self.subunit)
+ self.assertEqual(0, result)
+ self.assertEqual([
+ "test test 1 foo",
+ "skip test 1 foo [",
+ "Not done yet",
+ "]",
+ ],
+ self.subunit.getvalue().splitlines())
+
+ def test_ok_SKIP_skip_comment(self):
+ # A file
+ # ok # SKIP Not done yet
+ # results in a skip test with name 'test 1' and a log of Not done yet
+ self.tap.write("ok # SKIP Not done yet\n")
+ self.tap.seek(0)
+ result = subunit.TAP2SubUnit(self.tap, self.subunit)
+ self.assertEqual(0, result)
+ self.assertEqual([
+ "test test 1",
+ "skip test 1 [",
+ "Not done yet",
+ "]",
+ ],
+ self.subunit.getvalue().splitlines())
+
+ def test_ok_TODO_xfail(self):
+ # A file
+ # ok # TODO
+ # results in a xfail test with name 'test 1'
+ self.tap.write("ok # TODO\n")
+ self.tap.seek(0)
+ result = subunit.TAP2SubUnit(self.tap, self.subunit)
+ self.assertEqual(0, result)
+ self.assertEqual([
+ "test test 1",
+ "xfail test 1",
+ ],
+ self.subunit.getvalue().splitlines())
+
+ def test_ok_TODO_xfail_comment(self):
+ # A file
+ # ok # TODO Not done yet
+ # results in a xfail test with name 'test 1' and a log of Not done yet
+ self.tap.write("ok # TODO Not done yet\n")
+ self.tap.seek(0)
+ result = subunit.TAP2SubUnit(self.tap, self.subunit)
+ self.assertEqual(0, result)
+ self.assertEqual([
+ "test test 1",
+ "xfail test 1 [",
+ "Not done yet",
+ "]",
+ ],
+ self.subunit.getvalue().splitlines())
+
+ def test_bail_out_errors(self):
+ # A file with line in it
+ # Bail out! COMMENT
+ # is treated as an error
+ self.tap.write("ok 1 foo\n")
+ self.tap.write("Bail out! Lifejacket engaged\n")
+ self.tap.seek(0)
+ result = subunit.TAP2SubUnit(self.tap, self.subunit)
+ self.assertEqual(0, result)
+ self.assertEqual([
+ "test test 1 foo",
+ "success test 1 foo",
+ "test Bail out! Lifejacket engaged",
+ "error Bail out! Lifejacket engaged",
+ ],
+ self.subunit.getvalue().splitlines())
+
+ def test_missing_test_at_end_with_plan_adds_error(self):
+ # A file
+ # 1..3
+ # ok first test
+ # not ok third test
+ # results in three tests, with the third being created
+ self.tap.write('1..3\n')
+ self.tap.write('ok first test\n')
+ self.tap.write('not ok second test\n')
+ self.tap.seek(0)
+ result = subunit.TAP2SubUnit(self.tap, self.subunit)
+ self.assertEqual(0, result)
+ self.assertEqual([
+ 'test test 1 first test',
+ 'success test 1 first test',
+ 'test test 2 second test',
+ 'failure test 2 second test',
+ 'test test 3',
+ 'error test 3 [',
+ 'test missing from TAP output',
+ ']',
+ ],
+ self.subunit.getvalue().splitlines())
+
+ def test_missing_test_with_plan_adds_error(self):
+ # A file
+ # 1..3
+ # ok first test
+ # not ok 3 third test
+ # results in three tests, with the second being created
+ self.tap.write('1..3\n')
+ self.tap.write('ok first test\n')
+ self.tap.write('not ok 3 third test\n')
+ self.tap.seek(0)
+ result = subunit.TAP2SubUnit(self.tap, self.subunit)
+ self.assertEqual(0, result)
+ self.assertEqual([
+ 'test test 1 first test',
+ 'success test 1 first test',
+ 'test test 2',
+ 'error test 2 [',
+ 'test missing from TAP output',
+ ']',
+ 'test test 3 third test',
+ 'failure test 3 third test',
+ ],
+ self.subunit.getvalue().splitlines())
+
+ def test_missing_test_no_plan_adds_error(self):
+ # A file
+ # ok first test
+ # not ok 3 third test
+ # results in three tests, with the second being created
+ self.tap.write('ok first test\n')
+ self.tap.write('not ok 3 third test\n')
+ self.tap.seek(0)
+ result = subunit.TAP2SubUnit(self.tap, self.subunit)
+ self.assertEqual(0, result)
+ self.assertEqual([
+ 'test test 1 first test',
+ 'success test 1 first test',
+ 'test test 2',
+ 'error test 2 [',
+ 'test missing from TAP output',
+ ']',
+ 'test test 3 third test',
+ 'failure test 3 third test',
+ ],
+ self.subunit.getvalue().splitlines())
+
+ def test_four_tests_in_a_row_trailing_plan(self):
+ # A file
+ # ok 1 - first test in a script with no plan at all
+ # not ok 2 - second
+ # ok 3 - third
+ # not ok 4 - fourth
+ # 1..4
+ # results in four tests numbered and named
+ self.tap.write('ok 1 - first test in a script with trailing plan\n')
+ self.tap.write('not ok 2 - second\n')
+ self.tap.write('ok 3 - third\n')
+ self.tap.write('not ok 4 - fourth\n')
+ self.tap.write('1..4\n')
+ self.tap.seek(0)
+ result = subunit.TAP2SubUnit(self.tap, self.subunit)
+ self.assertEqual(0, result)
+ self.assertEqual([
+ 'test test 1 - first test in a script with trailing plan',
+ 'success test 1 - first test in a script with trailing plan',
+ 'test test 2 - second',
+ 'failure test 2 - second',
+ 'test test 3 - third',
+ 'success test 3 - third',
+ 'test test 4 - fourth',
+ 'failure test 4 - fourth'
+ ],
+ self.subunit.getvalue().splitlines())
+
+ def test_four_tests_in_a_row_with_plan(self):
+ # A file
+ # 1..4
+ # ok 1 - first test in a script with no plan at all
+ # not ok 2 - second
+ # ok 3 - third
+ # not ok 4 - fourth
+ # results in four tests numbered and named
+ self.tap.write('1..4\n')
+ self.tap.write('ok 1 - first test in a script with a plan\n')
+ self.tap.write('not ok 2 - second\n')
+ self.tap.write('ok 3 - third\n')
+ self.tap.write('not ok 4 - fourth\n')
+ self.tap.seek(0)
+ result = subunit.TAP2SubUnit(self.tap, self.subunit)
+ self.assertEqual(0, result)
+ self.assertEqual([
+ 'test test 1 - first test in a script with a plan',
+ 'success test 1 - first test in a script with a plan',
+ 'test test 2 - second',
+ 'failure test 2 - second',
+ 'test test 3 - third',
+ 'success test 3 - third',
+ 'test test 4 - fourth',
+ 'failure test 4 - fourth'
+ ],
+ self.subunit.getvalue().splitlines())
+
+ def test_four_tests_in_a_row_no_plan(self):
+ # A file
+ # ok 1 - first test in a script with no plan at all
+ # not ok 2 - second
+ # ok 3 - third
+ # not ok 4 - fourth
+ # results in four tests numbered and named
+ self.tap.write('ok 1 - first test in a script with no plan at all\n')
+ self.tap.write('not ok 2 - second\n')
+ self.tap.write('ok 3 - third\n')
+ self.tap.write('not ok 4 - fourth\n')
+ self.tap.seek(0)
+ result = subunit.TAP2SubUnit(self.tap, self.subunit)
+ self.assertEqual(0, result)
+ self.assertEqual([
+ 'test test 1 - first test in a script with no plan at all',
+ 'success test 1 - first test in a script with no plan at all',
+ 'test test 2 - second',
+ 'failure test 2 - second',
+ 'test test 3 - third',
+ 'success test 3 - third',
+ 'test test 4 - fourth',
+ 'failure test 4 - fourth'
+ ],
+ self.subunit.getvalue().splitlines())
+
+ def test_todo_and_skip(self):
+ # A file
+ # not ok 1 - a fail but # TODO but is TODO
+ # not ok 2 - another fail # SKIP instead
+ # results in two tests, numbered and commented.
+ self.tap.write("not ok 1 - a fail but # TODO but is TODO\n")
+ self.tap.write("not ok 2 - another fail # SKIP instead\n")
+ self.tap.seek(0)
+ result = subunit.TAP2SubUnit(self.tap, self.subunit)
+ self.assertEqual(0, result)
+ self.assertEqual([
+ 'test test 1 - a fail but',
+ 'xfail test 1 - a fail but [',
+ 'but is TODO',
+ ']',
+ 'test test 2 - another fail',
+ 'skip test 2 - another fail [',
+ 'instead',
+ ']',
+ ],
+ self.subunit.getvalue().splitlines())
+
+ def test_leading_comments_add_to_next_test_log(self):
+ # A file
+ # # comment
+ # ok
+ # ok
+ # results in a single test with the comment included
+ # in the first test and not the second.
+ self.tap.write("# comment\n")
+ self.tap.write("ok\n")
+ self.tap.write("ok\n")
+ self.tap.seek(0)
+ result = subunit.TAP2SubUnit(self.tap, self.subunit)
+ self.assertEqual(0, result)
+ self.assertEqual([
+ 'test test 1',
+ 'success test 1 [',
+ '# comment',
+ ']',
+ 'test test 2',
+ 'success test 2',
+ ],
+ self.subunit.getvalue().splitlines())
+
+ def test_trailing_comments_are_included_in_last_test_log(self):
+ # A file
+ # ok foo
+ # ok foo
+ # # comment
+ # results in a two tests, with the second having the comment
+ # attached to its log.
+ self.tap.write("ok\n")
+ self.tap.write("ok\n")
+ self.tap.write("# comment\n")
+ self.tap.seek(0)
+ result = subunit.TAP2SubUnit(self.tap, self.subunit)
+ self.assertEqual(0, result)
+ self.assertEqual([
+ 'test test 1',
+ 'success test 1',
+ 'test test 2',
+ 'success test 2 [',
+ '# comment',
+ ']',
+ ],
+ self.subunit.getvalue().splitlines())
+
+
+def test_suite():
+ loader = subunit.tests.TestUtil.TestLoader()
+ result = loader.loadTestsFromName(__name__)
+ return result
diff --git a/python/subunit/tests/test_test_protocol.py b/python/subunit/tests/test_test_protocol.py
index 52b33c8..1f9e3a9 100644
--- a/python/subunit/tests/test_test_protocol.py
+++ b/python/subunit/tests/test_test_protocol.py
@@ -1,5 +1,5 @@
#
-# subunit: extensions to python unittest to get test results from subprocesses.
+# subunit: extensions to Python unittest to get test results from subprocesses.
# Copyright (C) 2005 Robert Collins <robertc@robertcollins.net>
#
# This program is free software; you can redistribute it and/or modify
@@ -22,6 +22,7 @@ from StringIO import StringIO
import os
import subunit
import sys
+import time
try:
class MockTestProtocolServerClient(object):
@@ -396,6 +397,42 @@ class TestTestProtocolServerLostConnection(unittest.TestCase):
self.assertEqual(self.client.failure_calls, [])
self.assertEqual(self.client.success_calls, [self.test])
+ def test_lost_connection_during_skip(self):
+ self.protocol.lineReceived("test old mcdonald\n")
+ self.protocol.lineReceived("skip old mcdonald [\n")
+ self.protocol.lostConnection()
+ self.assertEqual(self.client.start_calls, [self.test])
+ self.assertEqual(self.client.end_calls, [self.test])
+ self.assertEqual(self.client.error_calls, [
+ (self.test, subunit.RemoteError("lost connection during skip "
+ "report of test 'old mcdonald'"))])
+ self.assertEqual(self.client.failure_calls, [])
+ self.assertEqual(self.client.success_calls, [])
+
+ def test_lost_connection_during_xfail(self):
+ self.protocol.lineReceived("test old mcdonald\n")
+ self.protocol.lineReceived("xfail old mcdonald [\n")
+ self.protocol.lostConnection()
+ self.assertEqual(self.client.start_calls, [self.test])
+ self.assertEqual(self.client.end_calls, [self.test])
+ self.assertEqual(self.client.error_calls, [
+ (self.test, subunit.RemoteError("lost connection during xfail "
+ "report of test 'old mcdonald'"))])
+ self.assertEqual(self.client.failure_calls, [])
+ self.assertEqual(self.client.success_calls, [])
+
+ def test_lost_connection_during_success(self):
+ self.protocol.lineReceived("test old mcdonald\n")
+ self.protocol.lineReceived("success old mcdonald [\n")
+ self.protocol.lostConnection()
+ self.assertEqual(self.client.start_calls, [self.test])
+ self.assertEqual(self.client.end_calls, [self.test])
+ self.assertEqual(self.client.error_calls, [
+ (self.test, subunit.RemoteError("lost connection during success "
+ "report of test 'old mcdonald'"))])
+ self.assertEqual(self.client.failure_calls, [])
+ self.assertEqual(self.client.success_calls, [])
+
class TestTestProtocolServerAddError(unittest.TestCase):
@@ -493,6 +530,118 @@ class TestTestProtocolServerAddFailure(unittest.TestCase):
self.failure_quoted_bracket("failure:")
+class TestTestProtocolServerAddxFail(unittest.TestCase):
+ """Tests for the xfail keyword.
+
+ In Python this thunks through to Success due to stdlib limitations (see
+ README).
+ """
+
+ def setUp(self):
+ """Setup a test object ready to be xfailed."""
+ self.client = MockTestProtocolServerClient()
+ self.protocol = subunit.TestProtocolServer(self.client)
+ self.protocol.lineReceived("test mcdonalds farm\n")
+ self.test = self.client.start_calls[-1]
+
+ def simple_xfail_keyword(self, keyword):
+ self.protocol.lineReceived("%s mcdonalds farm\n" % keyword)
+ self.assertEqual(self.client.start_calls, [self.test])
+ self.assertEqual(self.client.end_calls, [self.test])
+ self.assertEqual(self.client.error_calls, [])
+ self.assertEqual(self.client.failure_calls, [])
+ self.assertEqual(self.client.success_calls, [self.test])
+
+ def test_simple_xfail(self):
+ self.simple_xfail_keyword("xfail")
+
+ def test_simple_xfail_colon(self):
+ self.simple_xfail_keyword("xfail:")
+
+ def test_xfail_empty_message(self):
+ self.protocol.lineReceived("xfail mcdonalds farm [\n")
+ self.protocol.lineReceived("]\n")
+ self.assertEqual(self.client.start_calls, [self.test])
+ self.assertEqual(self.client.end_calls, [self.test])
+ self.assertEqual(self.client.error_calls, [])
+ self.assertEqual(self.client.failure_calls, [])
+ self.assertEqual(self.client.success_calls, [self.test])
+
+ def xfail_quoted_bracket(self, keyword):
+ # This tests it is accepted, but cannot test it is used today, because
+ # of not having a way to expose it in Python so far.
+ self.protocol.lineReceived("%s mcdonalds farm [\n" % keyword)
+ self.protocol.lineReceived(" ]\n")
+ self.protocol.lineReceived("]\n")
+ self.assertEqual(self.client.start_calls, [self.test])
+ self.assertEqual(self.client.end_calls, [self.test])
+ self.assertEqual(self.client.error_calls, [])
+ self.assertEqual(self.client.failure_calls, [])
+ self.assertEqual(self.client.success_calls, [self.test])
+
+ def test_xfail_quoted_bracket(self):
+ self.xfail_quoted_bracket("xfail")
+
+ def test_xfail_colon_quoted_bracket(self):
+ self.xfail_quoted_bracket("xfail:")
+
+
+class TestTestProtocolServerAddSkip(unittest.TestCase):
+ """Tests for the skip keyword.
+
+ In Python this thunks through to Success due to stdlib limitations. (See
+ README).
+ """
+
+ def setUp(self):
+ """Setup a test object ready to be skipped."""
+ self.client = MockTestProtocolServerClient()
+ self.protocol = subunit.TestProtocolServer(self.client)
+ self.protocol.lineReceived("test mcdonalds farm\n")
+ self.test = self.client.start_calls[-1]
+
+ def simple_skip_keyword(self, keyword):
+ self.protocol.lineReceived("%s mcdonalds farm\n" % keyword)
+ self.assertEqual(self.client.start_calls, [self.test])
+ self.assertEqual(self.client.end_calls, [self.test])
+ self.assertEqual(self.client.error_calls, [])
+ self.assertEqual(self.client.failure_calls, [])
+ self.assertEqual(self.client.success_calls, [self.test])
+
+ def test_simple_skip(self):
+ self.simple_skip_keyword("skip")
+
+ def test_simple_skip_colon(self):
+ self.simple_skip_keyword("skip:")
+
+ def test_skip_empty_message(self):
+ self.protocol.lineReceived("skip mcdonalds farm [\n")
+ self.protocol.lineReceived("]\n")
+ self.assertEqual(self.client.start_calls, [self.test])
+ self.assertEqual(self.client.end_calls, [self.test])
+ self.assertEqual(self.client.error_calls, [])
+ self.assertEqual(self.client.failure_calls, [])
+ self.assertEqual(self.client.success_calls, [self.test])
+
+ def skip_quoted_bracket(self, keyword):
+ # This tests it is accepted, but cannot test it is used today, because
+ # of not having a way to expose it in Python so far.
+ self.protocol.lineReceived("%s mcdonalds farm [\n" % keyword)
+ self.protocol.lineReceived(" ]\n")
+ self.protocol.lineReceived("]\n")
+ self.assertEqual(self.client.start_calls, [self.test])
+ self.assertEqual(self.client.end_calls, [self.test])
+ self.assertEqual(self.client.error_calls, [])
+ self.assertEqual(self.client.failure_calls, [])
+ self.assertEqual(self.client.success_calls, [self.test])
+
+ def test_skip_quoted_bracket(self):
+ self.skip_quoted_bracket("skip")
+
+ def test_skip_colon_quoted_bracket(self):
+ self.skip_quoted_bracket("skip:")
+
+
class TestTestProtocolServerAddSuccess(unittest.TestCase):
def setUp(self):
@@ -520,6 +669,33 @@ class TestTestProtocolServerAddSuccess(unittest.TestCase):
def test_simple_success_colon(self):
self.simple_success_keyword("successful:")
+ def test_success_empty_message(self):
+ self.protocol.lineReceived("success mcdonalds farm [\n")
+ self.protocol.lineReceived("]\n")
+ self.assertEqual(self.client.start_calls, [self.test])
+ self.assertEqual(self.client.end_calls, [self.test])
+ self.assertEqual(self.client.error_calls, [])
+ self.assertEqual(self.client.failure_calls, [])
+ self.assertEqual(self.client.success_calls, [self.test])
+
+ def success_quoted_bracket(self, keyword):
+ # This tests it is accepted, but cannot test it is used today, because
+ # of not having a way to expose it in Python so far.
+ self.protocol.lineReceived("%s mcdonalds farm [\n" % keyword)
+ self.protocol.lineReceived(" ]\n")
+ self.protocol.lineReceived("]\n")
+ self.assertEqual(self.client.start_calls, [self.test])
+ self.assertEqual(self.client.end_calls, [self.test])
+ self.assertEqual(self.client.error_calls, [])
+ self.assertEqual(self.client.failure_calls, [])
+ self.assertEqual(self.client.success_calls, [self.test])
+
+ def test_success_quoted_bracket(self):
+ self.success_quoted_bracket("success")
+
+ def test_success_colon_quoted_bracket(self):
+ self.success_quoted_bracket("success:")
+
class TestTestProtocolServerStreamTags(unittest.TestCase):
"""Test managing tags on the protocol level."""
@@ -574,6 +750,20 @@ class TestTestProtocolServerStreamTags(unittest.TestCase):
self.assertEqual(set(["foo", "bar"]), self.protocol.tags)
+class TestTestProtocolServerStreamTime(unittest.TestCase):
+ """Test managing time information at the protocol level."""
+
+ def setUp(self):
+ self.client = MockTestProtocolServerClient()
+ self.stream = StringIO()
+ self.protocol = subunit.TestProtocolServer(self.client,
+ stream=self.stream)
+
+ def test_time_accepted(self):
+ self.protocol.lineReceived("time: 2001-12-12 12:59:59Z\n")
+ self.assertEqual("", self.stream.getvalue())
+
+
class TestRemotedTestCase(unittest.TestCase):
def test_simple(self):