diff options
| author | Robert Collins <robertc@robertcollins.net> | 2013-02-22 18:09:06 +1300 |
|---|---|---|
| committer | Robert Collins <robertc@robertcollins.net> | 2013-02-22 18:09:06 +1300 |
| commit | b2def216dbc1124a19ef63b1cfa6fce32c54e660 (patch) | |
| tree | d04d5bbe6c64ca498f71bd0d636feda1de94d9a3 /python | |
| parent | 2a943c86b1eec4c71dcdb567789e81dee15e6df9 (diff) | |
| download | subunit-git-b2def216dbc1124a19ef63b1cfa6fce32c54e660.tar.gz | |
Start on an encoder.
Diffstat (limited to 'python')
| -rw-r--r-- | python/subunit/__init__.py | 1 | ||||
| -rw-r--r-- | python/subunit/tests/__init__.py | 2 | ||||
| -rw-r--r-- | python/subunit/tests/test_test_protocol2.py | 47 | ||||
| -rw-r--r-- | python/subunit/v2.py | 112 |
4 files changed, 162 insertions, 0 deletions
diff --git a/python/subunit/__init__.py b/python/subunit/__init__.py index 77f4a82..1a0b956 100644 --- a/python/subunit/__init__.py +++ b/python/subunit/__init__.py @@ -146,6 +146,7 @@ except ImportError: from testtools import testresult from subunit import chunked, details, iso8601, test_results +from subunit.v2 import StreamResultToBytes # same format as sys.version_info: "A tuple containing the five components of # the version number: major, minor, micro, releaselevel, and serial. All diff --git a/python/subunit/tests/__init__.py b/python/subunit/tests/__init__.py index e0e1eb1..4275229 100644 --- a/python/subunit/tests/__init__.py +++ b/python/subunit/tests/__init__.py @@ -25,6 +25,7 @@ from subunit.tests import ( test_subunit_tags, test_tap2subunit, test_test_protocol, + test_test_protocol2, test_test_results, ) @@ -35,6 +36,7 @@ def test_suite(): result.addTest(test_progress_model.test_suite()) result.addTest(test_test_results.test_suite()) result.addTest(test_test_protocol.test_suite()) + result.addTest(test_test_protocol2.test_suite()) result.addTest(test_tap2subunit.test_suite()) result.addTest(test_subunit_filter.test_suite()) result.addTest(test_subunit_tags.test_suite()) diff --git a/python/subunit/tests/test_test_protocol2.py b/python/subunit/tests/test_test_protocol2.py new file mode 100644 index 0000000..9409ba2 --- /dev/null +++ b/python/subunit/tests/test_test_protocol2.py @@ -0,0 +1,47 @@ +# +# subunit: extensions to Python unittest to get test results from subprocesses. +# Copyright (C) 2013 Robert Collins <robertc@robertcollins.net> +# +# Licensed under either the Apache License, Version 2.0 or the BSD 3-clause +# license at the users choice. A copy of both licenses are available in the +# project source as Apache-2.0 and BSD. You may not use this file except in +# compliance with one of these two licences. +# +# Unless required by applicable law or agreed to in writing, software +# distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# license you chose for the specific language governing permissions and +# limitations under that license. +# + +from io import BytesIO + +from testtools import TestCase +from testtools.tests.test_testresult import TestStreamResultContract + +import subunit + +class TestStreamResultToBytesContract(TestCase, TestStreamResultContract): + """Check that StreamResult behaves as testtools expects.""" + + def _make_result(self): + return subunit.StreamResultToBytes(BytesIO()) + + +class TestStreamResultToBytes(TestCase): + + def _make_result(self): + output = BytesIO() + return subunit.StreamResultToBytes(output), output + + def test_trivial_enumeration(self): + result, output = self._make_result() + result.status("foo", 'exists') + self.assertEqual(b'\xb3\x29\x01\0\0\x0f\0\x03foo\x99\x0c\x34\x3f', + output.getvalue()) + + +def test_suite(): + loader = subunit.tests.TestUtil.TestLoader() + result = loader.loadTestsFromName(__name__) + return result diff --git a/python/subunit/v2.py b/python/subunit/v2.py new file mode 100644 index 0000000..78d36b6 --- /dev/null +++ b/python/subunit/v2.py @@ -0,0 +1,112 @@ +# +# subunit: extensions to Python unittest to get test results from subprocesses. +# Copyright (C) 2013 Robert Collins <robertc@robertcollins.net> +# +# Licensed under either the Apache License, Version 2.0 or the BSD 3-clause +# license at the users choice. A copy of both licenses are available in the +# project source as Apache-2.0 and BSD. You may not use this file except in +# compliance with one of these two licences. +# +# Unless required by applicable law or agreed to in writing, software +# distributed under these licenses is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# license you chose for the specific language governing permissions and +# limitations under that license. +# + +import struct +import zlib + +__all__ = [ + 'StreamResultToBytes', + ] + + +class StreamResultToBytes(object): + """Convert StreamResult API calls to bytes. + + The StreamResult API is defined by testtools.StreamResult. + """ + + status_mask = { + None: 0, + 'exists': 0x1, + 'inprogress': 0x2, + 'success': 0x3, + 'uxsuccess': 0x4, + 'skip': 0x5, + 'fail': 0x6, + 'xfail': 0x7, + } + + fmt_16 = '>H' + fmt_32 = '>I' + zero_b = b'\0'[0] + + def __init__(self, output_stream): + """Create a StreamResultToBytes with output written to output_stream. + + :param output_stream: A file-like object. Must support write(bytes) + and flush() methods. Flush will be called after each write. + """ + self.output_stream = output_stream + + def startTestRun(self): + pass + + def stopTestRun(self): + pass + + def file(stream, file_name, file_bytes, eof=False, mime_type=None, + test_id=None, route_code=None, timestamp=None): + pass + + def status(self, test_id, test_status, test_tags=None, runnable=True, + route_code=None, timestamp=None): + self._write_packet(test_id=test_id, test_status=test_status, + test_tags=test_tags, runnable=runnable, route_code=route_code, + timestamp=timestamp) + + def _write_utf8(self, a_string, packet): + utf8 = a_string.encode('utf-8') + assert len(utf8) < 65536 + packet.append(struct.pack(self.fmt_16, len(utf8))) + packet.append(utf8) + + def _write_packet(self, test_id=None, test_status=None, test_tags=None, + runnable=True, route_code=None, timestamp=None): + packet = [b'\xb3'] + packet.append(b'FF') # placeholder for flags + packet.append(b'FFF') # placeholder for length + flags = 0x2000 # Version 0x2 + if test_id is not None: + flags = flags | 0x0800 + self._write_utf8(test_id, packet) + if route_code is not None: + flags = flags | 0x0400 + if timestamp is not None: + flags = flags | 0x0200 + if runnable: + flags = flags | 0x0100 + if test_tags: + flags = flags | 0x0080 + #if file_content: + # flags = flags | 0x0040 + #if mime_type: + # flags = flags | 0x0020 + # if eof: + # flags = flags | 0x0010 + # 0x0008 - not used in v2. + flags = flags | self.status_mask[test_status] + packet[1] = struct.pack(self.fmt_16, flags) + length = struct.pack(self.fmt_32, sum(map(len, packet)) + 4) + assert length[0] == self.zero_b + packet[2] = length[1:] + # We could either do a partial application of crc32 over each chunk + # or a single join to a temp variable then a final join + # or two writes (that python might then split). + # For now, simplest code: join, crc32, join, output + content = b''.join(packet) + self.output_stream.write(content + struct.pack( + self.fmt_32, zlib.crc32(content) & 0xffffffff)) + self.output_stream.flush() |
