summaryrefslogtreecommitdiff
path: root/kafka/protocol/message.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/protocol/message.py')
-rw-r--r--kafka/protocol/message.py13
1 files changed, 10 insertions, 3 deletions
diff --git a/kafka/protocol/message.py b/kafka/protocol/message.py
index 3027ebd..cd5d274 100644
--- a/kafka/protocol/message.py
+++ b/kafka/protocol/message.py
@@ -42,6 +42,11 @@ class Message(Struct):
magic=fields[1], attributes=fields[2], crc=fields[0])
+class PartialMessage(bytes):
+ def __repr__(self):
+ return 'PartialMessage(%s)' % self
+
+
class MessageSet(AbstractType):
ITEM = Schema(
('offset', Int64),
@@ -72,8 +77,9 @@ class MessageSet(AbstractType):
bytes_to_read = Int32.decode(data)
items = []
- # We need at least 12 bytes to read offset + message size
- while bytes_to_read >= 12:
+ # We need at least 8 + 4 + 14 bytes to read offset + message size + message
+ # (14 bytes is a message w/ null key and null value)
+ while bytes_to_read >= 26:
offset = Int64.decode(data)
bytes_to_read -= 8
@@ -91,8 +97,9 @@ class MessageSet(AbstractType):
items.append((offset, message_size, message))
# If any bytes are left over, clear them from the buffer
+ # and append a PartialMessage to signal that max_bytes may be too small
if bytes_to_read:
- data.read(bytes_to_read)
+ items.append((None, None, PartialMessage(data.read(bytes_to_read))))
return items