summaryrefslogtreecommitdiff
path: root/kafka/protocol/message.py
diff options
context:
space:
mode:
authorDana Powers <dana.powers@rd.io>2015-11-28 19:41:06 +0800
committerZack Dever <zack.dever@rd.io>2015-12-04 11:25:39 -0800
commita85e09df89a43de5b659a0fa4ed35bec37c60e04 (patch)
treea539af32fe502006c1f35b96d8ae36225292f7a5 /kafka/protocol/message.py
parente24a4d5f5252d6f97ac586e328b95779ef83f4b6 (diff)
downloadkafka-python-a85e09df89a43de5b659a0fa4ed35bec37c60e04.tar.gz
Rework protocol type definition: AbstractType, Schema, Struct
Diffstat (limited to 'kafka/protocol/message.py')
-rw-r--r--kafka/protocol/message.py67
1 files changed, 67 insertions, 0 deletions
diff --git a/kafka/protocol/message.py b/kafka/protocol/message.py
new file mode 100644
index 0000000..26f5ef6
--- /dev/null
+++ b/kafka/protocol/message.py
@@ -0,0 +1,67 @@
+from .struct import Struct
+from .types import (
+ Int8, Int16, Int32, Int64, Bytes, String, Array, Schema, AbstractType
+)
+from ..util import crc32
+
+
+class Message(Struct):
+ SCHEMA = Schema(
+ ('crc', Int32),
+ ('magic', Int8),
+ ('attributes', Int8),
+ ('key', Bytes),
+ ('value', Bytes)
+ )
+
+ def __init__(self, value, key=None, magic=0, attributes=0, crc=0):
+ self.crc = crc
+ self.magic = magic
+ self.attributes = attributes
+ self.key = key
+ self.value = value
+ self.encode = self._encode_self
+
+ def _encode_self(self, recalc_crc=True):
+ message = Message.SCHEMA.encode(
+ (self.crc, self.magic, self.attributes, self.key, self.value)
+ )
+ if not recalc_crc:
+ return message
+ self.crc = crc32(message[4:])
+ return self.SCHEMA.fields[0].encode(self.crc) + message[4:]
+
+
+class MessageSet(AbstractType):
+ ITEM = Schema(
+ ('offset', Int64),
+ ('message_size', Int32),
+ ('message', Message.SCHEMA)
+ )
+
+ @classmethod
+ def encode(cls, items, size=True, recalc_message_size=True):
+ encoded_values = []
+ for (offset, message_size, message) in items:
+ if isinstance(message, Message):
+ encoded_message = message.encode()
+ else:
+ encoded_message = cls.ITEM.fields[2].encode(message)
+ if recalc_message_size:
+ message_size = len(encoded_message)
+ encoded_values.append(cls.ITEM.fields[0].encode(offset))
+ encoded_values.append(cls.ITEM.fields[1].encode(message_size))
+ encoded_values.append(encoded_message)
+ encoded = b''.join(encoded_values)
+ if not size:
+ return encoded
+ return Int32.encode(len(encoded)) + encoded
+
+ @classmethod
+ def decode(cls, data):
+ size = Int32.decode(data)
+ end = data.tell() + size
+ items = []
+ while data.tell() < end:
+ items.append(cls.ITEM.decode(data))
+ return items