summaryrefslogtreecommitdiff
path: root/kafka/producer/buffer.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/producer/buffer.py')
-rw-r--r--kafka/producer/buffer.py9
1 files changed, 8 insertions, 1 deletions
diff --git a/kafka/producer/buffer.py b/kafka/producer/buffer.py
index ba9b5db..5dc2e1f 100644
--- a/kafka/producer/buffer.py
+++ b/kafka/producer/buffer.py
@@ -6,7 +6,8 @@ import threading
import time
from ..codec import (has_gzip, has_snappy, has_lz4,
- gzip_encode, snappy_encode, lz4_encode)
+ gzip_encode, snappy_encode,
+ lz4_encode, lz4_encode_old_kafka)
from .. import errors as Errors
from ..protocol.types import Int32, Int64
from ..protocol.message import MessageSet, Message
@@ -28,10 +29,16 @@ class MessageSetBuffer(object):
'gzip': (has_gzip, gzip_encode, Message.CODEC_GZIP),
'snappy': (has_snappy, snappy_encode, Message.CODEC_SNAPPY),
'lz4': (has_lz4, lz4_encode, Message.CODEC_LZ4),
+ 'lz4-old-kafka': (has_lz4, lz4_encode_old_kafka, Message.CODEC_LZ4),
}
def __init__(self, buf, batch_size, compression_type=None, message_version=0):
if compression_type is not None:
assert compression_type in self._COMPRESSORS, 'Unrecognized compression type'
+
+ # Kafka 0.8/0.9 had a quirky lz4...
+ if compression_type == 'lz4' and message_version == 0:
+ compression_type = 'lz4-old-kafka'
+
checker, encoder, attributes = self._COMPRESSORS[compression_type]
assert checker(), 'Compression Libraries Not Found'
self._compressor = encoder