summaryrefslogtreecommitdiff
path: root/kafka/codec.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/codec.py')
-rw-r--r--kafka/codec.py20
1 files changed, 14 insertions, 6 deletions
diff --git a/kafka/codec.py b/kafka/codec.py
index 29db48e..a527b42 100644
--- a/kafka/codec.py
+++ b/kafka/codec.py
@@ -229,13 +229,21 @@ def lz4_encode_old_kafka(payload):
assert xxhash is not None
data = lz4_encode(payload)
header_size = 7
- if isinstance(data[4], int):
- flg = data[4]
- else:
- flg = ord(data[4])
+ flg = data[4]
+ if not isinstance(flg, int):
+ flg = ord(flg)
+
content_size_bit = ((flg >> 3) & 1)
if content_size_bit:
- header_size += 8
+ # Old kafka does not accept the content-size field
+ # so we need to discard it and reset the header flag
+ flg -= 8
+ data = bytearray(data)
+ data[4] = flg
+ data = bytes(data)
+ payload = data[header_size+8:]
+ else:
+ payload = data[header_size:]
# This is the incorrect hc
hc = xxhash.xxh32(data[0:header_size-1]).digest()[-2:-1] # pylint: disable-msg=no-member
@@ -243,7 +251,7 @@ def lz4_encode_old_kafka(payload):
return b''.join([
data[0:header_size-1],
hc,
- data[header_size:]
+ payload
])