| field | value | date |
|---|---|---|
| author | Robert Collins <robertc@robertcollins.net> | 2013-03-31 18:45:01 +1300 |
| committer | Robert Collins <robertc@robertcollins.net> | 2013-03-31 18:45:01 +1300 |
| commit | 03264963c5fc0a145880c20cd1de9800a23a206c | |
| tree | f421ed0be46efcde9cf31d522ff23ee9d44e6204 /python | |
| parent | 205803cd9aa68bae23d284593cbd5ec3135ac5e2 | |
| download | subunit-git-03264963c5fc0a145880c20cd1de9800a23a206c.tar.gz | |
Switch to variable-length encoded integers.
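The new scheme (see `_write_number`/`_encode_number` in python/subunit/v2.py below) stores a value's width in the top two bits of its first byte: `00`, `01`, `10` and `11` select 1-, 2-, 3- and 4-byte encodings, covering values up to 2**6-1, 2**14-1, 2**22-1 and 2**30-1 respectively. As a worked example taken from `test_numbers` in this diff: 63 encodes as `3f`, 64 as `40 40`, 16384 as `80 40 00`, and 1073741823 as `ff ff ff ff`; negative values and anything at or above 2**30 are rejected.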
Diffstat (limited to 'python')

| mode | file | lines changed |
|---|---|---|
| -rwxr-xr-x | python/subunit/run.py | 2 |
| -rw-r--r-- | python/subunit/tests/test_test_protocol2.py | 245 |
| -rw-r--r-- | python/subunit/v2.py | 288 |

3 files changed, 373 insertions, 162 deletions
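Before the diff itself, a minimal standalone sketch of the encoder that the new `StreamResultToBytes._write_number`/`_encode_number` helpers in python/subunit/v2.py implement. This is an illustration, not the patch: the real method returns a list of packed fragments to append to the in-progress packet, while the hypothetical free function `encode_number` below returns the joined bytes.

```python
import struct


def encode_number(value):
    """Encode a non-negative integer as a subunit v2 style varint.

    The top two bits of the first byte give the total width:
    0b00 = 1 byte, 0b01 = 2 bytes, 0b10 = 3 bytes, 0b11 = 4 bytes.
    """
    if value < 0:
        raise ValueError('cannot encode negative value: %r' % (value,))
    if value < 0x40:                    # < 64: value fits in the low six bits
        return struct.pack('>B', value)
    elif value < 0x4000:                # < 16384: two bytes, tag 0b01
        return struct.pack('>H', value | 0x4000)
    elif value < 0x400000:              # < 4194304: three bytes, tag 0b10
        value |= 0x800000
        return struct.pack('>HB', value >> 8, value & 0xff)
    elif value < 0x40000000:            # < 1073741824: four bytes, tag 0b11
        return struct.pack('>I', value | 0xc0000000)
    raise ValueError('value too large to encode: %r' % (value,))
```

Because the packet length field is itself part of the packet, the new `_write_packet` has to size the length before encoding it, which is where its thresholds of 62, 16381 and 4194300 bytes come from (62 plus a 1-byte length gives 63, the largest 1-byte value, and so on); `test_volatile_length` in the test changes exercises exactly those boundaries.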
diff --git a/python/subunit/run.py b/python/subunit/run.py index b78bf32..2a3de42 100755 --- a/python/subunit/run.py +++ b/python/subunit/run.py @@ -110,7 +110,7 @@ if __name__ == '__main__': # on non-ttys. stream = get_default_formatter() runner = SubunitTestRunner - # Patch stdout to be unbuffered, so that pdb works well. + # Patch stdout to be unbuffered, so that pdb works well on 2.6/2.7. binstdout = io.open(sys.stdout.fileno(), 'wb', 0) if sys.version_info[0] > 2: sys.stdout = io.TextIOWrapper(binstdout, encoding=sys.stdout.encoding) diff --git a/python/subunit/tests/test_test_protocol2.py b/python/subunit/tests/test_test_protocol2.py index 834f2f9..002efa7 100644 --- a/python/subunit/tests/test_test_protocol2.py +++ b/python/subunit/tests/test_test_protocol2.py @@ -18,12 +18,29 @@ from io import BytesIO import datetime from testtools import TestCase +from testtools.matchers import HasLength from testtools.tests.test_testresult import TestStreamResultContract from testtools.testresult.doubles import StreamResult import subunit import subunit.iso8601 as iso8601 +CONSTANT_ENUM = b'\xb3)\x01\x0c\x03foo\x08U_\x1b' +CONSTANT_INPROGRESS = b'\xb3)\x02\x0c\x03foo\x8e\xc1-\xb5' +CONSTANT_SUCCESS = b'\xb3)\x03\x0c\x03fooE\x9d\xfe\x10' +CONSTANT_UXSUCCESS = b'\xb3)\x04\x0c\x03fooX\x98\xce\xa8' +CONSTANT_SKIP = b'\xb3)\x05\x0c\x03foo\x93\xc4\x1d\r' +CONSTANT_FAIL = b'\xb3)\x06\x0c\x03foo\x15Po\xa3' +CONSTANT_XFAIL = b'\xb3)\x07\x0c\x03foo\xde\x0c\xbc\x06' +CONSTANT_EOF = b'\xb3!\x10\x08S\x15\x88\xdc' +CONSTANT_FILE_CONTENT = b'\xb3!@\x13\x06barney\x03wooA5\xe3\x8c' +CONSTANT_MIME = b'\xb3! #\x1aapplication/foo; charset=1x3Q\x15' +CONSTANT_TIMESTAMP = b'\xb3+\x03\x13<\x17T\xcf\x80\xaf\xc8\x03barI\x96>-' +CONSTANT_ROUTE_CODE = b'\xb3-\x03\x13\x03bar\x06source\x9cY9\x19' +CONSTANT_RUNNABLE = b'\xb3(\x03\x0c\x03foo\xe3\xea\xf5\xa4' +CONSTANT_TAGS = b'\xb3)\x80\x15\x03bar\x02\x03foo\x03barTHn\xb4' + + class TestStreamResultToBytesContract(TestCase, TestStreamResultContract): """Check that StreamResult behaves as testtools expects.""" @@ -37,47 +54,119 @@ class TestStreamResultToBytes(TestCase): output = BytesIO() return subunit.StreamResultToBytes(output), output + def test_numbers(self): + result = subunit.StreamResultToBytes(BytesIO()) + packet = [] + self.assertRaises(Exception, result._write_number, -1, packet) + self.assertEqual([], packet) + result._write_number(0, packet) + self.assertEqual([b'\x00'], packet) + del packet[:] + result._write_number(63, packet) + self.assertEqual([b'\x3f'], packet) + del packet[:] + result._write_number(64, packet) + self.assertEqual([b'\x40\x40'], packet) + del packet[:] + result._write_number(16383, packet) + self.assertEqual([b'\x7f\xff'], packet) + del packet[:] + result._write_number(16384, packet) + self.assertEqual([b'\x80\x40', b'\x00'], packet) + del packet[:] + result._write_number(4194303, packet) + self.assertEqual([b'\xbf\xff', b'\xff'], packet) + del packet[:] + result._write_number(4194304, packet) + self.assertEqual([b'\xc0\x40\x00\x00'], packet) + del packet[:] + result._write_number(1073741823, packet) + self.assertEqual([b'\xff\xff\xff\xff'], packet) + del packet[:] + self.assertRaises(Exception, result._write_number, 1073741824, packet) + self.assertEqual([], packet) + + def test_volatile_length(self): + # if the length of the packet data before the length itself is + # considered is right on the boundary for length's variable length + # encoding, it is easy to get the length wrong by not accounting for + # length itself. 
+ # that is, the encoder has to ensure that length == sum (length_of_rest + # + length_of_length) + result, output = self._make_result() + # 1 byte short: + result.status(file_name="", file_bytes=b'\xff'*0) + self.assertThat(output.getvalue(), HasLength(10)) + self.assertEqual(b'\x0a', output.getvalue()[3:4]) + output.seek(0) + output.truncate() + # 1 byte long: + result.status(file_name="", file_bytes=b'\xff'*53) + self.assertThat(output.getvalue(), HasLength(63)) + self.assertEqual(b'\x3f', output.getvalue()[3:4]) + output.seek(0) + output.truncate() + # 2 bytes short + result.status(file_name="", file_bytes=b'\xff'*54) + self.assertThat(output.getvalue(), HasLength(65)) + self.assertEqual(b'\x40\x41', output.getvalue()[3:5]) + output.seek(0) + output.truncate() + # 2 bytes long + result.status(file_name="", file_bytes=b'\xff'*16371) + self.assertThat(output.getvalue(), HasLength(16383)) + self.assertEqual(b'\x7f\xff', output.getvalue()[3:5]) + output.seek(0) + output.truncate() + # 3 bytes short + result.status(file_name="", file_bytes=b'\xff'*16372) + self.assertThat(output.getvalue(), HasLength(16385)) + self.assertEqual(b'\x80\x40\x01', output.getvalue()[3:6]) + output.seek(0) + output.truncate() + # 3 bytes long + result.status(file_name="", file_bytes=b'\xff'*4194289) + self.assertThat(output.getvalue(), HasLength(4194303)) + self.assertEqual(b'\xbf\xff\xff', output.getvalue()[3:6]) + output.seek(0) + output.truncate() + self.assertRaises(Exception, result.status, file_name="", + file_bytes=b'\xff'*4194290) + def test_trivial_enumeration(self): result, output = self._make_result() result.status("foo", 'exists') - self.assertEqual(b'\xb3\x29\x01\0\0\x0f\0\x03foo\x99\x0c\x34\x3f', - output.getvalue()) + self.assertEqual(CONSTANT_ENUM, output.getvalue()) def test_inprogress(self): result, output = self._make_result() result.status("foo", 'inprogress') - self.assertEqual(b'\xb3\x29\x02\0\0\x0f\0\x03foo\xa0\x81\x08\xfa', - output.getvalue()) + self.assertEqual(CONSTANT_INPROGRESS, output.getvalue()) def test_success(self): result, output = self._make_result() result.status("foo", 'success') - self.assertEqual(b'\xb3\x29\x03\0\0\x0f\0\x03foo\xb7\xfa\x1c\xb9', - output.getvalue()) + self.assertEqual(CONSTANT_SUCCESS, output.getvalue()) def test_uxsuccess(self): result, output = self._make_result() result.status("foo", 'uxsuccess') - self.assertEqual(b'\xb3\x29\x04\0\0\x0f\0\x03foo\xd3\x9bqp', - output.getvalue()) + self.assertEqual(CONSTANT_UXSUCCESS, output.getvalue()) def test_skip(self): result, output = self._make_result() result.status("foo", 'skip') - self.assertEqual(b'\xb3\x29\x05\0\0\x0f\0\x03foo\xc4\xe0e3', - output.getvalue()) + self.assertEqual(CONSTANT_SKIP, output.getvalue()) def test_fail(self): result, output = self._make_result() result.status("foo", 'fail') - self.assertEqual(b'\xb3\x29\x06\0\0\x0f\0\x03foo\xfdmY\xf6', - output.getvalue()) + self.assertEqual(CONSTANT_FAIL, output.getvalue()) def test_xfail(self): result, output = self._make_result() result.status("foo", 'xfail') - self.assertEqual(b'\xb3\x29\x07\0\0\x0f\0\x03foo\xea\x16M\xb5', - output.getvalue()) + self.assertEqual(CONSTANT_XFAIL, output.getvalue()) def test_unknown_status(self): result, output = self._make_result() @@ -87,50 +176,40 @@ class TestStreamResultToBytes(TestCase): def test_eof(self): result, output = self._make_result() result.status(eof=True) - self.assertEqual( - b'\xb3!\x10\x00\x00\na\xf1xM', - output.getvalue()) + self.assertEqual(CONSTANT_EOF, output.getvalue()) def 
test_file_content(self): result, output = self._make_result() result.status(file_name="barney", file_bytes=b"woo") - self.assertEqual( - b'\xb3!@\x00\x00\x15\x00\x06barneywoo\xfd\xecu\x1c', - output.getvalue()) + self.assertEqual(CONSTANT_FILE_CONTENT, output.getvalue()) def test_mime(self): result, output = self._make_result() result.status(mime_type="application/foo; charset=1") - self.assertEqual( - b'\xb3! \x00\x00&\x00\x1aapplication/foo; charset=1]#\xf9\xf9', - output.getvalue()) + self.assertEqual(CONSTANT_MIME, output.getvalue()) def test_route_code(self): result, output = self._make_result() result.status(test_id="bar", test_status='success', route_code="source") - self.assertEqual(b'\xb3-\x03\x00\x00\x17\x00\x06source\x00\x03bar\xad\xbd\x8c$', - output.getvalue()) + self.assertEqual(CONSTANT_ROUTE_CODE, output.getvalue()) def test_runnable(self): result, output = self._make_result() result.status("foo", 'success', runnable=False) - self.assertEqual(b'\xb3(\x03\x00\x00\x0f\x00\x03fooX8w\x87', - output.getvalue()) + self.assertEqual(CONSTANT_RUNNABLE, output.getvalue()) def test_tags(self): result, output = self._make_result() result.status(test_id="bar", test_tags=set(['foo', 'bar'])) - self.assertEqual(b'\xb3)\x80\x00\x00\x1b\x00\x03bar\x00\x02\x00\x03foo\x00\x03bar\xabMw\xe6', - output.getvalue()) + self.assertEqual(CONSTANT_TAGS, output.getvalue()) def test_timestamp(self): timestamp = datetime.datetime(2001, 12, 12, 12, 59, 59, 45, iso8601.Utc()) result, output = self._make_result() result.status(test_id="bar", test_status='success', timestamp=timestamp) - self.assertEqual(b'\xb3+\x03\x00\x00\x17<\x17T\xcf\x00\x00\xaf\xc8\x00\x03barU>\xb2\xdb', - output.getvalue()) + self.assertEqual(CONSTANT_TIMESTAMP, output.getvalue()) class TestByteStreamToStreamResult(TestCase): @@ -152,6 +231,20 @@ class TestByteStreamToStreamResult(TestCase): ], result._events) self.assertEqual(b'', source.read()) + def test_signature_middle_utf8_char(self): + utf8_bytes = b'\xe3\xb3\x8a' + source = BytesIO(utf8_bytes) + # Should be treated as one character (it is u'\u3cca') and wrapped + result = StreamResult() + subunit.ByteStreamToStreamResult( + source, non_subunit_name="stdout").run( + result) + self.assertEqual([ + ('status', None, None, None, True, 'stdout', b'\xe3', False, None, None, None), + ('status', None, None, None, True, 'stdout', b'\xb3', False, None, None, None), + ('status', None, None, None, True, 'stdout', b'\x8a', False, None, None, None), + ], result._events) + def test_non_subunit_disabled_raises(self): source = BytesIO(b"foo\nbar\n") result = StreamResult() @@ -162,7 +255,7 @@ class TestByteStreamToStreamResult(TestCase): self.assertEqual([], result._events) def test_trivial_enumeration(self): - source = BytesIO(b'\xb3\x29\x01\0\0\x0f\0\x03foo\x99\x0c\x34\x3f') + source = BytesIO(CONSTANT_ENUM) result = StreamResult() subunit.ByteStreamToStreamResult( source, non_subunit_name="stdout").run(result) @@ -172,8 +265,7 @@ class TestByteStreamToStreamResult(TestCase): ], result._events) def test_multiple_events(self): - source = BytesIO(b'\xb3\x29\x01\0\0\x0f\0\x03foo\x99\x0c\x34\x3f' - b'\xb3\x29\x01\0\0\x0f\0\x03foo\x99\x0c\x34\x3f') + source = BytesIO(CONSTANT_ENUM + CONSTANT_ENUM) result = StreamResult() subunit.ByteStreamToStreamResult( source, non_subunit_name="stdout").run(result) @@ -184,28 +276,22 @@ class TestByteStreamToStreamResult(TestCase): ], result._events) def test_inprogress(self): - self.check_event( - b'\xb3\x29\x02\0\0\x0f\0\x03foo\xa0\x81\x08\xfa', 'inprogress') + 
self.check_event(CONSTANT_INPROGRESS, 'inprogress') def test_success(self): - self.check_event( - b'\xb3\x29\x03\0\0\x0f\0\x03foo\xb7\xfa\x1c\xb9', 'success') + self.check_event(CONSTANT_SUCCESS, 'success') def test_uxsuccess(self): - self.check_event( - b'\xb3\x29\x04\0\0\x0f\0\x03foo\xd3\x9bqp', 'uxsuccess') + self.check_event(CONSTANT_UXSUCCESS, 'uxsuccess') def test_skip(self): - self.check_event( - b'\xb3\x29\x05\0\0\x0f\0\x03foo\xc4\xe0e3', 'skip') + self.check_event(CONSTANT_SKIP, 'skip') def test_fail(self): - self.check_event( - b'\xb3\x29\x06\0\0\x0f\0\x03foo\xfdmY\xf6', 'fail') + self.check_event(CONSTANT_FAIL, 'fail') def test_xfail(self): - self.check_event( - b'\xb3\x29\x07\0\0\x0f\0\x03foo\xea\x16M\xb5', 'xfail') + self.check_event(CONSTANT_XFAIL, 'xfail') def check_events(self, source_bytes, events): source = BytesIO(source_bytes) @@ -231,86 +317,95 @@ class TestByteStreamToStreamResult(TestCase): file_bytes, eof, mime_type, route_code, timestamp) def test_eof(self): - self.check_event( - b'\xb3!\x10\x00\x00\na\xf1xM', - test_id=None, eof=True) + self.check_event(CONSTANT_EOF, test_id=None, eof=True) def test_file_content(self): - self.check_event( - b'\xb3!@\x00\x00\x15\x00\x06barneywoo\xfd\xecu\x1c', + self.check_event(CONSTANT_FILE_CONTENT, test_id=None, file_name="barney", file_bytes=b"woo") + def test_file_content_length_into_checksum(self): + # A bad file content length which creeps into the checksum. + bad_file_length_content = b'\xb3!@\x13\x06barney\x04woo\xdc\xe2\xdb\x35' + self.check_events(bad_file_length_content, [ + self._event(test_id="subunit.parser", eof=True, + file_name="Packet data", file_bytes=bad_file_length_content), + self._event(test_id="subunit.parser", test_status="fail", eof=True, + file_name="Parser Error", + file_bytes=b"File content extends past end of packet: claimed 4 bytes, 3 available"), + ]) + + def test_packet_length_4_word_varint(self): + packet_data = b'\xb3!@\xc0\x00\x11' + self.check_events(packet_data, [ + self._event(test_id="subunit.parser", eof=True, + file_name="Packet data", file_bytes=packet_data), + self._event(test_id="subunit.parser", test_status="fail", eof=True, + file_name="Parser Error", + file_bytes=b"3 byte maximum given but 4 byte value found."), + ]) + def test_mime(self): - self.check_event( - b'\xb3! \x00\x00&\x00\x1aapplication/foo; charset=1]#\xf9\xf9', + self.check_event(CONSTANT_MIME, test_id=None, mime_type='application/foo; charset=1') def test_route_code(self): - self.check_event( - b'\xb3-\x03\x00\x00\x17\x00\x06source\x00\x03bar\xad\xbd\x8c$', + self.check_event(CONSTANT_ROUTE_CODE, 'success', route_code="source", test_id="bar") def test_runnable(self): - self.check_event( - b'\xb3(\x03\x00\x00\x0f\x00\x03fooX8w\x87', + self.check_event(CONSTANT_RUNNABLE, test_status='success', runnable=False) def test_tags(self): - self.check_event( - b'\xb3)\x80\x00\x00\x1b\x00\x03bar\x00\x02\x00\x03foo\x00\x03bar\xabMw\xe6', + self.check_event(CONSTANT_TAGS, None, tags=set(['foo', 'bar']), test_id="bar") def test_timestamp(self): timestamp = datetime.datetime(2001, 12, 12, 12, 59, 59, 45, iso8601.Utc()) - self.check_event( - b'\xb3+\x03\x00\x00\x17<\x17T\xcf\x00\x00\xaf\xc8\x00\x03barU>\xb2\xdb', + self.check_event(CONSTANT_TIMESTAMP, 'success', test_id='bar', timestamp=timestamp) def test_bad_crc_errors_via_status(self): - file_bytes = \ - b'\xb3! 
\x00\x00&\x00\x1aapplication/foo; charset=1]#\xf9\xee' + file_bytes = CONSTANT_MIME[:-1] + b'\x00' self.check_events( file_bytes, [ self._event(test_id="subunit.parser", eof=True, file_name="Packet data", file_bytes=file_bytes), self._event(test_id="subunit.parser", test_status="fail", eof=True, file_name="Parser Error", - file_bytes=b'Bad checksum - calculated (0x5d23f9f9), ' - b'stored (0x5d23f9ee)'), + file_bytes=b'Bad checksum - calculated (0x78335115), ' + b'stored (0x78335100)'), ]) def test_not_utf8_in_string(self): - file_bytes = \ - b'\xb3-\x03\x00\x00\x17\x00\x06\xb4ource\x00\x03bar\x25\x2f\xb5\xd7' + file_bytes = CONSTANT_ROUTE_CODE[:5] + b'\xb4' + CONSTANT_ROUTE_CODE[6:-4] + b'\xce\x56\xc6\x17' self.check_events(file_bytes, [ self._event(test_id="subunit.parser", eof=True, file_name="Packet data", file_bytes=file_bytes), self._event(test_id="subunit.parser", test_status="fail", eof=True, file_name="Parser Error", - file_bytes=b'UTF8 string at offset 0 is not UTF8'), + file_bytes=b'UTF8 string at offset 2 is not UTF8'), ]) def test_NULL_in_string(self): - file_bytes = \ - b'\xb3-\x03\x00\x00\x17\x00\x06so\x00rce\x00\x03bar\x17\x89\x0a\xbe' + file_bytes = CONSTANT_ROUTE_CODE[:6] + b'\x00' + CONSTANT_ROUTE_CODE[7:-4] + b'\xd7\x41\xac\xfe' self.check_events(file_bytes, [ self._event(test_id="subunit.parser", eof=True, file_name="Packet data", file_bytes=file_bytes), self._event(test_id="subunit.parser", test_status="fail", eof=True, file_name="Parser Error", - file_bytes=b'UTF8 string at offset 0 contains NUL byte'), + file_bytes=b'UTF8 string at offset 2 contains NUL byte'), ]) def test_bad_utf8_stringlength(self): - file_bytes = \ - b'\xb3-\x03\x00\x00\x17\x00\x06source\x00\x08bar\x7a\xbc\x0b\x25' + file_bytes = CONSTANT_ROUTE_CODE[:4] + b'\x3f' + CONSTANT_ROUTE_CODE[5:-4] + b'\xbe\x29\xe0\xc2' self.check_events(file_bytes, [ self._event(test_id="subunit.parser", eof=True, file_name="Packet data", file_bytes=file_bytes), self._event(test_id="subunit.parser", test_status="fail", eof=True, file_name="Parser Error", - file_bytes=b'UTF8 string at offset 8 extends past end of ' - b'packet: claimed 8 bytes, 7 available'), + file_bytes=b'UTF8 string at offset 2 extends past end of ' + b'packet: claimed 63 bytes, 10 available'), ]) diff --git a/python/subunit/v2.py b/python/subunit/v2.py index 34ab838..bbc20da 100644 --- a/python/subunit/v2.py +++ b/python/subunit/v2.py @@ -14,6 +14,7 @@ # limitations under that license. 
# +import codecs import datetime from io import UnsupportedOperation import os @@ -21,6 +22,8 @@ import select import struct import zlib +from extras import safe_hasattr + import subunit import subunit.iso8601 as iso8601 @@ -30,7 +33,9 @@ __all__ = [ ] SIGNATURE = b'\xb3' +FMT_8 = '>B' FMT_16 = '>H' +FMT_24 = '>HB' FMT_32 = '>I' FMT_TIMESTAMP = '>II' FLAG_TEST_ID = 0x0800 @@ -94,35 +99,54 @@ class StreamResultToBytes(object): def _write_utf8(self, a_string, packet): utf8 = a_string.encode('utf-8') - self._write_len16(len(utf8), packet) + self._write_number(len(utf8), packet) packet.append(utf8) def _write_len16(self, length, packet): assert length < 65536 packet.append(struct.pack(FMT_16, length)) + def _write_number(self, value, packet): + packet.extend(self._encode_number(value)) + + def _encode_number(self, value): + assert value >= 0 + if value < 64: + return [struct.pack(FMT_8, value)] + elif value < 16384: + value = value | 0x4000 + return [struct.pack(FMT_16, value)] + elif value < 4194304: + value = value | 0x800000 + return [struct.pack(FMT_16, value >> 8), + struct.pack(FMT_8, value & 0xff)] + elif value < 1073741824: + value = value | 0xc0000000 + return [struct.pack(FMT_32, value)] + else: + raise ValueError('value too large to encode: %r' % (value,)) + def _write_packet(self, test_id=None, test_status=None, test_tags=None, runnable=True, file_name=None, file_bytes=None, eof=False, mime_type=None, route_code=None, timestamp=None): packet = [SIGNATURE] packet.append(b'FF') # placeholder for flags - packet.append(b'FFF') # placeholder for length + # placeholder for length, but see below as length is variable. + packet.append(b'') flags = 0x2000 # Version 0x2 - if route_code is not None: - flags = flags | FLAG_ROUTE_CODE - self._write_utf8(route_code, packet) if timestamp is not None: flags = flags | FLAG_TIMESTAMP since_epoch = timestamp - EPOCH nanoseconds = since_epoch.microseconds * 1000 seconds = (since_epoch.seconds + since_epoch.days * 24 * 3600) - packet.append(struct.pack(FMT_TIMESTAMP, seconds, nanoseconds)) + packet.append(struct.pack(FMT_32, seconds)) + self._write_number(nanoseconds, packet) if test_id is not None: flags = flags | FLAG_TEST_ID self._write_utf8(test_id, packet) if test_tags: flags = flags | FLAG_TAGS - self._write_len16(len(test_tags), packet) + self._write_number(len(test_tags), packet) for tag in test_tags: self._write_utf8(tag, packet) if runnable: @@ -133,15 +157,34 @@ class StreamResultToBytes(object): if file_name is not None: flags = flags | FLAG_FILE_CONTENT self._write_utf8(file_name, packet) + self._write_number(len(file_bytes), packet) packet.append(file_bytes) if eof: flags = flags | FLAG_EOF + if route_code is not None: + flags = flags | FLAG_ROUTE_CODE + self._write_utf8(route_code, packet) # 0x0008 - not used in v2. flags = flags | self.status_mask[test_status] packet[1] = struct.pack(FMT_16, flags) - length = struct.pack(FMT_32, sum(map(len, packet)) + 4) - assert length[0] == self.zero_b - packet[2] = length[1:] + base_length = sum(map(len, packet)) + 4 + if base_length <= 62: + # one byte to encode length, 62+1 = 63 + length_length = 1 + elif base_length <= 16381: + # two bytes to encode length, 16381+2 = 16383 + length_length = 2 + elif base_length <= 4194300: + # three bytes to encode length, 419430+3=4194303 + length_length = 3 + else: + # Longer than policy: + # TODO: chunk the packet automatically? + # - strip all but file data + # - do 4M chunks of that till done + # - include original data in final chunk. 
+ raise ValueError("Length too long: %r" % base_length) + packet[2:3] = self._encode_number(base_length + length_length) # We could either do a partial application of crc32 over each chunk # or a single join to a temp variable then a final join # or two writes (that python might then split). @@ -193,124 +236,200 @@ class ByteStreamToStreamResult(object): """ self.non_subunit_name = non_subunit_name self.source = subunit.make_stream_binary(source) + self.codec = codecs.lookup('utf8').incrementaldecoder() def run(self, result): """Parse source and emit events to result. This is a blocking call: it will run until EOF is detected on source. """ + self.codec.reset() + mid_character = False while True: + # We're in blocking mode; read one char content = self.source.read(1) if not content: # EOF return - if content[0] != SIGNATURE[0]: - # Not subunit. - if self.non_subunit_name is None: - raise Exception("Non subunit content", content) - # Aggregate all content that is not subunit until either - # 1MiB is accumulated or 50ms has passed with no input. - # Both are arbitrary amounts intended to give a simple - # balance between efficiency (avoiding death by a thousand - # one-byte packets), buffering (avoiding overlarge state - # being hidden on intermediary nodes) and interactivity - # (when driving a debugger, slow response to typing is - # annoying). - buffered = [content] - while len(buffered[-1]): - try: - self.source.fileno() - except: - # Won't be able to select, fallback to - # one-byte-at-a-time. + if not mid_character and content[0] == SIGNATURE[0]: + self._parse_packet(result) + continue + if self.non_subunit_name is None: + raise Exception("Non subunit content", content) + try: + if self.codec.decode(content): + # End of a character + mid_character = False + else: + mid_character = True + except UnicodeDecodeError: + # Bad unicode, not our concern. + mid_character = False + # Aggregate all content that is not subunit until either + # 1MiB is accumulated or 50ms has passed with no input. + # Both are arbitrary amounts intended to give a simple + # balance between efficiency (avoiding death by a thousand + # one-byte packets), buffering (avoiding overlarge state + # being hidden on intermediary nodes) and interactivity + # (when driving a debugger, slow response to typing is + # annoying). + buffered = [content] + while len(buffered[-1]): + try: + self.source.fileno() + except: + # Won't be able to select, fallback to + # one-byte-at-a-time. + break + # Note: this has a very low timeout because with stdin, the + # BufferedIO layer typically has all the content available + # from the stream when e.g. pdb is dropped into, leading to + # select always timing out when in fact we could have read + # (from the buffer layer) - we typically fail to aggregate + # any content on 3.x Pythons. + readable = select.select([self.source], [], [], 0.000001)[0] + if readable: + content = self.source.read(1) + if not len(content): + # EOF - break and emit buffered. break - # Note: this has a very low timeout because with stdin, the - # BufferedIO layer typically has all the content available - # from the stream when e.g. pdb is dropped into, leading to - # select always timing out when in fact we could have read - # (from the buffer layer) - we typically fail to aggregate - # any content on 3.x Pythons. 
- readable = select.select([self.source], [], [], 0.000001)[0] - if readable: - content = self.source.read(1) - if len(content) and content[0] != SIGNATURE[0]: - buffered.append(content) - else: - # EOF or we hit a new packet. - break - if not readable or len(buffered) >= 1048576: + if not mid_character and content[0] == SIGNATURE[0]: + # New packet, break, emit buffered, then parse. break - result.status( - file_name=self.non_subunit_name, - file_bytes=b''.join(buffered)) - if not len(content) or content[0] != SIGNATURE[0]: - continue - # Fall through to process the packet whose first byte is in - # content. - try: - packet = [SIGNATURE] - self._parse(packet, result) - except ParseError as error: - result.status(test_id="subunit.parser", eof=True, - file_name="Packet data", file_bytes=b''.join(packet)) - result.status(test_id="subunit.parser", test_status='fail', - eof=True, file_name="Parser Error", - file_bytes=(error.args[0]).encode('utf8')) + buffered.append(content) + # Feed into the codec. + try: + if self.codec.decode(content): + # End of a character + mid_character = False + else: + mid_character = True + except UnicodeDecodeError: + # Bad unicode, not our concern. + mid_character = False + if not readable or len(buffered) >= 1048576: + # timeout or too much data, emit what we have. + break + result.status( + file_name=self.non_subunit_name, + file_bytes=b''.join(buffered)) + if mid_character or not len(content) or content[0] != SIGNATURE[0]: + continue + # Otherwise, parse a data packet. + self._parse_packet(result) + + def _parse_packet(self, result): + try: + packet = [SIGNATURE] + self._parse(packet, result) + except ParseError as error: + result.status(test_id="subunit.parser", eof=True, + file_name="Packet data", file_bytes=b''.join(packet)) + result.status(test_id="subunit.parser", test_status='fail', + eof=True, file_name="Parser Error", + file_bytes=(error.args[0]).encode('utf8')) + + def _parse_varint(self, data, pos, max_3_bytes=False): + # because the only incremental IO we do is at the start, and the 32 bit + # CRC means we can always safely read enough to cover any varint, we + # can be sure that there should be enough data - and if not it is an + # error not a normal situation. + data_0 = struct.unpack(FMT_8, data[pos:pos+1])[0] + typeenum = data_0 & 0xc0 + value_0 = data_0 & 0x3f + if typeenum == 0x00: + return value_0, 1 + elif typeenum == 0x40: + data_1 = struct.unpack(FMT_8, data[pos+1:pos+2])[0] + return (value_0 << 8) | data_1, 2 + elif typeenum == 0x80: + data_1 = struct.unpack(FMT_16, data[pos+1:pos+3])[0] + return (value_0 << 16) | data_1, 3 + else: + if max_3_bytes: + raise ParseError('3 byte maximum given but 4 byte value found.') + data_1, data_2 = struct.unpack(FMT_24, data[pos+1:pos+4]) + result = (value_0 << 24) | data_1 << 8 | data_2 + return result, 4 def _parse(self, packet, result): - packet.append(self.source.read(5)) # 2 bytes flags, 3 bytes length. + # 2 bytes flags, at most 3 bytes length. 
+ packet.append(self.source.read(5)) flags = struct.unpack(FMT_16, packet[-1][:2])[0] - length = struct.unpack(FMT_32, packet[-1][1:])[0] & 0x00ffffff - packet.append(self.source.read(length - 6)) - if len(packet[-1]) != length - 6: + length, consumed = self._parse_varint( + packet[-1], 2, max_3_bytes=True) + remainder = self.source.read(length - 6) + if len(remainder) != length - 6: raise ParseError( 'Short read - got %d bytes, wanted %d bytes' % ( - len(packet[-1]), length - 6)) + len(remainder), length - 6)) + if consumed != 3: + # Avoid having to parse torn values + packet[-1] += remainder + pos = 2 + consumed + else: + # Avoid copying potentially lots of data. + packet.append(remainder) + pos = 0 crc = zlib.crc32(packet[0]) - crc = zlib.crc32(packet[1], crc) - crc = zlib.crc32(packet[2][:-4], crc) & 0xffffffff - packet_crc = struct.unpack(FMT_32, packet[2][-4:])[0] + for fragment in packet[1:-1]: + crc = zlib.crc32(fragment, crc) + crc = zlib.crc32(packet[-1][:-4], crc) & 0xffffffff + packet_crc = struct.unpack(FMT_32, packet[-1][-4:])[0] if crc != packet_crc: # Bad CRC, report it and stop parsing the packet. raise ParseError( 'Bad checksum - calculated (0x%x), stored (0x%x)' % (crc, packet_crc)) + if safe_hasattr(__builtins__, 'memoryview'): + body = memoryview(packet[-1]) + else: + body = packet[-1] + # Discard CRC-32 + body = body[:-4] # One packet could have both file and status data; the Python API # presents these separately (perhaps it shouldn't?) - pos = 0 - if flags & FLAG_ROUTE_CODE: - route_code, pos = self._read_utf8(packet[2], pos) - else: - route_code = None if flags & FLAG_TIMESTAMP: - seconds, nanoseconds = struct.unpack(FMT_TIMESTAMP, packet[2][pos:pos+8]) - pos += 8 + seconds = struct.unpack(FMT_32, body[pos:pos+4])[0] + nanoseconds, consumed = self._parse_varint(body, pos+4) + pos = pos + 4 + consumed timestamp = EPOCH + datetime.timedelta( seconds=seconds, microseconds=nanoseconds/1000) else: timestamp = None if flags & FLAG_TEST_ID: - test_id, pos = self._read_utf8(packet[2], pos) + test_id, pos = self._read_utf8(body, pos) else: test_id = None if flags & FLAG_TAGS: - tag_count, pos = self._read_len16(packet[2], pos) + tag_count, consumed = self._parse_varint(body, pos) + pos += consumed test_tags = set() for _ in range(tag_count): - tag, pos = self._read_utf8(packet[2], pos) + tag, pos = self._read_utf8(body, pos) test_tags.add(tag) else: test_tags = None if flags & FLAG_MIME_TYPE: - mime_type, pos = self._read_utf8(packet[2], pos) + mime_type, pos = self._read_utf8(body, pos) else: mime_type = None if flags & FLAG_FILE_CONTENT: - file_name, pos = self._read_utf8(packet[2], pos) - file_bytes = packet[2][pos:-4] + file_name, pos = self._read_utf8(body, pos) + content_length, consumed = self._parse_varint(body, pos) + pos += consumed + file_bytes = body[pos:pos+content_length] + if len(file_bytes) != content_length: + raise ParseError('File content extends past end of packet: ' + 'claimed %d bytes, %d available' % ( + content_length, len(file_bytes))) else: file_name = None file_bytes = None + if flags & FLAG_ROUTE_CODE: + route_code, pos = self._read_utf8(body, pos) + else: + route_code = None runnable = bool(flags & FLAG_RUNNABLE) eof = bool(flags & FLAG_EOF) test_status = self.status_lookup[flags & 0x0007] @@ -320,12 +439,9 @@ class ByteStreamToStreamResult(object): route_code=route_code, timestamp=timestamp) __call__ = run - def _read_len16(self, buf, pos): - length = struct.unpack(FMT_16, buf[pos:pos+2])[0] - return length, pos + 2 - def _read_utf8(self, buf, 
pos): - length, pos = self._read_len16(buf, pos) + length, consumed = self._parse_varint(buf, pos) + pos += consumed utf8_bytes = buf[pos:pos+length] if length != len(utf8_bytes): raise ParseError( |
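For completeness, a matching decoder sketch mirroring the new `ByteStreamToStreamResult._parse_varint` above; the free-function form, the name `decode_number` and the trailing usage check are mine, not part of the patch. The real parser additionally takes a `max_3_bytes` flag so that the packet length field can reject 4-byte widths ("3 byte maximum given but 4 byte value found.").

```python
import struct


def decode_number(data, pos=0):
    """Decode one varint from data at pos; return (value, bytes consumed)."""
    first = struct.unpack('>B', data[pos:pos + 1])[0]
    prefix = first & 0xc0          # width selector in the top two bits
    value = first & 0x3f           # low six bits of the value
    if prefix == 0x00:             # 1 byte
        return value, 1
    if prefix == 0x40:             # 2 bytes
        rest = struct.unpack('>B', data[pos + 1:pos + 2])[0]
        return (value << 8) | rest, 2
    if prefix == 0x80:             # 3 bytes
        rest = struct.unpack('>H', data[pos + 1:pos + 3])[0]
        return (value << 16) | rest, 3
    high, low = struct.unpack('>HB', data[pos + 1:pos + 4])  # 4 bytes
    return (value << 24) | (high << 8) | low, 4


# e.g. the 3-byte encoding of 16384 from test_numbers:
assert decode_number(b'\x80\x40\x00') == (16384, 3)
```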
