summaryrefslogtreecommitdiff
path: root/kafka/protocol/message.py
diff options
context:
space:
mode:
authorDana Powers <dana.powers@rd.io>2015-12-09 10:13:19 -0800
committerDana Powers <dana.powers@rd.io>2015-12-09 10:13:19 -0800
commitf719ffcc047d4c6e4ad79d83257c4d1b2b014314 (patch)
treed42aad9787aba043019075e21f7aede9d463618b /kafka/protocol/message.py
parent753d8dca136178a4c2ecb0cda8d4ec371805455f (diff)
downloadkafka-python-f719ffcc047d4c6e4ad79d83257c4d1b2b014314.tar.gz
Handle decoding partial messages in MessageSet - caused by FetchRequest max_bytes
Diffstat (limited to 'kafka/protocol/message.py')
-rw-r--r--kafka/protocol/message.py30
1 files changed, 24 insertions, 6 deletions
diff --git a/kafka/protocol/message.py b/kafka/protocol/message.py
index c3265f9..8f32749 100644
--- a/kafka/protocol/message.py
+++ b/kafka/protocol/message.py
@@ -59,13 +59,31 @@ class MessageSet(AbstractType):
@classmethod
def decode(cls, data):
- size = Int32.decode(data)
- bytes_read = 0
+ bytes_to_read = Int32.decode(data)
items = []
- while bytes_read < size:
- items.append(cls.ITEM.decode(data))
- msg_size = items[-1][1]
- bytes_read += (8 + 4 + msg_size) # item size = 8 byte offset, 4 byte message_size, plus message bytes
+
+ # We need at least 12 bytes to read offset + message size
+ while bytes_to_read >= 12:
+ offset = Int64.decode(data)
+ bytes_to_read -= 8
+
+ message_size = Int32.decode(data)
+ bytes_to_read -= 4
+
+ # if FetchRequest max_bytes is smaller than the available message set
+ # the server returns partial data for the final message
+ if message_size > bytes_to_read:
+ break
+
+ message = Message.decode(data)
+ bytes_to_read -= message_size
+
+ items.append((offset, message_size, message))
+
+ # If any bytes are left over, clear them from the buffer
+ if bytes_to_read:
+ data.read(bytes_to_read)
+
return items
@classmethod