diff options
author | Dana Powers <dana.powers@rd.io> | 2015-12-09 10:13:19 -0800 |
---|---|---|
committer | Dana Powers <dana.powers@rd.io> | 2015-12-09 10:13:19 -0800 |
commit | f719ffcc047d4c6e4ad79d83257c4d1b2b014314 (patch) | |
tree | d42aad9787aba043019075e21f7aede9d463618b /kafka/protocol/message.py | |
parent | 753d8dca136178a4c2ecb0cda8d4ec371805455f (diff) | |
download | kafka-python-f719ffcc047d4c6e4ad79d83257c4d1b2b014314.tar.gz |
Handle decoding partial messages in MessageSet - caused by FetchRequest max_bytes
Diffstat (limited to 'kafka/protocol/message.py')
-rw-r--r-- | kafka/protocol/message.py | 30 |
1 files changed, 24 insertions, 6 deletions
diff --git a/kafka/protocol/message.py b/kafka/protocol/message.py index c3265f9..8f32749 100644 --- a/kafka/protocol/message.py +++ b/kafka/protocol/message.py @@ -59,13 +59,31 @@ class MessageSet(AbstractType): @classmethod def decode(cls, data): - size = Int32.decode(data) - bytes_read = 0 + bytes_to_read = Int32.decode(data) items = [] - while bytes_read < size: - items.append(cls.ITEM.decode(data)) - msg_size = items[-1][1] - bytes_read += (8 + 4 + msg_size) # item size = 8 byte offset, 4 byte message_size, plus message bytes + + # We need at least 12 bytes to read offset + message size + while bytes_to_read >= 12: + offset = Int64.decode(data) + bytes_to_read -= 8 + + message_size = Int32.decode(data) + bytes_to_read -= 4 + + # if FetchRequest max_bytes is smaller than the available message set + # the server returns partial data for the final message + if message_size > bytes_to_read: + break + + message = Message.decode(data) + bytes_to_read -= message_size + + items.append((offset, message_size, message)) + + # If any bytes are left over, clear them from the buffer + if bytes_to_read: + data.read(bytes_to_read) + return items @classmethod |