diff options
Diffstat (limited to 'kafka/record/util.py')
-rw-r--r-- | kafka/record/util.py | 119 |
1 files changed, 119 insertions, 0 deletions
diff --git a/kafka/record/util.py b/kafka/record/util.py index 098d6f4..88135f1 100644 --- a/kafka/record/util.py +++ b/kafka/record/util.py @@ -1,5 +1,124 @@ import binascii +from ._crc32c import crc as crc32c_py + + +def encode_varint(value, write): + """ Encode an integer to a varint presentation. See + https://developers.google.com/protocol-buffers/docs/encoding?csw=1#varints + on how those can be produced. + + Arguments: + value (int): Value to encode + write (function): Called per byte that needs to be writen + + Returns: + int: Number of bytes written + """ + value = (value << 1) ^ (value >> 63) + + if value <= 0x7f: # 1 byte + write(value) + return 1 + if value <= 0x3fff: # 2 bytes + write(0x80 | (value & 0x7f)) + write(value >> 7) + return 2 + if value <= 0x1fffff: # 3 bytes + write(0x80 | (value & 0x7f)) + write(0x80 | ((value >> 7) & 0x7f)) + write(value >> 14) + return 3 + if value <= 0xfffffff: # 4 bytes + write(0x80 | (value & 0x7f)) + write(0x80 | ((value >> 7) & 0x7f)) + write(0x80 | ((value >> 14) & 0x7f)) + write(value >> 21) + return 4 + if value <= 0x7ffffffff: # 5 bytes + write(0x80 | (value & 0x7f)) + write(0x80 | ((value >> 7) & 0x7f)) + write(0x80 | ((value >> 14) & 0x7f)) + write(0x80 | ((value >> 21) & 0x7f)) + write(value >> 28) + return 5 + else: + # Return to general algorithm + bits = value & 0x7f + value >>= 7 + i = 0 + while value: + write(0x80 | bits) + bits = value & 0x7f + value >>= 7 + i += 1 + write(bits) + return i + + +def size_of_varint(value): + """ Number of bytes needed to encode an integer in variable-length format. + """ + value = (value << 1) ^ (value >> 63) + if value <= 0x7f: + return 1 + if value <= 0x3fff: + return 2 + if value <= 0x1fffff: + return 3 + if value <= 0xfffffff: + return 4 + if value <= 0x7ffffffff: + return 5 + if value <= 0x3ffffffffff: + return 6 + if value <= 0x1ffffffffffff: + return 7 + if value <= 0xffffffffffffff: + return 8 + if value <= 0x7fffffffffffffff: + return 9 + return 10 + + +def decode_varint(buffer, pos=0): + """ Decode an integer from a varint presentation. See + https://developers.google.com/protocol-buffers/docs/encoding?csw=1#varints + on how those can be produced. + + Arguments: + buffer (bytearry): buffer to read from. + pos (int): optional position to read from + + Returns: + (int, int): Decoded int value and next read position + """ + result = buffer[pos] + if not (result & 0x81): + return (result >> 1), pos + 1 + if not (result & 0x80): + return (result >> 1) ^ (~0), pos + 1 + + result &= 0x7f + pos += 1 + shift = 7 + while 1: + b = buffer[pos] + result |= ((b & 0x7f) << shift) + pos += 1 + if not (b & 0x80): + return ((result >> 1) ^ -(result & 1), pos) + shift += 7 + if shift >= 64: + raise ValueError("Out of int64 range") + + +def calc_crc32c(memview): + """ Calculate CRC-32C (Castagnoli) checksum over a memoryview of data + """ + crc = crc32c_py(memview) + return crc + def calc_crc32(memview): """ Calculate simple CRC-32 checksum over a memoryview of data |