summaryrefslogtreecommitdiff
path: root/python
diff options
context:
space:
mode:
authorRobert Collins <robertc@robertcollins.net>2013-02-22 18:09:06 +1300
committerRobert Collins <robertc@robertcollins.net>2013-02-22 18:09:06 +1300
commitb2def216dbc1124a19ef63b1cfa6fce32c54e660 (patch)
treed04d5bbe6c64ca498f71bd0d636feda1de94d9a3 /python
parent2a943c86b1eec4c71dcdb567789e81dee15e6df9 (diff)
downloadsubunit-git-b2def216dbc1124a19ef63b1cfa6fce32c54e660.tar.gz
Start on an encoder.
Diffstat (limited to 'python')
-rw-r--r--python/subunit/__init__.py1
-rw-r--r--python/subunit/tests/__init__.py2
-rw-r--r--python/subunit/tests/test_test_protocol2.py47
-rw-r--r--python/subunit/v2.py112
4 files changed, 162 insertions, 0 deletions
diff --git a/python/subunit/__init__.py b/python/subunit/__init__.py
index 77f4a82..1a0b956 100644
--- a/python/subunit/__init__.py
+++ b/python/subunit/__init__.py
@@ -146,6 +146,7 @@ except ImportError:
from testtools import testresult
from subunit import chunked, details, iso8601, test_results
+from subunit.v2 import StreamResultToBytes
# same format as sys.version_info: "A tuple containing the five components of
# the version number: major, minor, micro, releaselevel, and serial. All
diff --git a/python/subunit/tests/__init__.py b/python/subunit/tests/__init__.py
index e0e1eb1..4275229 100644
--- a/python/subunit/tests/__init__.py
+++ b/python/subunit/tests/__init__.py
@@ -25,6 +25,7 @@ from subunit.tests import (
test_subunit_tags,
test_tap2subunit,
test_test_protocol,
+ test_test_protocol2,
test_test_results,
)
@@ -35,6 +36,7 @@ def test_suite():
result.addTest(test_progress_model.test_suite())
result.addTest(test_test_results.test_suite())
result.addTest(test_test_protocol.test_suite())
+ result.addTest(test_test_protocol2.test_suite())
result.addTest(test_tap2subunit.test_suite())
result.addTest(test_subunit_filter.test_suite())
result.addTest(test_subunit_tags.test_suite())
diff --git a/python/subunit/tests/test_test_protocol2.py b/python/subunit/tests/test_test_protocol2.py
new file mode 100644
index 0000000..9409ba2
--- /dev/null
+++ b/python/subunit/tests/test_test_protocol2.py
@@ -0,0 +1,47 @@
+#
+# subunit: extensions to Python unittest to get test results from subprocesses.
+# Copyright (C) 2013 Robert Collins <robertc@robertcollins.net>
+#
+# Licensed under either the Apache License, Version 2.0 or the BSD 3-clause
+# license at the users choice. A copy of both licenses are available in the
+# project source as Apache-2.0 and BSD. You may not use this file except in
+# compliance with one of these two licences.
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# license you chose for the specific language governing permissions and
+# limitations under that license.
+#
+
+from io import BytesIO
+
+from testtools import TestCase
+from testtools.tests.test_testresult import TestStreamResultContract
+
+import subunit
+
+class TestStreamResultToBytesContract(TestCase, TestStreamResultContract):
+ """Check that StreamResult behaves as testtools expects."""
+
+ def _make_result(self):
+ return subunit.StreamResultToBytes(BytesIO())
+
+
+class TestStreamResultToBytes(TestCase):
+
+ def _make_result(self):
+ output = BytesIO()
+ return subunit.StreamResultToBytes(output), output
+
+ def test_trivial_enumeration(self):
+ result, output = self._make_result()
+ result.status("foo", 'exists')
+ self.assertEqual(b'\xb3\x29\x01\0\0\x0f\0\x03foo\x99\x0c\x34\x3f',
+ output.getvalue())
+
+
+def test_suite():
+ loader = subunit.tests.TestUtil.TestLoader()
+ result = loader.loadTestsFromName(__name__)
+ return result
diff --git a/python/subunit/v2.py b/python/subunit/v2.py
new file mode 100644
index 0000000..78d36b6
--- /dev/null
+++ b/python/subunit/v2.py
@@ -0,0 +1,112 @@
+#
+# subunit: extensions to Python unittest to get test results from subprocesses.
+# Copyright (C) 2013 Robert Collins <robertc@robertcollins.net>
+#
+# Licensed under either the Apache License, Version 2.0 or the BSD 3-clause
+# license at the users choice. A copy of both licenses are available in the
+# project source as Apache-2.0 and BSD. You may not use this file except in
+# compliance with one of these two licences.
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# license you chose for the specific language governing permissions and
+# limitations under that license.
+#
+
+import struct
+import zlib
+
+__all__ = [
+ 'StreamResultToBytes',
+ ]
+
+
+class StreamResultToBytes(object):
+ """Convert StreamResult API calls to bytes.
+
+ The StreamResult API is defined by testtools.StreamResult.
+ """
+
+ status_mask = {
+ None: 0,
+ 'exists': 0x1,
+ 'inprogress': 0x2,
+ 'success': 0x3,
+ 'uxsuccess': 0x4,
+ 'skip': 0x5,
+ 'fail': 0x6,
+ 'xfail': 0x7,
+ }
+
+ fmt_16 = '>H'
+ fmt_32 = '>I'
+ zero_b = b'\0'[0]
+
+ def __init__(self, output_stream):
+ """Create a StreamResultToBytes with output written to output_stream.
+
+ :param output_stream: A file-like object. Must support write(bytes)
+ and flush() methods. Flush will be called after each write.
+ """
+ self.output_stream = output_stream
+
+ def startTestRun(self):
+ pass
+
+ def stopTestRun(self):
+ pass
+
+ def file(stream, file_name, file_bytes, eof=False, mime_type=None,
+ test_id=None, route_code=None, timestamp=None):
+ pass
+
+ def status(self, test_id, test_status, test_tags=None, runnable=True,
+ route_code=None, timestamp=None):
+ self._write_packet(test_id=test_id, test_status=test_status,
+ test_tags=test_tags, runnable=runnable, route_code=route_code,
+ timestamp=timestamp)
+
+ def _write_utf8(self, a_string, packet):
+ utf8 = a_string.encode('utf-8')
+ assert len(utf8) < 65536
+ packet.append(struct.pack(self.fmt_16, len(utf8)))
+ packet.append(utf8)
+
+ def _write_packet(self, test_id=None, test_status=None, test_tags=None,
+ runnable=True, route_code=None, timestamp=None):
+ packet = [b'\xb3']
+ packet.append(b'FF') # placeholder for flags
+ packet.append(b'FFF') # placeholder for length
+ flags = 0x2000 # Version 0x2
+ if test_id is not None:
+ flags = flags | 0x0800
+ self._write_utf8(test_id, packet)
+ if route_code is not None:
+ flags = flags | 0x0400
+ if timestamp is not None:
+ flags = flags | 0x0200
+ if runnable:
+ flags = flags | 0x0100
+ if test_tags:
+ flags = flags | 0x0080
+ #if file_content:
+ # flags = flags | 0x0040
+ #if mime_type:
+ # flags = flags | 0x0020
+ # if eof:
+ # flags = flags | 0x0010
+ # 0x0008 - not used in v2.
+ flags = flags | self.status_mask[test_status]
+ packet[1] = struct.pack(self.fmt_16, flags)
+ length = struct.pack(self.fmt_32, sum(map(len, packet)) + 4)
+ assert length[0] == self.zero_b
+ packet[2] = length[1:]
+ # We could either do a partial application of crc32 over each chunk
+ # or a single join to a temp variable then a final join
+ # or two writes (that python might then split).
+ # For now, simplest code: join, crc32, join, output
+ content = b''.join(packet)
+ self.output_stream.write(content + struct.pack(
+ self.fmt_32, zlib.crc32(content) & 0xffffffff))
+ self.output_stream.flush()