diff options
Diffstat (limited to 'python/subunit/v2.py')
| -rw-r--r-- | python/subunit/v2.py | 288 |
1 files changed, 202 insertions, 86 deletions
diff --git a/python/subunit/v2.py b/python/subunit/v2.py index 34ab838..bbc20da 100644 --- a/python/subunit/v2.py +++ b/python/subunit/v2.py @@ -14,6 +14,7 @@ # limitations under that license. # +import codecs import datetime from io import UnsupportedOperation import os @@ -21,6 +22,8 @@ import select import struct import zlib +from extras import safe_hasattr + import subunit import subunit.iso8601 as iso8601 @@ -30,7 +33,9 @@ __all__ = [ ] SIGNATURE = b'\xb3' +FMT_8 = '>B' FMT_16 = '>H' +FMT_24 = '>HB' FMT_32 = '>I' FMT_TIMESTAMP = '>II' FLAG_TEST_ID = 0x0800 @@ -94,35 +99,54 @@ class StreamResultToBytes(object): def _write_utf8(self, a_string, packet): utf8 = a_string.encode('utf-8') - self._write_len16(len(utf8), packet) + self._write_number(len(utf8), packet) packet.append(utf8) def _write_len16(self, length, packet): assert length < 65536 packet.append(struct.pack(FMT_16, length)) + def _write_number(self, value, packet): + packet.extend(self._encode_number(value)) + + def _encode_number(self, value): + assert value >= 0 + if value < 64: + return [struct.pack(FMT_8, value)] + elif value < 16384: + value = value | 0x4000 + return [struct.pack(FMT_16, value)] + elif value < 4194304: + value = value | 0x800000 + return [struct.pack(FMT_16, value >> 8), + struct.pack(FMT_8, value & 0xff)] + elif value < 1073741824: + value = value | 0xc0000000 + return [struct.pack(FMT_32, value)] + else: + raise ValueError('value too large to encode: %r' % (value,)) + def _write_packet(self, test_id=None, test_status=None, test_tags=None, runnable=True, file_name=None, file_bytes=None, eof=False, mime_type=None, route_code=None, timestamp=None): packet = [SIGNATURE] packet.append(b'FF') # placeholder for flags - packet.append(b'FFF') # placeholder for length + # placeholder for length, but see below as length is variable. + packet.append(b'') flags = 0x2000 # Version 0x2 - if route_code is not None: - flags = flags | FLAG_ROUTE_CODE - self._write_utf8(route_code, packet) if timestamp is not None: flags = flags | FLAG_TIMESTAMP since_epoch = timestamp - EPOCH nanoseconds = since_epoch.microseconds * 1000 seconds = (since_epoch.seconds + since_epoch.days * 24 * 3600) - packet.append(struct.pack(FMT_TIMESTAMP, seconds, nanoseconds)) + packet.append(struct.pack(FMT_32, seconds)) + self._write_number(nanoseconds, packet) if test_id is not None: flags = flags | FLAG_TEST_ID self._write_utf8(test_id, packet) if test_tags: flags = flags | FLAG_TAGS - self._write_len16(len(test_tags), packet) + self._write_number(len(test_tags), packet) for tag in test_tags: self._write_utf8(tag, packet) if runnable: @@ -133,15 +157,34 @@ class StreamResultToBytes(object): if file_name is not None: flags = flags | FLAG_FILE_CONTENT self._write_utf8(file_name, packet) + self._write_number(len(file_bytes), packet) packet.append(file_bytes) if eof: flags = flags | FLAG_EOF + if route_code is not None: + flags = flags | FLAG_ROUTE_CODE + self._write_utf8(route_code, packet) # 0x0008 - not used in v2. flags = flags | self.status_mask[test_status] packet[1] = struct.pack(FMT_16, flags) - length = struct.pack(FMT_32, sum(map(len, packet)) + 4) - assert length[0] == self.zero_b - packet[2] = length[1:] + base_length = sum(map(len, packet)) + 4 + if base_length <= 62: + # one byte to encode length, 62+1 = 63 + length_length = 1 + elif base_length <= 16381: + # two bytes to encode length, 16381+2 = 16383 + length_length = 2 + elif base_length <= 4194300: + # three bytes to encode length, 419430+3=4194303 + length_length = 3 + else: + # Longer than policy: + # TODO: chunk the packet automatically? + # - strip all but file data + # - do 4M chunks of that till done + # - include original data in final chunk. + raise ValueError("Length too long: %r" % base_length) + packet[2:3] = self._encode_number(base_length + length_length) # We could either do a partial application of crc32 over each chunk # or a single join to a temp variable then a final join # or two writes (that python might then split). @@ -193,124 +236,200 @@ class ByteStreamToStreamResult(object): """ self.non_subunit_name = non_subunit_name self.source = subunit.make_stream_binary(source) + self.codec = codecs.lookup('utf8').incrementaldecoder() def run(self, result): """Parse source and emit events to result. This is a blocking call: it will run until EOF is detected on source. """ + self.codec.reset() + mid_character = False while True: + # We're in blocking mode; read one char content = self.source.read(1) if not content: # EOF return - if content[0] != SIGNATURE[0]: - # Not subunit. - if self.non_subunit_name is None: - raise Exception("Non subunit content", content) - # Aggregate all content that is not subunit until either - # 1MiB is accumulated or 50ms has passed with no input. - # Both are arbitrary amounts intended to give a simple - # balance between efficiency (avoiding death by a thousand - # one-byte packets), buffering (avoiding overlarge state - # being hidden on intermediary nodes) and interactivity - # (when driving a debugger, slow response to typing is - # annoying). - buffered = [content] - while len(buffered[-1]): - try: - self.source.fileno() - except: - # Won't be able to select, fallback to - # one-byte-at-a-time. + if not mid_character and content[0] == SIGNATURE[0]: + self._parse_packet(result) + continue + if self.non_subunit_name is None: + raise Exception("Non subunit content", content) + try: + if self.codec.decode(content): + # End of a character + mid_character = False + else: + mid_character = True + except UnicodeDecodeError: + # Bad unicode, not our concern. + mid_character = False + # Aggregate all content that is not subunit until either + # 1MiB is accumulated or 50ms has passed with no input. + # Both are arbitrary amounts intended to give a simple + # balance between efficiency (avoiding death by a thousand + # one-byte packets), buffering (avoiding overlarge state + # being hidden on intermediary nodes) and interactivity + # (when driving a debugger, slow response to typing is + # annoying). + buffered = [content] + while len(buffered[-1]): + try: + self.source.fileno() + except: + # Won't be able to select, fallback to + # one-byte-at-a-time. + break + # Note: this has a very low timeout because with stdin, the + # BufferedIO layer typically has all the content available + # from the stream when e.g. pdb is dropped into, leading to + # select always timing out when in fact we could have read + # (from the buffer layer) - we typically fail to aggregate + # any content on 3.x Pythons. + readable = select.select([self.source], [], [], 0.000001)[0] + if readable: + content = self.source.read(1) + if not len(content): + # EOF - break and emit buffered. break - # Note: this has a very low timeout because with stdin, the - # BufferedIO layer typically has all the content available - # from the stream when e.g. pdb is dropped into, leading to - # select always timing out when in fact we could have read - # (from the buffer layer) - we typically fail to aggregate - # any content on 3.x Pythons. - readable = select.select([self.source], [], [], 0.000001)[0] - if readable: - content = self.source.read(1) - if len(content) and content[0] != SIGNATURE[0]: - buffered.append(content) - else: - # EOF or we hit a new packet. - break - if not readable or len(buffered) >= 1048576: + if not mid_character and content[0] == SIGNATURE[0]: + # New packet, break, emit buffered, then parse. break - result.status( - file_name=self.non_subunit_name, - file_bytes=b''.join(buffered)) - if not len(content) or content[0] != SIGNATURE[0]: - continue - # Fall through to process the packet whose first byte is in - # content. - try: - packet = [SIGNATURE] - self._parse(packet, result) - except ParseError as error: - result.status(test_id="subunit.parser", eof=True, - file_name="Packet data", file_bytes=b''.join(packet)) - result.status(test_id="subunit.parser", test_status='fail', - eof=True, file_name="Parser Error", - file_bytes=(error.args[0]).encode('utf8')) + buffered.append(content) + # Feed into the codec. + try: + if self.codec.decode(content): + # End of a character + mid_character = False + else: + mid_character = True + except UnicodeDecodeError: + # Bad unicode, not our concern. + mid_character = False + if not readable or len(buffered) >= 1048576: + # timeout or too much data, emit what we have. + break + result.status( + file_name=self.non_subunit_name, + file_bytes=b''.join(buffered)) + if mid_character or not len(content) or content[0] != SIGNATURE[0]: + continue + # Otherwise, parse a data packet. + self._parse_packet(result) + + def _parse_packet(self, result): + try: + packet = [SIGNATURE] + self._parse(packet, result) + except ParseError as error: + result.status(test_id="subunit.parser", eof=True, + file_name="Packet data", file_bytes=b''.join(packet)) + result.status(test_id="subunit.parser", test_status='fail', + eof=True, file_name="Parser Error", + file_bytes=(error.args[0]).encode('utf8')) + + def _parse_varint(self, data, pos, max_3_bytes=False): + # because the only incremental IO we do is at the start, and the 32 bit + # CRC means we can always safely read enough to cover any varint, we + # can be sure that there should be enough data - and if not it is an + # error not a normal situation. + data_0 = struct.unpack(FMT_8, data[pos:pos+1])[0] + typeenum = data_0 & 0xc0 + value_0 = data_0 & 0x3f + if typeenum == 0x00: + return value_0, 1 + elif typeenum == 0x40: + data_1 = struct.unpack(FMT_8, data[pos+1:pos+2])[0] + return (value_0 << 8) | data_1, 2 + elif typeenum == 0x80: + data_1 = struct.unpack(FMT_16, data[pos+1:pos+3])[0] + return (value_0 << 16) | data_1, 3 + else: + if max_3_bytes: + raise ParseError('3 byte maximum given but 4 byte value found.') + data_1, data_2 = struct.unpack(FMT_24, data[pos+1:pos+4]) + result = (value_0 << 24) | data_1 << 8 | data_2 + return result, 4 def _parse(self, packet, result): - packet.append(self.source.read(5)) # 2 bytes flags, 3 bytes length. + # 2 bytes flags, at most 3 bytes length. + packet.append(self.source.read(5)) flags = struct.unpack(FMT_16, packet[-1][:2])[0] - length = struct.unpack(FMT_32, packet[-1][1:])[0] & 0x00ffffff - packet.append(self.source.read(length - 6)) - if len(packet[-1]) != length - 6: + length, consumed = self._parse_varint( + packet[-1], 2, max_3_bytes=True) + remainder = self.source.read(length - 6) + if len(remainder) != length - 6: raise ParseError( 'Short read - got %d bytes, wanted %d bytes' % ( - len(packet[-1]), length - 6)) + len(remainder), length - 6)) + if consumed != 3: + # Avoid having to parse torn values + packet[-1] += remainder + pos = 2 + consumed + else: + # Avoid copying potentially lots of data. + packet.append(remainder) + pos = 0 crc = zlib.crc32(packet[0]) - crc = zlib.crc32(packet[1], crc) - crc = zlib.crc32(packet[2][:-4], crc) & 0xffffffff - packet_crc = struct.unpack(FMT_32, packet[2][-4:])[0] + for fragment in packet[1:-1]: + crc = zlib.crc32(fragment, crc) + crc = zlib.crc32(packet[-1][:-4], crc) & 0xffffffff + packet_crc = struct.unpack(FMT_32, packet[-1][-4:])[0] if crc != packet_crc: # Bad CRC, report it and stop parsing the packet. raise ParseError( 'Bad checksum - calculated (0x%x), stored (0x%x)' % (crc, packet_crc)) + if safe_hasattr(__builtins__, 'memoryview'): + body = memoryview(packet[-1]) + else: + body = packet[-1] + # Discard CRC-32 + body = body[:-4] # One packet could have both file and status data; the Python API # presents these separately (perhaps it shouldn't?) - pos = 0 - if flags & FLAG_ROUTE_CODE: - route_code, pos = self._read_utf8(packet[2], pos) - else: - route_code = None if flags & FLAG_TIMESTAMP: - seconds, nanoseconds = struct.unpack(FMT_TIMESTAMP, packet[2][pos:pos+8]) - pos += 8 + seconds = struct.unpack(FMT_32, body[pos:pos+4])[0] + nanoseconds, consumed = self._parse_varint(body, pos+4) + pos = pos + 4 + consumed timestamp = EPOCH + datetime.timedelta( seconds=seconds, microseconds=nanoseconds/1000) else: timestamp = None if flags & FLAG_TEST_ID: - test_id, pos = self._read_utf8(packet[2], pos) + test_id, pos = self._read_utf8(body, pos) else: test_id = None if flags & FLAG_TAGS: - tag_count, pos = self._read_len16(packet[2], pos) + tag_count, consumed = self._parse_varint(body, pos) + pos += consumed test_tags = set() for _ in range(tag_count): - tag, pos = self._read_utf8(packet[2], pos) + tag, pos = self._read_utf8(body, pos) test_tags.add(tag) else: test_tags = None if flags & FLAG_MIME_TYPE: - mime_type, pos = self._read_utf8(packet[2], pos) + mime_type, pos = self._read_utf8(body, pos) else: mime_type = None if flags & FLAG_FILE_CONTENT: - file_name, pos = self._read_utf8(packet[2], pos) - file_bytes = packet[2][pos:-4] + file_name, pos = self._read_utf8(body, pos) + content_length, consumed = self._parse_varint(body, pos) + pos += consumed + file_bytes = body[pos:pos+content_length] + if len(file_bytes) != content_length: + raise ParseError('File content extends past end of packet: ' + 'claimed %d bytes, %d available' % ( + content_length, len(file_bytes))) else: file_name = None file_bytes = None + if flags & FLAG_ROUTE_CODE: + route_code, pos = self._read_utf8(body, pos) + else: + route_code = None runnable = bool(flags & FLAG_RUNNABLE) eof = bool(flags & FLAG_EOF) test_status = self.status_lookup[flags & 0x0007] @@ -320,12 +439,9 @@ class ByteStreamToStreamResult(object): route_code=route_code, timestamp=timestamp) __call__ = run - def _read_len16(self, buf, pos): - length = struct.unpack(FMT_16, buf[pos:pos+2])[0] - return length, pos + 2 - def _read_utf8(self, buf, pos): - length, pos = self._read_len16(buf, pos) + length, consumed = self._parse_varint(buf, pos) + pos += consumed utf8_bytes = buf[pos:pos+length] if length != len(utf8_bytes): raise ParseError( |
