summaryrefslogtreecommitdiff
path: root/kafka/codec.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/codec.py')
-rw-r--r--kafka/codec.py30
1 files changed, 23 insertions, 7 deletions
diff --git a/kafka/codec.py b/kafka/codec.py
index e94bc4c..9c31e9d 100644
--- a/kafka/codec.py
+++ b/kafka/codec.py
@@ -180,8 +180,27 @@ def snappy_decode(payload):
def lz4_encode(payload):
- data = lz4f.compressFrame(payload) # pylint: disable-msg=no-member
- # Kafka's LZ4 code has a bug in its header checksum implementation
+ """Encode payload using interoperable LZ4 framing. Requires Kafka >= 0.10"""
+ # pylint: disable-msg=no-member
+ return lz4f.compressFrame(payload)
+
+
+def lz4_decode(payload):
+ """Decode payload using interoperable LZ4 framing. Requires Kafka >= 0.10"""
+ # pylint: disable-msg=no-member
+ ctx = lz4f.createDecompContext()
+ data = lz4f.decompressFrame(payload, ctx)
+
+ # lz4f python module does not expose how much of the payload was
+ # actually read if the decompression was only partial.
+ if data['next'] != 0:
+ raise RuntimeError('lz4f unable to decompress full payload')
+ return data['decomp']
+
+
+def lz4_encode_old_kafka(payload):
+ """Encode payload for 0.8/0.9 brokers -- requires an incorrect header checksum."""
+ data = lz4_encode(payload)
header_size = 7
if isinstance(data[4], int):
flg = data[4]
@@ -201,7 +220,7 @@ def lz4_encode(payload):
])
-def lz4_decode(payload):
+def lz4_decode_old_kafka(payload):
# Kafka's LZ4 code has a bug in its header checksum implementation
header_size = 7
if isinstance(payload[4], int):
@@ -220,7 +239,4 @@ def lz4_decode(payload):
hc,
payload[header_size:]
])
-
- cCtx = lz4f.createCompContext() # pylint: disable-msg=no-member
- data = lz4f.decompressFrame(munged_payload, cCtx) # pylint: disable-msg=no-member
- return data['decomp']
+ return lz4_decode(munged_payload)