summaryrefslogtreecommitdiff
path: root/kafka/protocol/message.py
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2016-01-24 18:36:46 -0800
committerDana Powers <dana.powers@gmail.com>2016-01-24 18:36:46 -0800
commit077dc4742ffa82584946379790424faf4c6ba47f (patch)
treebd14706a8dfc429f6bf211bac02ad21af967c6ce /kafka/protocol/message.py
parent48e96822b3ec4f897438a2d1cdb735f51648cb48 (diff)
parent85c0dd2579eb6aa0b9492d9082d0f4cf4d8ea39d (diff)
downloadkafka-python-077dc4742ffa82584946379790424faf4c6ba47f.tar.gz
Merge pull request #515 from dpkp/kafka_producer
KafkaProducer
Diffstat (limited to 'kafka/protocol/message.py')
-rw-r--r--kafka/protocol/message.py14
1 files changed, 14 insertions, 0 deletions
diff --git a/kafka/protocol/message.py b/kafka/protocol/message.py
index 2648e24..fb54049 100644
--- a/kafka/protocol/message.py
+++ b/kafka/protocol/message.py
@@ -20,6 +20,7 @@ class Message(Struct):
CODEC_MASK = 0x03
CODEC_GZIP = 0x01
CODEC_SNAPPY = 0x02
+ HEADER_SIZE = 14 # crc(4), magic(1), attributes(1), key+value size(4*2)
def __init__(self, value, key=None, magic=0, attributes=0, crc=0):
assert value is None or isinstance(value, bytes), 'value must be bytes'
@@ -83,9 +84,17 @@ class MessageSet(AbstractType):
('message_size', Int32),
('message', Message.SCHEMA)
)
+ HEADER_SIZE = 12 # offset + message_size
@classmethod
def encode(cls, items, size=True, recalc_message_size=True):
+ # RecordAccumulator encodes messagesets internally
+ if isinstance(items, io.BytesIO):
+ size = Int32.decode(items)
+ # rewind and return all the bytes
+ items.seek(-4, 1)
+ return items.read(size + 4)
+
encoded_values = []
for (offset, message_size, message) in items:
if isinstance(message, Message):
@@ -141,4 +150,9 @@ class MessageSet(AbstractType):
@classmethod
def repr(cls, messages):
+ if isinstance(messages, io.BytesIO):
+ offset = messages.tell()
+ decoded = cls.decode(messages)
+ messages.seek(offset)
+ messages = decoded
return '[' + ', '.join([cls.ITEM.repr(m) for m in messages]) + ']'