diff options
Diffstat (limited to 'kafka/protocol/message.py')
-rw-r--r-- | kafka/protocol/message.py | 15 |
1 files changed, 10 insertions, 5 deletions
diff --git a/kafka/protocol/message.py b/kafka/protocol/message.py index 70d5b36..a330ed8 100644 --- a/kafka/protocol/message.py +++ b/kafka/protocol/message.py @@ -154,20 +154,25 @@ class MessageSet(AbstractType): HEADER_SIZE = 12 # offset + message_size @classmethod - def encode(cls, items): + def encode(cls, items, prepend_size=True): # RecordAccumulator encodes messagesets internally if isinstance(items, (io.BytesIO, KafkaBytes)): size = Int32.decode(items) - # rewind and return all the bytes - items.seek(items.tell() - 4) - return items.read(size + 4) + if prepend_size: + # rewind and return all the bytes + items.seek(items.tell() - 4) + size += 4 + return items.read(size) encoded_values = [] for (offset, message) in items: encoded_values.append(Int64.encode(offset)) encoded_values.append(Bytes.encode(message)) encoded = b''.join(encoded_values) - return Bytes.encode(encoded) + if prepend_size: + return Bytes.encode(encoded) + else: + return encoded @classmethod def decode(cls, data, bytes_to_read=None): |