diff options
author | Dana Powers <dana.powers@rd.io> | 2015-11-28 19:41:06 +0800 |
---|---|---|
committer | Zack Dever <zack.dever@rd.io> | 2015-12-04 11:25:39 -0800 |
commit | a85e09df89a43de5b659a0fa4ed35bec37c60e04 (patch) | |
tree | a539af32fe502006c1f35b96d8ae36225292f7a5 /kafka/protocol/message.py | |
parent | e24a4d5f5252d6f97ac586e328b95779ef83f4b6 (diff) | |
download | kafka-python-a85e09df89a43de5b659a0fa4ed35bec37c60e04.tar.gz |
Rework protocol type definition: AbstractType, Schema, Struct
Diffstat (limited to 'kafka/protocol/message.py')
-rw-r--r-- | kafka/protocol/message.py | 67 |
1 files changed, 67 insertions, 0 deletions
diff --git a/kafka/protocol/message.py b/kafka/protocol/message.py new file mode 100644 index 0000000..26f5ef6 --- /dev/null +++ b/kafka/protocol/message.py @@ -0,0 +1,67 @@ +from .struct import Struct +from .types import ( + Int8, Int16, Int32, Int64, Bytes, String, Array, Schema, AbstractType +) +from ..util import crc32 + + +class Message(Struct): + SCHEMA = Schema( + ('crc', Int32), + ('magic', Int8), + ('attributes', Int8), + ('key', Bytes), + ('value', Bytes) + ) + + def __init__(self, value, key=None, magic=0, attributes=0, crc=0): + self.crc = crc + self.magic = magic + self.attributes = attributes + self.key = key + self.value = value + self.encode = self._encode_self + + def _encode_self(self, recalc_crc=True): + message = Message.SCHEMA.encode( + (self.crc, self.magic, self.attributes, self.key, self.value) + ) + if not recalc_crc: + return message + self.crc = crc32(message[4:]) + return self.SCHEMA.fields[0].encode(self.crc) + message[4:] + + +class MessageSet(AbstractType): + ITEM = Schema( + ('offset', Int64), + ('message_size', Int32), + ('message', Message.SCHEMA) + ) + + @classmethod + def encode(cls, items, size=True, recalc_message_size=True): + encoded_values = [] + for (offset, message_size, message) in items: + if isinstance(message, Message): + encoded_message = message.encode() + else: + encoded_message = cls.ITEM.fields[2].encode(message) + if recalc_message_size: + message_size = len(encoded_message) + encoded_values.append(cls.ITEM.fields[0].encode(offset)) + encoded_values.append(cls.ITEM.fields[1].encode(message_size)) + encoded_values.append(encoded_message) + encoded = b''.join(encoded_values) + if not size: + return encoded + return Int32.encode(len(encoded)) + encoded + + @classmethod + def decode(cls, data): + size = Int32.decode(data) + end = data.tell() + size + items = [] + while data.tell() < end: + items.append(cls.ITEM.decode(data)) + return items |