diff options
author | Dana Powers <dana.powers@gmail.com> | 2016-01-26 20:35:22 -0800 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2016-01-26 21:51:29 -0800 |
commit | f08ec792ee93fd059e81ee1e30f5651c15f69e85 (patch) | |
tree | 6698c6b28681d35edbfe5ca9ff385c8664f33ee1 /kafka/codec.py | |
parent | 66284e57accec5977d606fc91a0b28177b352eb4 (diff) | |
download | kafka-python-lz4_fixup.tar.gz |
Handle broken LZ4 framing; switch to lz4tools + xxhash (branch: lz4_fixup)
Diffstat (limited to 'kafka/codec.py')
-rw-r--r-- | kafka/codec.py | 58 |
1 files changed, 51 insertions, 7 deletions
```diff
diff --git a/kafka/codec.py b/kafka/codec.py
index 11d5a99..e94bc4c 100644
--- a/kafka/codec.py
+++ b/kafka/codec.py
@@ -15,13 +15,10 @@ except ImportError:
     snappy = None
 
 try:
-    import lz4
-    from lz4 import compress as lz4_encode
-    from lz4 import decompress as lz4_decode
+    import lz4f
+    import xxhash
 except ImportError:
-    lz4 = None
-    lz4_encode = None
-    lz4_decode = None
+    lz4f = None
 
 PYPY = bool(platform.python_implementation() == 'PyPy')
 
@@ -34,7 +31,7 @@ def has_snappy():
 
 
 def has_lz4():
-    return lz4 is not None
+    return lz4f is not None
 
 
 def gzip_encode(payload, compresslevel=None):
@@ -180,3 +177,50 @@ def snappy_decode(payload):
         return out.read()
     else:
         return snappy.decompress(payload)
+
+
+def lz4_encode(payload):
+    data = lz4f.compressFrame(payload) # pylint: disable-msg=no-member
+    # Kafka's LZ4 code has a bug in its header checksum implementation
+    header_size = 7
+    if isinstance(data[4], int):
+        flg = data[4]
+    else:
+        flg = ord(data[4])
+    content_size_bit = ((flg >> 3) & 1)
+    if content_size_bit:
+        header_size += 8
+
+    # This is the incorrect hc
+    hc = xxhash.xxh32(data[0:header_size-1]).digest()[-2:-1] # pylint: disable-msg=no-member
+
+    return b''.join([
+        data[0:header_size-1],
+        hc,
+        data[header_size:]
+    ])
+
+
+def lz4_decode(payload):
+    # Kafka's LZ4 code has a bug in its header checksum implementation
+    header_size = 7
+    if isinstance(payload[4], int):
+        flg = payload[4]
+    else:
+        flg = ord(payload[4])
+    content_size_bit = ((flg >> 3) & 1)
+    if content_size_bit:
+        header_size += 8
+
+    # This should be the correct hc
+    hc = xxhash.xxh32(payload[4:header_size-1]).digest()[-2:-1] # pylint: disable-msg=no-member
+
+    munged_payload = b''.join([
+        payload[0:header_size-1],
+        hc,
+        payload[header_size:]
+    ])
+
+    cCtx = lz4f.createCompContext() # pylint: disable-msg=no-member
+    data = lz4f.decompressFrame(munged_payload, cCtx) # pylint: disable-msg=no-member
+    return data['decomp']
```