diff options
Diffstat (limited to 'kafka')
-rw-r--r-- | kafka/codec.py | 58 |
1 files changed, 51 insertions, 7 deletions
diff --git a/kafka/codec.py b/kafka/codec.py index 11d5a99..e94bc4c 100644 --- a/kafka/codec.py +++ b/kafka/codec.py @@ -15,13 +15,10 @@ except ImportError: snappy = None try: - import lz4 - from lz4 import compress as lz4_encode - from lz4 import decompress as lz4_decode + import lz4f + import xxhash except ImportError: - lz4 = None - lz4_encode = None - lz4_decode = None + lz4f = None PYPY = bool(platform.python_implementation() == 'PyPy') @@ -34,7 +31,7 @@ def has_snappy(): def has_lz4(): - return lz4 is not None + return lz4f is not None def gzip_encode(payload, compresslevel=None): @@ -180,3 +177,50 @@ def snappy_decode(payload): return out.read() else: return snappy.decompress(payload) + + +def lz4_encode(payload): + data = lz4f.compressFrame(payload) # pylint: disable-msg=no-member + # Kafka's LZ4 code has a bug in its header checksum implementation + header_size = 7 + if isinstance(data[4], int): + flg = data[4] + else: + flg = ord(data[4]) + content_size_bit = ((flg >> 3) & 1) + if content_size_bit: + header_size += 8 + + # This is the incorrect hc + hc = xxhash.xxh32(data[0:header_size-1]).digest()[-2:-1] # pylint: disable-msg=no-member + + return b''.join([ + data[0:header_size-1], + hc, + data[header_size:] + ]) + + +def lz4_decode(payload): + # Kafka's LZ4 code has a bug in its header checksum implementation + header_size = 7 + if isinstance(payload[4], int): + flg = payload[4] + else: + flg = ord(payload[4]) + content_size_bit = ((flg >> 3) & 1) + if content_size_bit: + header_size += 8 + + # This should be the correct hc + hc = xxhash.xxh32(payload[4:header_size-1]).digest()[-2:-1] # pylint: disable-msg=no-member + + munged_payload = b''.join([ + payload[0:header_size-1], + hc, + payload[header_size:] + ]) + + cCtx = lz4f.createCompContext() # pylint: disable-msg=no-member + data = lz4f.decompressFrame(munged_payload, cCtx) # pylint: disable-msg=no-member + return data['decomp'] |