diff options
Diffstat (limited to 'kafka/protocol/message.py')
-rw-r--r-- | kafka/protocol/message.py | 30 |
1 files changed, 24 insertions, 6 deletions
diff --git a/kafka/protocol/message.py b/kafka/protocol/message.py index c3265f9..8f32749 100644 --- a/kafka/protocol/message.py +++ b/kafka/protocol/message.py @@ -59,13 +59,31 @@ class MessageSet(AbstractType): @classmethod def decode(cls, data): - size = Int32.decode(data) - bytes_read = 0 + bytes_to_read = Int32.decode(data) items = [] - while bytes_read < size: - items.append(cls.ITEM.decode(data)) - msg_size = items[-1][1] - bytes_read += (8 + 4 + msg_size) # item size = 8 byte offset, 4 byte message_size, plus message bytes + + # We need at least 12 bytes to read offset + message size + while bytes_to_read >= 12: + offset = Int64.decode(data) + bytes_to_read -= 8 + + message_size = Int32.decode(data) + bytes_to_read -= 4 + + # if FetchRequest max_bytes is smaller than the available message set + # the server returns partial data for the final message + if message_size > bytes_to_read: + break + + message = Message.decode(data) + bytes_to_read -= message_size + + items.append((offset, message_size, message)) + + # If any bytes are left over, clear them from the buffer + if bytes_to_read: + data.read(bytes_to_read) + return items @classmethod |