summaryrefslogtreecommitdiff
path: root/kafka
diff options
context:
space:
mode:
Diffstat (limited to 'kafka')
-rw-r--r--kafka/codec.py58
1 files changed, 51 insertions, 7 deletions
diff --git a/kafka/codec.py b/kafka/codec.py
index 11d5a99..e94bc4c 100644
--- a/kafka/codec.py
+++ b/kafka/codec.py
@@ -15,13 +15,10 @@ except ImportError:
snappy = None
try:
- import lz4
- from lz4 import compress as lz4_encode
- from lz4 import decompress as lz4_decode
+ import lz4f
+ import xxhash
except ImportError:
- lz4 = None
- lz4_encode = None
- lz4_decode = None
+ lz4f = None
PYPY = bool(platform.python_implementation() == 'PyPy')
@@ -34,7 +31,7 @@ def has_snappy():
def has_lz4():
- return lz4 is not None
+ return lz4f is not None
def gzip_encode(payload, compresslevel=None):
@@ -180,3 +177,50 @@ def snappy_decode(payload):
return out.read()
else:
return snappy.decompress(payload)
+
+
def lz4_encode(payload):
    """Compress *payload* into an LZ4 frame carrying Kafka's header checksum.

    The frame is produced by ``lz4f.compressFrame`` and then its one-byte
    header checksum is overwritten. Kafka's LZ4 code has a bug in its header
    checksum implementation, so the byte written here is deliberately the
    "incorrect" one: it is hashed over the header starting at offset 0
    (i.e. including the first four bytes of the frame) instead of starting
    at the FLG byte at offset 4.

    Arguments:
        payload: raw data to compress.

    Returns:
        The compressed frame with the header checksum byte replaced.
    """
    frame = lz4f.compressFrame(payload)  # pylint: disable-msg=no-member

    # Indexing bytes gives an int on Python 3 but a 1-char str on Python 2.
    flag_byte = frame[4]
    if not isinstance(flag_byte, int):
        flag_byte = ord(flag_byte)

    # The header is 7 bytes long, or 15 when bit 3 of FLG (content size
    # present) is set; the checksum is always the last header byte.
    checksum_pos = 14 if (flag_byte >> 3) & 1 else 6
    header_prefix = frame[0:checksum_pos]

    # This is the incorrect hc (hashed from offset 0, as Kafka expects).
    kafka_hc = xxhash.xxh32(header_prefix).digest()[-2:-1]  # pylint: disable-msg=no-member

    return b''.join((header_prefix, kafka_hc, frame[checksum_pos + 1:]))
+
+
def lz4_decode(payload):
    """Decompress an LZ4 frame whose header checksum was written by Kafka.

    Kafka's LZ4 code has a bug in its header checksum implementation, so a
    frame received from Kafka carries a checksum byte that ``lz4f`` would
    not accept as-is. Before decompressing, the checksum byte is replaced
    with one hashed over the header bytes starting at the FLG byte
    (offset 4), which should be the correct value.

    Arguments:
        payload: LZ4 frame bytes as received from Kafka.

    Returns:
        The decompressed data.

    Raises:
        RuntimeError: if ``lz4f`` reports that the frame was only partially
            decompressed (more input was expected).
    """
    # Indexing bytes gives an int on Python 3 but a 1-char str on Python 2.
    flg = payload[4]
    if not isinstance(flg, int):
        flg = ord(flg)

    # The header is 7 bytes long, or 15 when bit 3 of FLG (content size
    # present) is set; the checksum is always the last header byte.
    header_size = 7
    content_size_bit = (flg >> 3) & 1
    if content_size_bit:
        header_size += 8

    # This should be the correct hc
    hc = xxhash.xxh32(payload[4:header_size - 1]).digest()[-2:-1]  # pylint: disable-msg=no-member

    munged_payload = b''.join([
        payload[0:header_size - 1],
        hc,
        payload[header_size:]
    ])

    # Decompression needs a *decompression* context (the previous code
    # created a compression context), and the context must be released
    # even when decompressFrame raises.
    dctx = lz4f.createDecompContext()  # pylint: disable-msg=no-member
    try:
        data = lz4f.decompressFrame(munged_payload, dctx)  # pylint: disable-msg=no-member
    finally:
        lz4f.freeDecompContext(dctx)  # pylint: disable-msg=no-member

    # A non-zero 'next' means the decoder still expects more input, so the
    # payload was truncated; do not hand back partial data as if complete.
    if data['next'] != 0:
        raise RuntimeError('lz4f unable to decompress full payload')
    return data['decomp']