diff options
author | Taras Voinarovskiy <voyn1991@gmail.com> | 2017-08-25 09:38:27 +0000 |
---|---|---|
committer | Taras Voinarovskiy <voyn1991@gmail.com> | 2017-10-24 15:12:11 +0000 |
commit | 3af66bc542efff3f7010bec18b72d844e09488c4 (patch) | |
tree | a20623631b36230c9425b08ee95b85afbc9a9455 /benchmarks/varint_speed.py | |
parent | e06af5343a55cf8d03e32a645ee970d872cb9ba0 (diff) | |
download | kafka-python-v2_records.tar.gz |
Add DefaultRecordBatch implementation aka V2 message format parser/builder.v2_records
Added bytecode optimization for varint and append/read_msg functions. Mostly based on avoiding LOAD_GLOBAL calls.
Diffstat (limited to 'benchmarks/varint_speed.py')
-rw-r--r-- | benchmarks/varint_speed.py | 443 |
1 files changed, 443 insertions, 0 deletions
diff --git a/benchmarks/varint_speed.py b/benchmarks/varint_speed.py new file mode 100644 index 0000000..2c5cd62 --- /dev/null +++ b/benchmarks/varint_speed.py @@ -0,0 +1,443 @@ +#!/usr/bin/env python +from __future__ import print_function +import perf +import six + + +test_data = [ + (b"\x00", 0), + (b"\x01", -1), + (b"\x02", 1), + (b"\x7E", 63), + (b"\x7F", -64), + (b"\x80\x01", 64), + (b"\x81\x01", -65), + (b"\xFE\x7F", 8191), + (b"\xFF\x7F", -8192), + (b"\x80\x80\x01", 8192), + (b"\x81\x80\x01", -8193), + (b"\xFE\xFF\x7F", 1048575), + (b"\xFF\xFF\x7F", -1048576), + (b"\x80\x80\x80\x01", 1048576), + (b"\x81\x80\x80\x01", -1048577), + (b"\xFE\xFF\xFF\x7F", 134217727), + (b"\xFF\xFF\xFF\x7F", -134217728), + (b"\x80\x80\x80\x80\x01", 134217728), + (b"\x81\x80\x80\x80\x01", -134217729), + (b"\xFE\xFF\xFF\xFF\x7F", 17179869183), + (b"\xFF\xFF\xFF\xFF\x7F", -17179869184), + (b"\x80\x80\x80\x80\x80\x01", 17179869184), + (b"\x81\x80\x80\x80\x80\x01", -17179869185), + (b"\xFE\xFF\xFF\xFF\xFF\x7F", 2199023255551), + (b"\xFF\xFF\xFF\xFF\xFF\x7F", -2199023255552), + (b"\x80\x80\x80\x80\x80\x80\x01", 2199023255552), + (b"\x81\x80\x80\x80\x80\x80\x01", -2199023255553), + (b"\xFE\xFF\xFF\xFF\xFF\xFF\x7F", 281474976710655), + (b"\xFF\xFF\xFF\xFF\xFF\xFF\x7F", -281474976710656), + (b"\x80\x80\x80\x80\x80\x80\x80\x01", 281474976710656), + (b"\x81\x80\x80\x80\x80\x80\x80\x01", -281474976710657), + (b"\xFE\xFF\xFF\xFF\xFF\xFF\xFF\x7F", 36028797018963967), + (b"\xFF\xFF\xFF\xFF\xFF\xFF\xFF\x7F", -36028797018963968), + (b"\x80\x80\x80\x80\x80\x80\x80\x80\x01", 36028797018963968), + (b"\x81\x80\x80\x80\x80\x80\x80\x80\x01", -36028797018963969), + (b"\xFE\xFF\xFF\xFF\xFF\xFF\xFF\xFF\x7F", 4611686018427387903), + (b"\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFF\x7F", -4611686018427387904), + (b"\x80\x80\x80\x80\x80\x80\x80\x80\x80\x01", 4611686018427387904), + (b"\x81\x80\x80\x80\x80\x80\x80\x80\x80\x01", -4611686018427387905), +] + + +BENCH_VALUES_ENC = [ + 60, # 1 byte + -8192, # 2 bytes + 1048575, # 3 bytes + 134217727, # 4 bytes + -17179869184, # 5 bytes + 2199023255551, # 6 bytes +] + +BENCH_VALUES_DEC = [ + b"\x7E", # 1 byte + b"\xFF\x7F", # 2 bytes + b"\xFE\xFF\x7F", # 3 bytes + b"\xFF\xFF\xFF\x7F", # 4 bytes + b"\x80\x80\x80\x80\x01", # 5 bytes + b"\xFE\xFF\xFF\xFF\xFF\x7F", # 6 bytes +] +BENCH_VALUES_DEC = list(map(bytearray, BENCH_VALUES_DEC)) + + +def _assert_valid_enc(enc_func): + for encoded, decoded in test_data: + assert enc_func(decoded) == encoded, decoded + + +def _assert_valid_dec(dec_func): + for encoded, decoded in test_data: + res, pos = dec_func(bytearray(encoded)) + assert res == decoded, (decoded, res) + assert pos == len(encoded), (decoded, pos) + + +def _assert_valid_size(size_func): + for encoded, decoded in test_data: + assert size_func(decoded) == len(encoded), decoded + + +def encode_varint_1(num): + """ Encode an integer to a varint presentation. See + https://developers.google.com/protocol-buffers/docs/encoding?csw=1#varints + on how those can be produced. + + Arguments: + num (int): Value to encode + + Returns: + bytearray: Encoded presentation of integer with length from 1 to 10 + bytes + """ + # Shift sign to the end of number + num = (num << 1) ^ (num >> 63) + # Max 10 bytes. We assert those are allocated + buf = bytearray(10) + + for i in range(10): + # 7 lowest bits from the number and set 8th if we still have pending + # bits left to encode + buf[i] = num & 0x7f | (0x80 if num > 0x7f else 0) + num = num >> 7 + if num == 0: + break + else: + # Max size of endcoded double is 10 bytes for unsigned values + raise ValueError("Out of double range") + return buf[:i + 1] + +_assert_valid_enc(encode_varint_1) + + +def encode_varint_2(value, int2byte=six.int2byte): + value = (value << 1) ^ (value >> 63) + + bits = value & 0x7f + value >>= 7 + res = b"" + while value: + res += int2byte(0x80 | bits) + bits = value & 0x7f + value >>= 7 + return res + int2byte(bits) + +_assert_valid_enc(encode_varint_2) + + +def encode_varint_3(value, buf): + append = buf.append + value = (value << 1) ^ (value >> 63) + + bits = value & 0x7f + value >>= 7 + while value: + append(0x80 | bits) + bits = value & 0x7f + value >>= 7 + append(bits) + return value + + +for encoded, decoded in test_data: + res = bytearray() + encode_varint_3(decoded, res) + assert res == encoded + + +def encode_varint_4(value, int2byte=six.int2byte): + value = (value << 1) ^ (value >> 63) + + if value <= 0x7f: # 1 byte + return int2byte(value) + if value <= 0x3fff: # 2 bytes + return int2byte(0x80 | (value & 0x7f)) + int2byte(value >> 7) + if value <= 0x1fffff: # 3 bytes + return int2byte(0x80 | (value & 0x7f)) + \ + int2byte(0x80 | ((value >> 7) & 0x7f)) + \ + int2byte(value >> 14) + if value <= 0xfffffff: # 4 bytes + return int2byte(0x80 | (value & 0x7f)) + \ + int2byte(0x80 | ((value >> 7) & 0x7f)) + \ + int2byte(0x80 | ((value >> 14) & 0x7f)) + \ + int2byte(value >> 21) + if value <= 0x7ffffffff: # 5 bytes + return int2byte(0x80 | (value & 0x7f)) + \ + int2byte(0x80 | ((value >> 7) & 0x7f)) + \ + int2byte(0x80 | ((value >> 14) & 0x7f)) + \ + int2byte(0x80 | ((value >> 21) & 0x7f)) + \ + int2byte(value >> 28) + else: + # Return to general algorithm + bits = value & 0x7f + value >>= 7 + res = b"" + while value: + res += int2byte(0x80 | bits) + bits = value & 0x7f + value >>= 7 + return res + int2byte(bits) + + +_assert_valid_enc(encode_varint_4) + +# import dis +# dis.dis(encode_varint_4) + + +def encode_varint_5(value, buf, pos=0): + value = (value << 1) ^ (value >> 63) + + bits = value & 0x7f + value >>= 7 + while value: + buf[pos] = 0x80 | bits + bits = value & 0x7f + value >>= 7 + pos += 1 + buf[pos] = bits + return pos + 1 + +for encoded, decoded in test_data: + res = bytearray(10) + written = encode_varint_5(decoded, res) + assert res[:written] == encoded + + +def encode_varint_6(value, buf): + append = buf.append + value = (value << 1) ^ (value >> 63) + + if value <= 0x7f: # 1 byte + append(value) + return 1 + if value <= 0x3fff: # 2 bytes + append(0x80 | (value & 0x7f)) + append(value >> 7) + return 2 + if value <= 0x1fffff: # 3 bytes + append(0x80 | (value & 0x7f)) + append(0x80 | ((value >> 7) & 0x7f)) + append(value >> 14) + return 3 + if value <= 0xfffffff: # 4 bytes + append(0x80 | (value & 0x7f)) + append(0x80 | ((value >> 7) & 0x7f)) + append(0x80 | ((value >> 14) & 0x7f)) + append(value >> 21) + return 4 + if value <= 0x7ffffffff: # 5 bytes + append(0x80 | (value & 0x7f)) + append(0x80 | ((value >> 7) & 0x7f)) + append(0x80 | ((value >> 14) & 0x7f)) + append(0x80 | ((value >> 21) & 0x7f)) + append(value >> 28) + return 5 + else: + # Return to general algorithm + bits = value & 0x7f + value >>= 7 + i = 0 + while value: + append(0x80 | bits) + bits = value & 0x7f + value >>= 7 + i += 1 + append(bits) + return i + + +for encoded, decoded in test_data: + res = bytearray() + encode_varint_6(decoded, res) + assert res == encoded + + +def size_of_varint_1(value): + """ Number of bytes needed to encode an integer in variable-length format. + """ + value = (value << 1) ^ (value >> 63) + res = 0 + while True: + res += 1 + value = value >> 7 + if value == 0: + break + return res + +_assert_valid_size(size_of_varint_1) + + +def size_of_varint_2(value): + """ Number of bytes needed to encode an integer in variable-length format. + """ + value = (value << 1) ^ (value >> 63) + if value <= 0x7f: + return 1 + if value <= 0x3fff: + return 2 + if value <= 0x1fffff: + return 3 + if value <= 0xfffffff: + return 4 + if value <= 0x7ffffffff: + return 5 + if value <= 0x3ffffffffff: + return 6 + if value <= 0x1ffffffffffff: + return 7 + if value <= 0xffffffffffffff: + return 8 + if value <= 0x7fffffffffffffff: + return 9 + return 10 + +_assert_valid_size(size_of_varint_2) + + +if six.PY3: + def _read_byte(memview, pos): + """ Read a byte from memoryview as an integer + + Raises: + IndexError: if position is out of bounds + """ + return memview[pos] +else: + def _read_byte(memview, pos): + """ Read a byte from memoryview as an integer + + Raises: + IndexError: if position is out of bounds + """ + return ord(memview[pos]) + + +def decode_varint_1(buffer, pos=0): + """ Decode an integer from a varint presentation. See + https://developers.google.com/protocol-buffers/docs/encoding?csw=1#varints + on how those can be produced. + + Arguments: + buffer (bytes-like): any object acceptable by ``memoryview`` + pos (int): optional position to read from + + Returns: + (int, int): Decoded int value and next read position + """ + value = 0 + shift = 0 + memview = memoryview(buffer) + for i in range(pos, pos + 10): + try: + byte = _read_byte(memview, i) + except IndexError: + raise ValueError("End of byte stream") + if byte & 0x80 != 0: + value |= (byte & 0x7f) << shift + shift += 7 + else: + value |= byte << shift + break + else: + # Max size of endcoded double is 10 bytes for unsigned values + raise ValueError("Out of double range") + # Normalize sign + return (value >> 1) ^ -(value & 1), i + 1 + +_assert_valid_dec(decode_varint_1) + + +def decode_varint_2(buffer, pos=0): + result = 0 + shift = 0 + while 1: + b = buffer[pos] + result |= ((b & 0x7f) << shift) + pos += 1 + if not (b & 0x80): + # result = result_type(() & mask) + return ((result >> 1) ^ -(result & 1), pos) + shift += 7 + if shift >= 64: + raise ValueError("Out of int64 range") + + +_assert_valid_dec(decode_varint_2) + + +def decode_varint_3(buffer, pos=0): + result = buffer[pos] + if not (result & 0x81): + return (result >> 1), pos + 1 + if not (result & 0x80): + return (result >> 1) ^ (~0), pos + 1 + + result &= 0x7f + pos += 1 + shift = 7 + while 1: + b = buffer[pos] + result |= ((b & 0x7f) << shift) + pos += 1 + if not (b & 0x80): + return ((result >> 1) ^ -(result & 1), pos) + shift += 7 + if shift >= 64: + raise ValueError("Out of int64 range") + + +_assert_valid_dec(decode_varint_3) + +# import dis +# dis.dis(decode_varint_3) + +runner = perf.Runner() +# Encode algorithms returning a bytes result +for bench_func in [ + encode_varint_1, + encode_varint_2, + encode_varint_4]: + for i, value in enumerate(BENCH_VALUES_ENC): + runner.bench_func( + '{}_{}byte'.format(bench_func.__name__, i + 1), + bench_func, value) + +# Encode algorithms writing to the buffer +for bench_func in [ + encode_varint_3, + encode_varint_5, + encode_varint_6]: + for i, value in enumerate(BENCH_VALUES_ENC): + fname = bench_func.__name__ + runner.timeit( + '{}_{}byte'.format(fname, i + 1), + stmt="{}({}, buffer)".format(fname, value), + setup="from __main__ import {}; buffer = bytearray(10)".format( + fname) + ) + +# Size algorithms +for bench_func in [ + size_of_varint_1, + size_of_varint_2]: + for i, value in enumerate(BENCH_VALUES_ENC): + runner.bench_func( + '{}_{}byte'.format(bench_func.__name__, i + 1), + bench_func, value) + +# Decode algorithms +for bench_func in [ + decode_varint_1, + decode_varint_2, + decode_varint_3]: + for i, value in enumerate(BENCH_VALUES_DEC): + runner.bench_func( + '{}_{}byte'.format(bench_func.__name__, i + 1), + bench_func, value) |