diff options
Diffstat (limited to 'kafka/protocol/message.py')
-rw-r--r-- | kafka/protocol/message.py | 13 |
1 files changed, 10 insertions, 3 deletions
diff --git a/kafka/protocol/message.py b/kafka/protocol/message.py index 3027ebd..cd5d274 100644 --- a/kafka/protocol/message.py +++ b/kafka/protocol/message.py @@ -42,6 +42,11 @@ class Message(Struct): magic=fields[1], attributes=fields[2], crc=fields[0]) +class PartialMessage(bytes): + def __repr__(self): + return 'PartialMessage(%s)' % self + + class MessageSet(AbstractType): ITEM = Schema( ('offset', Int64), @@ -72,8 +77,9 @@ class MessageSet(AbstractType): bytes_to_read = Int32.decode(data) items = [] - # We need at least 12 bytes to read offset + message size - while bytes_to_read >= 12: + # We need at least 8 + 4 + 14 bytes to read offset + message size + message + # (14 bytes is a message w/ null key and null value) + while bytes_to_read >= 26: offset = Int64.decode(data) bytes_to_read -= 8 @@ -91,8 +97,9 @@ class MessageSet(AbstractType): items.append((offset, message_size, message)) # If any bytes are left over, clear them from the buffer + # and append a PartialMessage to signal that max_bytes may be too small if bytes_to_read: - data.read(bytes_to_read) + items.append((None, None, PartialMessage(data.read(bytes_to_read)))) return items |