diff options
author | Dana Powers <dana.powers@rd.io> | 2015-12-09 15:16:37 -0800 |
---|---|---|
committer | Dana Powers <dana.powers@rd.io> | 2015-12-09 15:16:37 -0800 |
commit | 1636c96df41b61b37883a60238dfb42b353f36a2 (patch) | |
tree | d5337cf2115d671975af1ff2df83956235f4fcb6 /kafka/protocol/message.py | |
parent | 5aeba4a7dc68e76c96f743a8a9e3e6603875695e (diff) | |
download | kafka-python-1636c96df41b61b37883a60238dfb42b353f36a2.tar.gz |
Return PartialMessage object in MessageSet.decode if message is truncated by max_bytes
Diffstat (limited to 'kafka/protocol/message.py')
-rw-r--r-- | kafka/protocol/message.py | 13 |
1 files changed, 10 insertions, 3 deletions
diff --git a/kafka/protocol/message.py b/kafka/protocol/message.py index 3027ebd..cd5d274 100644 --- a/kafka/protocol/message.py +++ b/kafka/protocol/message.py @@ -42,6 +42,11 @@ class Message(Struct): magic=fields[1], attributes=fields[2], crc=fields[0]) +class PartialMessage(bytes): + def __repr__(self): + return 'PartialMessage(%s)' % self + + class MessageSet(AbstractType): ITEM = Schema( ('offset', Int64), @@ -72,8 +77,9 @@ class MessageSet(AbstractType): bytes_to_read = Int32.decode(data) items = [] - # We need at least 12 bytes to read offset + message size - while bytes_to_read >= 12: + # We need at least 8 + 4 + 14 bytes to read offset + message size + message + # (14 bytes is a message w/ null key and null value) + while bytes_to_read >= 26: offset = Int64.decode(data) bytes_to_read -= 8 @@ -91,8 +97,9 @@ class MessageSet(AbstractType): items.append((offset, message_size, message)) # If any bytes are left over, clear them from the buffer + # and append a PartialMessage to signal that max_bytes may be too small if bytes_to_read: - data.read(bytes_to_read) + items.append((None, None, PartialMessage(data.read(bytes_to_read)))) return items |