diff options
author | Dana Powers <dana.powers@gmail.com> | 2016-05-22 10:28:56 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2016-05-22 10:28:56 -0700 |
commit | 7941a2ac7ec6663f08c6291d92746eae9f792916 (patch) | |
tree | f3b75dcea569e28f1685500af53bff34514374b9 /kafka/producer/buffer.py | |
parent | 92f859d8da5c3f35ab3738ef2725fff05b6cf57f (diff) | |
parent | aa5bde6ac382966395f8f1466c46d55cf28c2cce (diff) | |
download | kafka-python-7941a2ac7ec6663f08c6291d92746eae9f792916.tar.gz |
Merge pull request #693 from dpkp/message_format_v1
Message format v1 (KIP-31 / KIP-32)
Diffstat (limited to 'kafka/producer/buffer.py')
-rw-r--r-- | kafka/producer/buffer.py | 6 |
1 files changed, 4 insertions, 2 deletions
diff --git a/kafka/producer/buffer.py b/kafka/producer/buffer.py index b2ac747..ba9b5db 100644 --- a/kafka/producer/buffer.py +++ b/kafka/producer/buffer.py @@ -29,7 +29,7 @@ class MessageSetBuffer(object): 'snappy': (has_snappy, snappy_encode, Message.CODEC_SNAPPY), 'lz4': (has_lz4, lz4_encode, Message.CODEC_LZ4), } - def __init__(self, buf, batch_size, compression_type=None): + def __init__(self, buf, batch_size, compression_type=None, message_version=0): if compression_type is not None: assert compression_type in self._COMPRESSORS, 'Unrecognized compression type' checker, encoder, attributes = self._COMPRESSORS[compression_type] @@ -40,6 +40,7 @@ class MessageSetBuffer(object): self._compressor = None self._compression_attributes = None + self._message_version = message_version self._buffer = buf # Init MessageSetSize to 0 -- update on close self._buffer.seek(0) @@ -85,7 +86,8 @@ class MessageSetBuffer(object): # TODO: avoid copies with bytearray / memoryview self._buffer.seek(4) msg = Message(self._compressor(self._buffer.read()), - attributes=self._compression_attributes) + attributes=self._compression_attributes, + magic=self._message_version) encoded = msg.encode() self._buffer.seek(4) self._buffer.write(Int64.encode(0)) # offset 0 for wrapper msg |