diff options
author | Taras Voinarovskyi <voyn1991@gmail.com> | 2017-10-14 23:06:27 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-10-14 23:06:27 +0300 |
commit | fbbd6ca5d999a8520d483ecfe0ad6f805eb8833f (patch) | |
tree | 52e5860b1f8738b15e7c757c205961b761badd2b /kafka/protocol/message.py | |
parent | dd8e33654f2270097d6c1373dc272153670e48f8 (diff) | |
parent | 365cae02da59721df77923bb5f5a2d94a84b2e83 (diff) | |
download | kafka-python-fbbd6ca5d999a8520d483ecfe0ad6f805eb8833f.tar.gz |
Merge pull request #1252 from dpkp/legacy_records_refactor
Refactor MessageSet and Message into LegacyRecordBatch
Diffstat (limited to 'kafka/protocol/message.py')
-rw-r--r-- | kafka/protocol/message.py | 15 |
1 files changed, 10 insertions, 5 deletions
diff --git a/kafka/protocol/message.py b/kafka/protocol/message.py index 70d5b36..a330ed8 100644 --- a/kafka/protocol/message.py +++ b/kafka/protocol/message.py @@ -154,20 +154,25 @@ class MessageSet(AbstractType): HEADER_SIZE = 12 # offset + message_size @classmethod - def encode(cls, items): + def encode(cls, items, prepend_size=True): # RecordAccumulator encodes messagesets internally if isinstance(items, (io.BytesIO, KafkaBytes)): size = Int32.decode(items) - # rewind and return all the bytes - items.seek(items.tell() - 4) - return items.read(size + 4) + if prepend_size: + # rewind and return all the bytes + items.seek(items.tell() - 4) + size += 4 + return items.read(size) encoded_values = [] for (offset, message) in items: encoded_values.append(Int64.encode(offset)) encoded_values.append(Bytes.encode(message)) encoded = b''.join(encoded_values) - return Bytes.encode(encoded) + if prepend_size: + return Bytes.encode(encoded) + else: + return encoded @classmethod def decode(cls, data, bytes_to_read=None): |