diff options
author | Dana Powers <dana.powers@gmail.com> | 2016-05-22 17:14:03 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2016-05-22 17:14:03 -0700 |
commit | 1d4251a9efa4c5466ba5095f3ba199bf082a72b5 (patch) | |
tree | 0393a7b7768f449174216fb6b0702a87844a5a81 /kafka/codec.py | |
parent | 96530f6a9c4a31d23b069ba162dba6cf45a5efd0 (diff) | |
download | kafka-python-1d4251a9efa4c5466ba5095f3ba199bf082a72b5.tar.gz |
Use standard LZ4 framing for v1 messages / kafka 0.10 (#695)
* LZ4 framing fixed in 0.10 / message v1 -- retain broken lz4 code for compatibility
* lz4f does not support easy incremental decompression - raise RuntimeError
* Update lz4 codec tests
Diffstat (limited to 'kafka/codec.py')
-rw-r--r-- | kafka/codec.py | 30 |
1 files changed, 23 insertions, 7 deletions
diff --git a/kafka/codec.py b/kafka/codec.py index e94bc4c..9c31e9d 100644 --- a/kafka/codec.py +++ b/kafka/codec.py @@ -180,8 +180,27 @@ def snappy_decode(payload): def lz4_encode(payload): - data = lz4f.compressFrame(payload) # pylint: disable-msg=no-member - # Kafka's LZ4 code has a bug in its header checksum implementation + """Encode payload using interoperable LZ4 framing. Requires Kafka >= 0.10""" + # pylint: disable-msg=no-member + return lz4f.compressFrame(payload) + + +def lz4_decode(payload): + """Decode payload using interoperable LZ4 framing. Requires Kafka >= 0.10""" + # pylint: disable-msg=no-member + ctx = lz4f.createDecompContext() + data = lz4f.decompressFrame(payload, ctx) + + # lz4f python module does not expose how much of the payload was + # actually read if the decompression was only partial. + if data['next'] != 0: + raise RuntimeError('lz4f unable to decompress full payload') + return data['decomp'] + + +def lz4_encode_old_kafka(payload): + """Encode payload for 0.8/0.9 brokers -- requires an incorrect header checksum.""" + data = lz4_encode(payload) header_size = 7 if isinstance(data[4], int): flg = data[4] @@ -201,7 +220,7 @@ def lz4_encode(payload): ]) -def lz4_decode(payload): +def lz4_decode_old_kafka(payload): # Kafka's LZ4 code has a bug in its header checksum implementation header_size = 7 if isinstance(payload[4], int): @@ -220,7 +239,4 @@ def lz4_decode(payload): hc, payload[header_size:] ]) - - cCtx = lz4f.createCompContext() # pylint: disable-msg=no-member - data = lz4f.decompressFrame(munged_payload, cCtx) # pylint: disable-msg=no-member - return data['decomp'] + return lz4_decode(munged_payload) |