summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2017-03-14 11:01:58 -0700
committerDana Powers <dana.powers@gmail.com>2017-03-14 11:38:18 -0700
commit9ba21b836731af258afb8c400f40110b3ff58ebc (patch)
tree140a7fcab79930c4eb735dca6cc2dc7161d8570b
parent43820dfb93712b5721505c4e6dee542cd73bfca9 (diff)
downloadkafka-python-9ba21b836731af258afb8c400f40110b3ff58ebc.tar.gz
LZ4 support in kafka 0.8/0.9 does not accept a ContentSize header
-rw-r--r--kafka/codec.py20
1 files changed, 14 insertions, 6 deletions
diff --git a/kafka/codec.py b/kafka/codec.py
index 29db48e..a527b42 100644
--- a/kafka/codec.py
+++ b/kafka/codec.py
@@ -229,13 +229,21 @@ def lz4_encode_old_kafka(payload):
assert xxhash is not None
data = lz4_encode(payload)
header_size = 7
- if isinstance(data[4], int):
- flg = data[4]
- else:
- flg = ord(data[4])
+ flg = data[4]
+ if not isinstance(flg, int):
+ flg = ord(flg)
+
content_size_bit = ((flg >> 3) & 1)
if content_size_bit:
- header_size += 8
+ # Old kafka does not accept the content-size field
+ # so we need to discard it and reset the header flag
+ flg -= 8
+ data = bytearray(data)
+ data[4] = flg
+ data = bytes(data)
+ payload = data[header_size+8:]
+ else:
+ payload = data[header_size:]
# This is the incorrect hc
hc = xxhash.xxh32(data[0:header_size-1]).digest()[-2:-1] # pylint: disable-msg=no-member
@@ -243,7 +251,7 @@ def lz4_encode_old_kafka(payload):
return b''.join([
data[0:header_size-1],
hc,
- data[header_size:]
+ payload
])