diff options
Diffstat (limited to 'python')
| -rw-r--r-- | python/subunit/__init__.py | 2 | ||||
| -rw-r--r-- | python/subunit/tests/test_test_protocol2.py | 273 | ||||
| -rw-r--r-- | python/subunit/v2.py | 250 |
3 files changed, 493 insertions, 32 deletions
diff --git a/python/subunit/__init__.py b/python/subunit/__init__.py index 1a0b956..ced5b4a 100644 --- a/python/subunit/__init__.py +++ b/python/subunit/__init__.py @@ -146,7 +146,7 @@ except ImportError: from testtools import testresult from subunit import chunked, details, iso8601, test_results -from subunit.v2 import StreamResultToBytes +from subunit.v2 import ByteStreamToStreamResult, StreamResultToBytes # same format as sys.version_info: "A tuple containing the five components of # the version number: major, minor, micro, releaselevel, and serial. All diff --git a/python/subunit/tests/test_test_protocol2.py b/python/subunit/tests/test_test_protocol2.py index 9409ba2..834f2f9 100644 --- a/python/subunit/tests/test_test_protocol2.py +++ b/python/subunit/tests/test_test_protocol2.py @@ -15,11 +15,14 @@ # from io import BytesIO +import datetime from testtools import TestCase from testtools.tests.test_testresult import TestStreamResultContract +from testtools.testresult.doubles import StreamResult import subunit +import subunit.iso8601 as iso8601 class TestStreamResultToBytesContract(TestCase, TestStreamResultContract): """Check that StreamResult behaves as testtools expects.""" @@ -40,6 +43,276 @@ class TestStreamResultToBytes(TestCase): self.assertEqual(b'\xb3\x29\x01\0\0\x0f\0\x03foo\x99\x0c\x34\x3f', output.getvalue()) + def test_inprogress(self): + result, output = self._make_result() + result.status("foo", 'inprogress') + self.assertEqual(b'\xb3\x29\x02\0\0\x0f\0\x03foo\xa0\x81\x08\xfa', + output.getvalue()) + + def test_success(self): + result, output = self._make_result() + result.status("foo", 'success') + self.assertEqual(b'\xb3\x29\x03\0\0\x0f\0\x03foo\xb7\xfa\x1c\xb9', + output.getvalue()) + + def test_uxsuccess(self): + result, output = self._make_result() + result.status("foo", 'uxsuccess') + self.assertEqual(b'\xb3\x29\x04\0\0\x0f\0\x03foo\xd3\x9bqp', + output.getvalue()) + + def test_skip(self): + result, output = self._make_result() + result.status("foo", 'skip') + self.assertEqual(b'\xb3\x29\x05\0\0\x0f\0\x03foo\xc4\xe0e3', + output.getvalue()) + + def test_fail(self): + result, output = self._make_result() + result.status("foo", 'fail') + self.assertEqual(b'\xb3\x29\x06\0\0\x0f\0\x03foo\xfdmY\xf6', + output.getvalue()) + + def test_xfail(self): + result, output = self._make_result() + result.status("foo", 'xfail') + self.assertEqual(b'\xb3\x29\x07\0\0\x0f\0\x03foo\xea\x16M\xb5', + output.getvalue()) + + def test_unknown_status(self): + result, output = self._make_result() + self.assertRaises(Exception, result.status, "foo", 'boo') + self.assertEqual(b'', output.getvalue()) + + def test_eof(self): + result, output = self._make_result() + result.status(eof=True) + self.assertEqual( + b'\xb3!\x10\x00\x00\na\xf1xM', + output.getvalue()) + + def test_file_content(self): + result, output = self._make_result() + result.status(file_name="barney", file_bytes=b"woo") + self.assertEqual( + b'\xb3!@\x00\x00\x15\x00\x06barneywoo\xfd\xecu\x1c', + output.getvalue()) + + def test_mime(self): + result, output = self._make_result() + result.status(mime_type="application/foo; charset=1") + self.assertEqual( + b'\xb3! \x00\x00&\x00\x1aapplication/foo; charset=1]#\xf9\xf9', + output.getvalue()) + + def test_route_code(self): + result, output = self._make_result() + result.status(test_id="bar", test_status='success', + route_code="source") + self.assertEqual(b'\xb3-\x03\x00\x00\x17\x00\x06source\x00\x03bar\xad\xbd\x8c$', + output.getvalue()) + + def test_runnable(self): + result, output = self._make_result() + result.status("foo", 'success', runnable=False) + self.assertEqual(b'\xb3(\x03\x00\x00\x0f\x00\x03fooX8w\x87', + output.getvalue()) + + def test_tags(self): + result, output = self._make_result() + result.status(test_id="bar", test_tags=set(['foo', 'bar'])) + self.assertEqual(b'\xb3)\x80\x00\x00\x1b\x00\x03bar\x00\x02\x00\x03foo\x00\x03bar\xabMw\xe6', + output.getvalue()) + + def test_timestamp(self): + timestamp = datetime.datetime(2001, 12, 12, 12, 59, 59, 45, + iso8601.Utc()) + result, output = self._make_result() + result.status(test_id="bar", test_status='success', timestamp=timestamp) + self.assertEqual(b'\xb3+\x03\x00\x00\x17<\x17T\xcf\x00\x00\xaf\xc8\x00\x03barU>\xb2\xdb', + output.getvalue()) + + +class TestByteStreamToStreamResult(TestCase): + + def test_non_subunit_encapsulated(self): + source = BytesIO(b"foo\nbar\n") + result = StreamResult() + subunit.ByteStreamToStreamResult( + source, non_subunit_name="stdout").run(result) + self.assertEqual([ + ('status', None, None, None, True, 'stdout', b'f', False, None, None, None), + ('status', None, None, None, True, 'stdout', b'o', False, None, None, None), + ('status', None, None, None, True, 'stdout', b'o', False, None, None, None), + ('status', None, None, None, True, 'stdout', b'\n', False, None, None, None), + ('status', None, None, None, True, 'stdout', b'b', False, None, None, None), + ('status', None, None, None, True, 'stdout', b'a', False, None, None, None), + ('status', None, None, None, True, 'stdout', b'r', False, None, None, None), + ('status', None, None, None, True, 'stdout', b'\n', False, None, None, None), + ], result._events) + self.assertEqual(b'', source.read()) + + def test_non_subunit_disabled_raises(self): + source = BytesIO(b"foo\nbar\n") + result = StreamResult() + case = subunit.ByteStreamToStreamResult(source) + e = self.assertRaises(Exception, case.run, result) + self.assertEqual(b'f', e.args[1]) + self.assertEqual(b'oo\nbar\n', source.read()) + self.assertEqual([], result._events) + + def test_trivial_enumeration(self): + source = BytesIO(b'\xb3\x29\x01\0\0\x0f\0\x03foo\x99\x0c\x34\x3f') + result = StreamResult() + subunit.ByteStreamToStreamResult( + source, non_subunit_name="stdout").run(result) + self.assertEqual(b'', source.read()) + self.assertEqual([ + ('status', 'foo', 'exists', None, True, None, None, False, None, None, None), + ], result._events) + + def test_multiple_events(self): + source = BytesIO(b'\xb3\x29\x01\0\0\x0f\0\x03foo\x99\x0c\x34\x3f' + b'\xb3\x29\x01\0\0\x0f\0\x03foo\x99\x0c\x34\x3f') + result = StreamResult() + subunit.ByteStreamToStreamResult( + source, non_subunit_name="stdout").run(result) + self.assertEqual(b'', source.read()) + self.assertEqual([ + ('status', 'foo', 'exists', None, True, None, None, False, None, None, None), + ('status', 'foo', 'exists', None, True, None, None, False, None, None, None), + ], result._events) + + def test_inprogress(self): + self.check_event( + b'\xb3\x29\x02\0\0\x0f\0\x03foo\xa0\x81\x08\xfa', 'inprogress') + + def test_success(self): + self.check_event( + b'\xb3\x29\x03\0\0\x0f\0\x03foo\xb7\xfa\x1c\xb9', 'success') + + def test_uxsuccess(self): + self.check_event( + b'\xb3\x29\x04\0\0\x0f\0\x03foo\xd3\x9bqp', 'uxsuccess') + + def test_skip(self): + self.check_event( + b'\xb3\x29\x05\0\0\x0f\0\x03foo\xc4\xe0e3', 'skip') + + def test_fail(self): + self.check_event( + b'\xb3\x29\x06\0\0\x0f\0\x03foo\xfdmY\xf6', 'fail') + + def test_xfail(self): + self.check_event( + b'\xb3\x29\x07\0\0\x0f\0\x03foo\xea\x16M\xb5', 'xfail') + + def check_events(self, source_bytes, events): + source = BytesIO(source_bytes) + result = StreamResult() + subunit.ByteStreamToStreamResult( + source, non_subunit_name="stdout").run(result) + self.assertEqual(b'', source.read()) + self.assertEqual(events, result._events) + + def check_event(self, source_bytes, test_status=None, test_id="foo", + route_code=None, timestamp=None, tags=None, mime_type=None, + file_name=None, file_bytes=None, eof=False, runnable=True): + event = self._event(test_id=test_id, test_status=test_status, + tags=tags, runnable=runnable, file_name=file_name, + file_bytes=file_bytes, eof=eof, mime_type=mime_type, + route_code=route_code, timestamp=timestamp) + self.check_events(source_bytes, [event]) + + def _event(self, test_status=None, test_id=None, route_code=None, + timestamp=None, tags=None, mime_type=None, file_name=None, + file_bytes=None, eof=False, runnable=True): + return ('status', test_id, test_status, tags, runnable, file_name, + file_bytes, eof, mime_type, route_code, timestamp) + + def test_eof(self): + self.check_event( + b'\xb3!\x10\x00\x00\na\xf1xM', + test_id=None, eof=True) + + def test_file_content(self): + self.check_event( + b'\xb3!@\x00\x00\x15\x00\x06barneywoo\xfd\xecu\x1c', + test_id=None, file_name="barney", file_bytes=b"woo") + + def test_mime(self): + self.check_event( + b'\xb3! \x00\x00&\x00\x1aapplication/foo; charset=1]#\xf9\xf9', + test_id=None, mime_type='application/foo; charset=1') + + def test_route_code(self): + self.check_event( + b'\xb3-\x03\x00\x00\x17\x00\x06source\x00\x03bar\xad\xbd\x8c$', + 'success', route_code="source", test_id="bar") + + def test_runnable(self): + self.check_event( + b'\xb3(\x03\x00\x00\x0f\x00\x03fooX8w\x87', + test_status='success', runnable=False) + + def test_tags(self): + self.check_event( + b'\xb3)\x80\x00\x00\x1b\x00\x03bar\x00\x02\x00\x03foo\x00\x03bar\xabMw\xe6', + None, tags=set(['foo', 'bar']), test_id="bar") + + def test_timestamp(self): + timestamp = datetime.datetime(2001, 12, 12, 12, 59, 59, 45, + iso8601.Utc()) + self.check_event( + b'\xb3+\x03\x00\x00\x17<\x17T\xcf\x00\x00\xaf\xc8\x00\x03barU>\xb2\xdb', + 'success', test_id='bar', timestamp=timestamp) + + def test_bad_crc_errors_via_status(self): + file_bytes = \ + b'\xb3! \x00\x00&\x00\x1aapplication/foo; charset=1]#\xf9\xee' + self.check_events( file_bytes, [ + self._event(test_id="subunit.parser", eof=True, + file_name="Packet data", file_bytes=file_bytes), + self._event(test_id="subunit.parser", test_status="fail", eof=True, + file_name="Parser Error", + file_bytes=b'Bad checksum - calculated (0x5d23f9f9), ' + b'stored (0x5d23f9ee)'), + ]) + + def test_not_utf8_in_string(self): + file_bytes = \ + b'\xb3-\x03\x00\x00\x17\x00\x06\xb4ource\x00\x03bar\x25\x2f\xb5\xd7' + self.check_events(file_bytes, [ + self._event(test_id="subunit.parser", eof=True, + file_name="Packet data", file_bytes=file_bytes), + self._event(test_id="subunit.parser", test_status="fail", eof=True, + file_name="Parser Error", + file_bytes=b'UTF8 string at offset 0 is not UTF8'), + ]) + + def test_NULL_in_string(self): + file_bytes = \ + b'\xb3-\x03\x00\x00\x17\x00\x06so\x00rce\x00\x03bar\x17\x89\x0a\xbe' + self.check_events(file_bytes, [ + self._event(test_id="subunit.parser", eof=True, + file_name="Packet data", file_bytes=file_bytes), + self._event(test_id="subunit.parser", test_status="fail", eof=True, + file_name="Parser Error", + file_bytes=b'UTF8 string at offset 0 contains NUL byte'), + ]) + + def test_bad_utf8_stringlength(self): + file_bytes = \ + b'\xb3-\x03\x00\x00\x17\x00\x06source\x00\x08bar\x7a\xbc\x0b\x25' + self.check_events(file_bytes, [ + self._event(test_id="subunit.parser", eof=True, + file_name="Packet data", file_bytes=file_bytes), + self._event(test_id="subunit.parser", test_status="fail", eof=True, + file_name="Parser Error", + file_bytes=b'UTF8 string at offset 8 extends past end of ' + b'packet: claimed 8 bytes, 7 available'), + ]) + def test_suite(): loader = subunit.tests.TestUtil.TestLoader() diff --git a/python/subunit/v2.py b/python/subunit/v2.py index 78d36b6..1245ae9 100644 --- a/python/subunit/v2.py +++ b/python/subunit/v2.py @@ -14,13 +14,35 @@ # limitations under that license. # +import datetime import struct import zlib +import subunit.iso8601 as iso8601 + __all__ = [ 'StreamResultToBytes', ] +SIGNATURE = b'\xb3' +FMT_16 = '>H' +FMT_32 = '>I' +FMT_TIMESTAMP = '>II' +FLAG_TEST_ID = 0x0800 +FLAG_ROUTE_CODE = 0x0400 +FLAG_TIMESTAMP = 0x0200 +FLAG_RUNNABLE = 0x0100 +FLAG_TAGS = 0x0080 +FLAG_MIME_TYPE = 0x0020 +FLAG_EOF = 0x0010 +FLAG_FILE_CONTENT = 0x0040 +EPOCH = datetime.datetime.utcfromtimestamp(0).replace(tzinfo=iso8601.Utc()) +NUL_ELEMENT = b'\0'[0] + + +class ParseError(Exception): + """Used to pass error messages within the parser.""" + class StreamResultToBytes(object): """Convert StreamResult API calls to bytes. @@ -39,8 +61,6 @@ class StreamResultToBytes(object): 'xfail': 0x7, } - fmt_16 = '>H' - fmt_32 = '>I' zero_b = b'\0'[0] def __init__(self, output_stream): @@ -57,49 +77,62 @@ class StreamResultToBytes(object): def stopTestRun(self): pass - def file(stream, file_name, file_bytes, eof=False, mime_type=None, - test_id=None, route_code=None, timestamp=None): - pass - - def status(self, test_id, test_status, test_tags=None, runnable=True, - route_code=None, timestamp=None): + def status(self, test_id=None, test_status=None, test_tags=None, + runnable=True, file_name=None, file_bytes=None, eof=False, + mime_type=None, route_code=None, timestamp=None): self._write_packet(test_id=test_id, test_status=test_status, - test_tags=test_tags, runnable=runnable, route_code=route_code, - timestamp=timestamp) + test_tags=test_tags, runnable=runnable, file_name=file_name, + file_bytes=file_bytes, eof=eof, mime_type=mime_type, + route_code=route_code, timestamp=timestamp) def _write_utf8(self, a_string, packet): utf8 = a_string.encode('utf-8') - assert len(utf8) < 65536 - packet.append(struct.pack(self.fmt_16, len(utf8))) + self._write_len16(len(utf8), packet) packet.append(utf8) + def _write_len16(self, length, packet): + assert length < 65536 + packet.append(struct.pack(FMT_16, length)) + def _write_packet(self, test_id=None, test_status=None, test_tags=None, - runnable=True, route_code=None, timestamp=None): - packet = [b'\xb3'] + runnable=True, file_name=None, file_bytes=None, eof=False, + mime_type=None, route_code=None, timestamp=None): + packet = [SIGNATURE] packet.append(b'FF') # placeholder for flags packet.append(b'FFF') # placeholder for length flags = 0x2000 # Version 0x2 - if test_id is not None: - flags = flags | 0x0800 - self._write_utf8(test_id, packet) if route_code is not None: - flags = flags | 0x0400 + flags = flags | FLAG_ROUTE_CODE + self._write_utf8(route_code, packet) if timestamp is not None: - flags = flags | 0x0200 - if runnable: - flags = flags | 0x0100 + flags = flags | FLAG_TIMESTAMP + since_epoch = timestamp - EPOCH + nanoseconds = since_epoch.microseconds * 1000 + seconds = (since_epoch.seconds + since_epoch.days * 24 * 3600) + packet.append(struct.pack(FMT_TIMESTAMP, seconds, nanoseconds)) + if test_id is not None: + flags = flags | FLAG_TEST_ID + self._write_utf8(test_id, packet) if test_tags: - flags = flags | 0x0080 - #if file_content: - # flags = flags | 0x0040 - #if mime_type: - # flags = flags | 0x0020 - # if eof: - # flags = flags | 0x0010 + flags = flags | FLAG_TAGS + self._write_len16(len(test_tags), packet) + for tag in test_tags: + self._write_utf8(tag, packet) + if runnable: + flags = flags | FLAG_RUNNABLE + if file_name is not None: + flags = flags | FLAG_FILE_CONTENT + self._write_utf8(file_name, packet) + packet.append(file_bytes) + if mime_type: + flags = flags | FLAG_MIME_TYPE + self._write_utf8(mime_type, packet) + if eof: + flags = flags | FLAG_EOF # 0x0008 - not used in v2. flags = flags | self.status_mask[test_status] - packet[1] = struct.pack(self.fmt_16, flags) - length = struct.pack(self.fmt_32, sum(map(len, packet)) + 4) + packet[1] = struct.pack(FMT_16, flags) + length = struct.pack(FMT_32, sum(map(len, packet)) + 4) assert length[0] == self.zero_b packet[2] = length[1:] # We could either do a partial application of crc32 over each chunk @@ -108,5 +141,160 @@ class StreamResultToBytes(object): # For now, simplest code: join, crc32, join, output content = b''.join(packet) self.output_stream.write(content + struct.pack( - self.fmt_32, zlib.crc32(content) & 0xffffffff)) + FMT_32, zlib.crc32(content) & 0xffffffff)) self.output_stream.flush() + + +class ByteStreamToStreamResult(object): + """Parse a subunit byte stream. + + Mixed streams that contain non-subunit content is supported when a + non_subunit_name is passed to the contructor. The default is to raise an + error containing the non-subunit byte after it has been read from the + stream. + + Typical use: + + >>> case = ByteStreamToStreamResult(sys.stdin.buffer) + >>> result = StreamResult() + >>> result.startTestRun() + >>> case.run(result) + >>> result.stopTestRun() + """ + + status_lookup = { + 0x0: None, + 0x1: 'exists', + 0x2: 'inprogress', + 0x3: 'success', + 0x4: 'uxsuccess', + 0x5: 'skip', + 0x6: 'fail', + 0x7: 'xfail', + } + + def __init__(self, source, non_subunit_name=None): + """Create a ByteStreamToStreamResult. + + :param source: A file like object to read bytes from. Must support + read(<count>). The file is not closed by ByteStreamToStreamResult. + :param non_subunit_name: If set to non-None, non subunit content + encountered in the stream will be converted into file packets + labelled with this name. + """ + self.non_subunit_name = non_subunit_name + self.source = source + + def run(self, result): + """Parse source and emit events to result. + + This is a blocking call: it will run until EOF is detected on source. + """ + while True: + content = self.source.read(1) + if not content: + # EOF + return + if content[0] != SIGNATURE[0]: + # Not subunit. + # TODO: do nonblocking IO and wait 5ms or so to send more + # efficient events than one per character. + if self.non_subunit_name is not None: + result.status( + file_name=self.non_subunit_name, file_bytes=content) + else: + raise Exception("Non subunit content", content) + continue + try: + packet = [SIGNATURE] + self._parse(packet, result) + except ParseError as error: + result.status(test_id="subunit.parser", eof=True, + file_name="Packet data", file_bytes=b''.join(packet)) + result.status(test_id="subunit.parser", test_status='fail', + eof=True, file_name="Parser Error", + file_bytes=(error.args[0]).encode('utf8')) + + def _parse(self, packet, result): + packet.append(self.source.read(5)) # 2 bytes flags, 3 bytes length. + flags = struct.unpack(FMT_16, packet[-1][:2])[0] + length = struct.unpack(FMT_32, packet[-1][1:])[0] & 0x00ffffff + packet.append(self.source.read(length - 6)) + if len(packet[-1]) != length - 6: + raise ParseError( + 'Short read - got %d bytes, wanted %d bytes' % ( + len(packet[-1]), length - 6)) + crc = zlib.crc32(packet[0]) + crc = zlib.crc32(packet[1], crc) + crc = zlib.crc32(packet[2][:-4], crc) & 0xffffffff + packet_crc = struct.unpack(FMT_32, packet[2][-4:])[0] + if crc != packet_crc: + # Bad CRC, report it and stop parsing the packet. + raise ParseError( + 'Bad checksum - calculated (0x%x), stored (0x%x)' + % (crc, packet_crc)) + # One packet could have both file and status data; the Python API + # presents these separately (perhaps it shouldn't?) + pos = 0 + if flags & FLAG_ROUTE_CODE: + route_code, pos = self._read_utf8(packet[2], pos) + else: + route_code = None + if flags & FLAG_TIMESTAMP: + seconds, nanoseconds = struct.unpack(FMT_TIMESTAMP, packet[2][pos:pos+8]) + pos += 8 + timestamp = EPOCH + datetime.timedelta( + seconds=seconds, microseconds=nanoseconds/1000) + else: + timestamp = None + if flags & FLAG_TEST_ID: + test_id, pos = self._read_utf8(packet[2], pos) + else: + test_id = None + if flags & FLAG_TAGS: + tag_count, pos = self._read_len16(packet[2], pos) + test_tags = set() + for _ in range(tag_count): + tag, pos = self._read_utf8(packet[2], pos) + test_tags.add(tag) + else: + test_tags = None + if flags & FLAG_MIME_TYPE: + mime_type, pos = self._read_utf8(packet[2], pos) + else: + mime_type = None + if flags & FLAG_FILE_CONTENT: + file_name, pos = self._read_utf8(packet[2], pos) + file_bytes = packet[2][pos:-4] + else: + file_name = None + file_bytes = None + runnable = bool(flags & FLAG_RUNNABLE) + eof = bool(flags & FLAG_EOF) + test_status = self.status_lookup[flags & 0x0007] + result.status(test_id=test_id, test_status=test_status, + test_tags=test_tags, runnable=runnable, mime_type=mime_type, + eof=eof, file_name=file_name, file_bytes=file_bytes, + route_code=route_code, timestamp=timestamp) + __call__ = run + + def _read_len16(self, buf, pos): + length = struct.unpack(FMT_16, buf[pos:pos+2])[0] + return length, pos + 2 + + def _read_utf8(self, buf, pos): + length, pos = self._read_len16(buf, pos) + utf8_bytes = buf[pos:pos+length] + if length != len(utf8_bytes): + raise ParseError( + 'UTF8 string at offset %d extends past end of packet: ' + 'claimed %d bytes, %d available' % (pos - 2, length, + len(utf8_bytes))) + if NUL_ELEMENT in utf8_bytes: + raise ParseError('UTF8 string at offset %d contains NUL byte' % ( + pos-2,)) + try: + return utf8_bytes.decode('utf-8'), length+pos + except UnicodeDecodeError: + raise ParseError('UTF8 string at offset %d is not UTF8' % (pos-2,)) + |
