diff options
author | Dana Powers <dana.powers@gmail.com> | 2016-05-22 17:14:03 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2016-05-22 17:14:03 -0700 |
commit | 1d4251a9efa4c5466ba5095f3ba199bf082a72b5 (patch) | |
tree | 0393a7b7768f449174216fb6b0702a87844a5a81 /kafka/producer/buffer.py | |
parent | 96530f6a9c4a31d23b069ba162dba6cf45a5efd0 (diff) | |
download | kafka-python-1d4251a9efa4c5466ba5095f3ba199bf082a72b5.tar.gz |
Use standard LZ4 framing for v1 messages / kafka 0.10 (#695)
* LZ4 framing fixed in 0.10 / message v1 -- retain broken lz4 code for compatibility
* lz4f does not support easy incremental decompression - raise RuntimeError
* Update lz4 codec tests
Diffstat (limited to 'kafka/producer/buffer.py')
-rw-r--r-- | kafka/producer/buffer.py | 9 |
1 files changed, 8 insertions, 1 deletions
diff --git a/kafka/producer/buffer.py b/kafka/producer/buffer.py index ba9b5db..5dc2e1f 100644 --- a/kafka/producer/buffer.py +++ b/kafka/producer/buffer.py @@ -6,7 +6,8 @@ import threading import time from ..codec import (has_gzip, has_snappy, has_lz4, - gzip_encode, snappy_encode, lz4_encode) + gzip_encode, snappy_encode, + lz4_encode, lz4_encode_old_kafka) from .. import errors as Errors from ..protocol.types import Int32, Int64 from ..protocol.message import MessageSet, Message @@ -28,10 +29,16 @@ class MessageSetBuffer(object): 'gzip': (has_gzip, gzip_encode, Message.CODEC_GZIP), 'snappy': (has_snappy, snappy_encode, Message.CODEC_SNAPPY), 'lz4': (has_lz4, lz4_encode, Message.CODEC_LZ4), + 'lz4-old-kafka': (has_lz4, lz4_encode_old_kafka, Message.CODEC_LZ4), } def __init__(self, buf, batch_size, compression_type=None, message_version=0): if compression_type is not None: assert compression_type in self._COMPRESSORS, 'Unrecognized compression type' + + # Kafka 0.8/0.9 had a quirky lz4... + if compression_type == 'lz4' and message_version == 0: + compression_type = 'lz4-old-kafka' + checker, encoder, attributes = self._COMPRESSORS[compression_type] assert checker(), 'Compression Libraries Not Found' self._compressor = encoder |