diff options
author | Taras Voinarovskiy <voyn1991@gmail.com> | 2017-08-25 09:38:27 +0000 |
---|---|---|
committer | Taras Voinarovskiy <voyn1991@gmail.com> | 2017-10-24 15:12:11 +0000 |
commit | 3af66bc542efff3f7010bec18b72d844e09488c4 (patch) | |
tree | a20623631b36230c9425b08ee95b85afbc9a9455 /kafka/record/util.py | |
parent | e06af5343a55cf8d03e32a645ee970d872cb9ba0 (diff) | |
download | kafka-python-v2_records.tar.gz |
Add DefaultRecordBatch implementation aka V2 message format parser/builder.v2_records
Added bytecode optimization for varint and append/read_msg functions. Mostly based on avoiding LOAD_GLOBAL calls.
Diffstat (limited to 'kafka/record/util.py')
-rw-r--r-- | kafka/record/util.py | 119 |
1 files changed, 119 insertions, 0 deletions
diff --git a/kafka/record/util.py b/kafka/record/util.py index 098d6f4..88135f1 100644 --- a/kafka/record/util.py +++ b/kafka/record/util.py @@ -1,5 +1,124 @@ import binascii +from ._crc32c import crc as crc32c_py + + +def encode_varint(value, write): + """ Encode an integer to a varint presentation. See + https://developers.google.com/protocol-buffers/docs/encoding?csw=1#varints + on how those can be produced. + + Arguments: + value (int): Value to encode + write (function): Called per byte that needs to be writen + + Returns: + int: Number of bytes written + """ + value = (value << 1) ^ (value >> 63) + + if value <= 0x7f: # 1 byte + write(value) + return 1 + if value <= 0x3fff: # 2 bytes + write(0x80 | (value & 0x7f)) + write(value >> 7) + return 2 + if value <= 0x1fffff: # 3 bytes + write(0x80 | (value & 0x7f)) + write(0x80 | ((value >> 7) & 0x7f)) + write(value >> 14) + return 3 + if value <= 0xfffffff: # 4 bytes + write(0x80 | (value & 0x7f)) + write(0x80 | ((value >> 7) & 0x7f)) + write(0x80 | ((value >> 14) & 0x7f)) + write(value >> 21) + return 4 + if value <= 0x7ffffffff: # 5 bytes + write(0x80 | (value & 0x7f)) + write(0x80 | ((value >> 7) & 0x7f)) + write(0x80 | ((value >> 14) & 0x7f)) + write(0x80 | ((value >> 21) & 0x7f)) + write(value >> 28) + return 5 + else: + # Return to general algorithm + bits = value & 0x7f + value >>= 7 + i = 0 + while value: + write(0x80 | bits) + bits = value & 0x7f + value >>= 7 + i += 1 + write(bits) + return i + + +def size_of_varint(value): + """ Number of bytes needed to encode an integer in variable-length format. + """ + value = (value << 1) ^ (value >> 63) + if value <= 0x7f: + return 1 + if value <= 0x3fff: + return 2 + if value <= 0x1fffff: + return 3 + if value <= 0xfffffff: + return 4 + if value <= 0x7ffffffff: + return 5 + if value <= 0x3ffffffffff: + return 6 + if value <= 0x1ffffffffffff: + return 7 + if value <= 0xffffffffffffff: + return 8 + if value <= 0x7fffffffffffffff: + return 9 + return 10 + + +def decode_varint(buffer, pos=0): + """ Decode an integer from a varint presentation. See + https://developers.google.com/protocol-buffers/docs/encoding?csw=1#varints + on how those can be produced. + + Arguments: + buffer (bytearry): buffer to read from. + pos (int): optional position to read from + + Returns: + (int, int): Decoded int value and next read position + """ + result = buffer[pos] + if not (result & 0x81): + return (result >> 1), pos + 1 + if not (result & 0x80): + return (result >> 1) ^ (~0), pos + 1 + + result &= 0x7f + pos += 1 + shift = 7 + while 1: + b = buffer[pos] + result |= ((b & 0x7f) << shift) + pos += 1 + if not (b & 0x80): + return ((result >> 1) ^ -(result & 1), pos) + shift += 7 + if shift >= 64: + raise ValueError("Out of int64 range") + + +def calc_crc32c(memview): + """ Calculate CRC-32C (Castagnoli) checksum over a memoryview of data + """ + crc = crc32c_py(memview) + return crc + def calc_crc32(memview): """ Calculate simple CRC-32 checksum over a memoryview of data |