diff options
author | Taras <voyn1991@gmail.com> | 2017-10-10 00:13:16 +0300 |
---|---|---|
committer | Taras <voyn1991@gmail.com> | 2017-10-11 18:09:17 +0300 |
commit | fbea5f04bccd28f3aa15a1711548b131504591ac (patch) | |
tree | 1c8a0efe687c2ace72fa146b4f03e15def8e3a95 /kafka/protocol/message.py | |
parent | f04435c5ed97fef0975a77a8dc7bae7c284bba63 (diff) | |
download | kafka-python-fbea5f04bccd28f3aa15a1711548b131504591ac.tar.gz |
Refactor MessageSet and Message into LegacyRecordBatch to later support v2 message format
Diffstat (limited to 'kafka/protocol/message.py')
-rw-r--r-- | kafka/protocol/message.py | 12 |
1 files changed, 8 insertions, 4 deletions
diff --git a/kafka/protocol/message.py b/kafka/protocol/message.py index 70d5b36..f5a51a9 100644 --- a/kafka/protocol/message.py +++ b/kafka/protocol/message.py @@ -154,12 +154,13 @@ class MessageSet(AbstractType): HEADER_SIZE = 12 # offset + message_size @classmethod - def encode(cls, items): + def encode(cls, items, prepend_size=True): # RecordAccumulator encodes messagesets internally if isinstance(items, (io.BytesIO, KafkaBytes)): size = Int32.decode(items) - # rewind and return all the bytes - items.seek(items.tell() - 4) + if prepend_size: + # rewind and return all the bytes + items.seek(items.tell() - 4) return items.read(size + 4) encoded_values = [] @@ -167,7 +168,10 @@ class MessageSet(AbstractType): encoded_values.append(Int64.encode(offset)) encoded_values.append(Bytes.encode(message)) encoded = b''.join(encoded_values) - return Bytes.encode(encoded) + if prepend_size: + return Bytes.encode(encoded) + else: + return encoded @classmethod def decode(cls, data, bytes_to_read=None): |