summaryrefslogtreecommitdiff
path: root/kafka/producer/buffer.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/producer/buffer.py')
-rw-r--r--kafka/producer/buffer.py6
1 files changed, 4 insertions, 2 deletions
diff --git a/kafka/producer/buffer.py b/kafka/producer/buffer.py
index b2ac747..ba9b5db 100644
--- a/kafka/producer/buffer.py
+++ b/kafka/producer/buffer.py
@@ -29,7 +29,7 @@ class MessageSetBuffer(object):
'snappy': (has_snappy, snappy_encode, Message.CODEC_SNAPPY),
'lz4': (has_lz4, lz4_encode, Message.CODEC_LZ4),
}
- def __init__(self, buf, batch_size, compression_type=None):
+ def __init__(self, buf, batch_size, compression_type=None, message_version=0):
if compression_type is not None:
assert compression_type in self._COMPRESSORS, 'Unrecognized compression type'
checker, encoder, attributes = self._COMPRESSORS[compression_type]
@@ -40,6 +40,7 @@ class MessageSetBuffer(object):
self._compressor = None
self._compression_attributes = None
+ self._message_version = message_version
self._buffer = buf
# Init MessageSetSize to 0 -- update on close
self._buffer.seek(0)
@@ -85,7 +86,8 @@ class MessageSetBuffer(object):
# TODO: avoid copies with bytearray / memoryview
self._buffer.seek(4)
msg = Message(self._compressor(self._buffer.read()),
- attributes=self._compression_attributes)
+ attributes=self._compression_attributes,
+ magic=self._message_version)
encoded = msg.encode()
self._buffer.seek(4)
self._buffer.write(Int64.encode(0)) # offset 0 for wrapper msg