summaryrefslogtreecommitdiff
path: root/kafka/codec.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/codec.py')
-rw-r--r--kafka/codec.py39
1 files changed, 32 insertions, 7 deletions
diff --git a/kafka/codec.py b/kafka/codec.py
index 4deec49..29db48e 100644
--- a/kafka/codec.py
+++ b/kafka/codec.py
@@ -17,11 +17,20 @@ except ImportError:
snappy = None
try:
+ import lz4.frame as lz4
+except ImportError:
+ lz4 = None
+
+try:
import lz4f
- import xxhash
except ImportError:
lz4f = None
+try:
+ import xxhash
+except ImportError:
+ xxhash = None
+
PYPY = bool(platform.python_implementation() == 'PyPy')
def has_gzip():
@@ -33,7 +42,11 @@ def has_snappy():
def has_lz4():
- return lz4f is not None
+ if lz4 is not None:
+ return True
+ if lz4f is not None:
+ return True
+ return False
def gzip_encode(payload, compresslevel=None):
@@ -181,13 +194,15 @@ def snappy_decode(payload):
return snappy.decompress(payload)
-def lz4_encode(payload):
- """Encode payload using interoperable LZ4 framing. Requires Kafka >= 0.10"""
- # pylint: disable-msg=no-member
- return lz4f.compressFrame(payload)
+if lz4:
+ lz4_encode = lz4.compress # pylint: disable-msg=no-member
+elif lz4f:
+ lz4_encode = lz4f.compressFrame # pylint: disable-msg=no-member
+else:
+ lz4_encode = None
-def lz4_decode(payload):
+def lz4f_decode(payload):
"""Decode payload using interoperable LZ4 framing. Requires Kafka >= 0.10"""
# pylint: disable-msg=no-member
ctx = lz4f.createDecompContext()
@@ -201,8 +216,17 @@ def lz4_decode(payload):
return data['decomp']
+if lz4:
+ lz4_decode = lz4.decompress # pylint: disable-msg=no-member
+elif lz4f:
+ lz4_decode = lz4f_decode
+else:
+ lz4_decode = None
+
+
def lz4_encode_old_kafka(payload):
"""Encode payload for 0.8/0.9 brokers -- requires an incorrect header checksum."""
+ assert xxhash is not None
data = lz4_encode(payload)
header_size = 7
if isinstance(data[4], int):
@@ -224,6 +248,7 @@ def lz4_encode_old_kafka(payload):
def lz4_decode_old_kafka(payload):
+ assert xxhash is not None
# Kafka's LZ4 code has a bug in its header checksum implementation
header_size = 7
if isinstance(payload[4], int):