diff options
Diffstat (limited to 'kafka')
-rw-r--r-- | kafka/protocol/message.py | 9 |
1 files changed, 6 insertions, 3 deletions
diff --git a/kafka/protocol/message.py b/kafka/protocol/message.py index 78840fc..656c131 100644 --- a/kafka/protocol/message.py +++ b/kafka/protocol/message.py @@ -169,14 +169,17 @@ class MessageSet(AbstractType): data = io.BytesIO(data) if bytes_to_read is None: bytes_to_read = Int32.decode(data) - items = [] # if FetchRequest max_bytes is smaller than the available message set # the server returns partial data for the final message + # So create an internal buffer to avoid over-reading + raw = io.BytesIO(data.read(bytes_to_read)) + + items = [] while bytes_to_read: try: - offset = Int64.decode(data) - msg_bytes = Bytes.decode(data) + offset = Int64.decode(raw) + msg_bytes = Bytes.decode(raw) bytes_to_read -= 8 + 4 + len(msg_bytes) items.append((offset, len(msg_bytes), Message.decode(msg_bytes))) except ValueError: |