summaryrefslogtreecommitdiff
path: root/python/subunit/__init__.py
diff options
context:
space:
mode:
authorRobert Collins <robertc@robertcollins.net>2011-04-25 16:25:24 +1200
committerRobert Collins <robertc@robertcollins.net>2011-04-25 16:25:24 +1200
commit3b6f97b3c5ac79aefc74047b9292c4acee7ff09d (patch)
tree10dbc28291a3d86f66b77bb5aaa58dc79899f162 /python/subunit/__init__.py
parent63d596a30103017c7cac6af37d46c91a9cf7c325 (diff)
downloadsubunit-3b6f97b3c5ac79aefc74047b9292c4acee7ff09d.tar.gz
Nearly done.
Diffstat (limited to 'python/subunit/__init__.py')
-rw-r--r--python/subunit/__init__.py72
1 files changed, 43 insertions, 29 deletions
diff --git a/python/subunit/__init__.py b/python/subunit/__init__.py
index 6e1668e..8fb3ab7 100644
--- a/python/subunit/__init__.py
+++ b/python/subunit/__init__.py
@@ -129,9 +129,11 @@ try:
RemoteException = _StringException
# For testing: different pythons have different str() implementations.
if sys.version_info > (3, 0):
- _remote_exception_str = 'testtools.testresult.real._StringException'
+ _remote_exception_str = "testtools.testresult.real._StringException"
+ _remote_exception_str_chunked = "34\r\n" + _remote_exception_str
else:
- _remote_exception_str = '_StringException'
+ _remote_exception_str = "_StringException"
+ _remote_exception_str_chunked = "1A\r\n" + _remote_exception_str
except ImportError:
raise ImportError ("testtools.testresult.real does not contain "
"_StringException, check your version.")
@@ -559,7 +561,8 @@ class TestProtocolClient(testresult.TestResult):
# Get a TestSuite or TestCase to run
suite = make_suite()
- # Create a stream (any object with a 'write' method)
+ # Create a stream (any object with a 'write' method). This should accept
+ # bytes not strings: subunit is a byte orientated protocol.
stream = file('tests.log', 'wb')
# Create a subunit result object which will output to the stream
result = subunit.TestProtocolClient(stream)
@@ -576,6 +579,14 @@ class TestProtocolClient(testresult.TestResult):
testresult.TestResult.__init__(self)
self._stream = stream
_make_stream_binary(stream)
+ self._progress_fmt = _b("progress: ")
+ self._bytes_eol = _b("\n")
+ self._progress_plus = _b("+")
+ self._progress_push = _b("push")
+ self._progress_pop = _b("pop")
+ self._empty_bytes = _b("")
+ self._start_simple = _b(" [\n")
+ self._end_simple = _b("]\n")
def addError(self, test, error=None, details=None):
"""Report an error in test test.
@@ -637,42 +648,42 @@ class TestProtocolClient(testresult.TestResult):
:param details: New Testing-in-python drafted API; a dict from string
to subunit.Content objects.
"""
- self._stream.write("%s: %s" % (outcome, test.id()))
+ self._stream.write(_b("%s: %s" % (outcome, test.id())))
if error is None and details is None:
raise ValueError
if error is not None:
- self._stream.write(" [\n")
+ self._stream.write(self._start_simple)
# XXX: this needs to be made much stricter, along the lines of
# Martin[gz]'s work in testtools. Perhaps subunit can use that?
for line in self._exc_info_to_unicode(error, test).splitlines():
self._stream.write(("%s\n" % line).encode('utf8'))
else:
self._write_details(details)
- self._stream.write("]\n")
+ self._stream.write(self._end_simple)
def addSkip(self, test, reason=None, details=None):
"""Report a skipped test."""
if reason is None:
self._addOutcome("skip", test, error=None, details=details)
else:
- self._stream.write("skip: %s [\n" % test.id())
- self._stream.write("%s\n" % reason)
- self._stream.write("]\n")
+ self._stream.write(_b("skip: %s [\n" % test.id()))
+ self._stream.write(_b("%s\n" % reason))
+ self._stream.write(self._end_simple)
def addSuccess(self, test, details=None):
"""Report a success in a test."""
- self._stream.write("successful: %s" % test.id())
+ self._stream.write(_b("successful: %s" % test.id()))
if not details:
- self._stream.write("\n")
+ self._stream.write(_b("\n"))
else:
self._write_details(details)
- self._stream.write("]\n")
+ self._stream.write(self._end_simple)
addUnexpectedSuccess = addSuccess
def startTest(self, test):
"""Mark a test as starting its test run."""
super(TestProtocolClient, self).startTest(test)
- self._stream.write("test: %s\n" % test.id())
+ self._stream.write(_b("test: %s\n" % test.id()))
self._stream.flush()
def stopTest(self, test):
@@ -690,16 +701,19 @@ class TestProtocolClient(testresult.TestResult):
PROGRESS_POP.
"""
if whence == PROGRESS_CUR and offset > -1:
- prefix = "+"
+ prefix = self._progress_plus
+ offset = _b(str(offset))
elif whence == PROGRESS_PUSH:
- prefix = ""
- offset = "push"
+ prefix = self._empty_bytes
+ offset = self._progress_push
elif whence == PROGRESS_POP:
- prefix = ""
- offset = "pop"
+ prefix = self._empty_bytes
+ offset = self._progress_pop
else:
- prefix = ""
- self._stream.write("progress: %s%s\n" % (prefix, offset))
+ prefix = self._empty_bytes
+ offset = _b(str(offset))
+ self._stream.write(self._progress_fmt + prefix + offset +
+ self._bytes_eol)
def time(self, a_datetime):
"""Inform the client of the time.
@@ -707,29 +721,29 @@ class TestProtocolClient(testresult.TestResult):
":param datetime: A datetime.datetime object.
"""
time = a_datetime.astimezone(iso8601.Utc())
- self._stream.write("time: %04d-%02d-%02d %02d:%02d:%02d.%06dZ\n" % (
+ self._stream.write(_b("time: %04d-%02d-%02d %02d:%02d:%02d.%06dZ\n" % (
time.year, time.month, time.day, time.hour, time.minute,
- time.second, time.microsecond))
+ time.second, time.microsecond)))
def _write_details(self, details):
"""Output details to the stream.
:param details: An extended details dict for a test outcome.
"""
- self._stream.write(" [ multipart\n")
+ self._stream.write(_b(" [ multipart\n"))
for name, content in sorted(details.items()):
- self._stream.write("Content-Type: %s/%s" %
- (content.content_type.type, content.content_type.subtype))
+ self._stream.write(_b("Content-Type: %s/%s" %
+ (content.content_type.type, content.content_type.subtype)))
parameters = content.content_type.parameters
if parameters:
- self._stream.write(";")
+ self._stream.write(_b(";"))
param_strs = []
for param, value in parameters.items():
param_strs.append("%s=%s" % (param, value))
- self._stream.write(",".join(param_strs))
- self._stream.write("\n%s\n" % name)
+ self._stream.write(_b(",".join(param_strs)))
+ self._stream.write(_b("\n%s\n" % name))
encoder = chunked.Encoder(self._stream)
- map(encoder.write, content.iter_bytes())
+ list(map(encoder.write, content.iter_bytes()))
encoder.close()
def done(self):