summaryrefslogtreecommitdiff
path: root/kafka/producer/buffer.py
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2016-05-22 17:14:03 -0700
committerDana Powers <dana.powers@gmail.com>2016-05-22 17:14:03 -0700
commit1d4251a9efa4c5466ba5095f3ba199bf082a72b5 (patch)
tree0393a7b7768f449174216fb6b0702a87844a5a81 /kafka/producer/buffer.py
parent96530f6a9c4a31d23b069ba162dba6cf45a5efd0 (diff)
downloadkafka-python-1d4251a9efa4c5466ba5095f3ba199bf082a72b5.tar.gz
Use standard LZ4 framing for v1 messages / kafka 0.10 (#695)
* LZ4 framing fixed in 0.10 / message v1 -- retain broken lz4 code for compatibility * lz4f does not support easy incremental decompression - raise RuntimeError * Update lz4 codec tests
Diffstat (limited to 'kafka/producer/buffer.py')
-rw-r--r--kafka/producer/buffer.py9
1 files changed, 8 insertions, 1 deletions
diff --git a/kafka/producer/buffer.py b/kafka/producer/buffer.py
index ba9b5db..5dc2e1f 100644
--- a/kafka/producer/buffer.py
+++ b/kafka/producer/buffer.py
@@ -6,7 +6,8 @@ import threading
import time
from ..codec import (has_gzip, has_snappy, has_lz4,
- gzip_encode, snappy_encode, lz4_encode)
+ gzip_encode, snappy_encode,
+ lz4_encode, lz4_encode_old_kafka)
from .. import errors as Errors
from ..protocol.types import Int32, Int64
from ..protocol.message import MessageSet, Message
@@ -28,10 +29,16 @@ class MessageSetBuffer(object):
'gzip': (has_gzip, gzip_encode, Message.CODEC_GZIP),
'snappy': (has_snappy, snappy_encode, Message.CODEC_SNAPPY),
'lz4': (has_lz4, lz4_encode, Message.CODEC_LZ4),
+ 'lz4-old-kafka': (has_lz4, lz4_encode_old_kafka, Message.CODEC_LZ4),
}
def __init__(self, buf, batch_size, compression_type=None, message_version=0):
if compression_type is not None:
assert compression_type in self._COMPRESSORS, 'Unrecognized compression type'
+
+ # Kafka 0.8/0.9 had a quirky lz4...
+ if compression_type == 'lz4' and message_version == 0:
+ compression_type = 'lz4-old-kafka'
+
checker, encoder, attributes = self._COMPRESSORS[compression_type]
assert checker(), 'Compression Libraries Not Found'
self._compressor = encoder