diff options
Diffstat (limited to 'kafka/protocol/message.py')
-rw-r--r-- | kafka/protocol/message.py | 132 |
1 files changed, 78 insertions, 54 deletions
diff --git a/kafka/protocol/message.py b/kafka/protocol/message.py index ae261bf..473ca56 100644 --- a/kafka/protocol/message.py +++ b/kafka/protocol/message.py @@ -1,4 +1,5 @@ import io +import time from ..codec import (has_gzip, has_snappy, has_lz4, gzip_decode, snappy_decode, lz4_decode) @@ -11,22 +12,39 @@ from ..util import crc32 class Message(Struct): - SCHEMA = Schema( - ('crc', Int32), - ('magic', Int8), - ('attributes', Int8), - ('key', Bytes), - ('value', Bytes) - ) - CODEC_MASK = 0x03 + SCHEMAS = [ + Schema( + ('crc', Int32), + ('magic', Int8), + ('attributes', Int8), + ('key', Bytes), + ('value', Bytes)), + Schema( + ('crc', Int32), + ('magic', Int8), + ('attributes', Int8), + ('timestamp', Int64), + ('key', Bytes), + ('value', Bytes)), + ] + SCHEMA = SCHEMAS[1] + CODEC_MASK = 0x07 CODEC_GZIP = 0x01 CODEC_SNAPPY = 0x02 CODEC_LZ4 = 0x03 - HEADER_SIZE = 14 # crc(4), magic(1), attributes(1), key+value size(4*2) + TIMESTAMP_TYPE_MASK = 0x08 + HEADER_SIZE = 22 # crc(4), magic(1), attributes(1), timestamp(8), key+value size(4*2) - def __init__(self, value, key=None, magic=0, attributes=0, crc=0): + def __init__(self, value, key=None, magic=0, attributes=0, crc=0, + timestamp=None): assert value is None or isinstance(value, bytes), 'value must be bytes' assert key is None or isinstance(key, bytes), 'key must be bytes' + assert magic > 0 or timestamp is None, 'timestamp not supported in v0' + + # Default timestamp to now for v1 messages + if magic > 0 and timestamp is None: + timestamp = int(time.time() * 1000) + self.timestamp = timestamp self.crc = crc self.magic = magic self.attributes = attributes @@ -34,22 +52,48 @@ class Message(Struct): self.value = value self.encode = self._encode_self + @property + def timestamp_type(self): + """0 for CreateTime; 1 for LogAppendTime; None if unsupported. + + Value is determined by broker; produced messages should always set to 0 + Requires Kafka >= 0.10 / message version >= 1 + """ + if self.magic == 0: + return None + return self.attributes & self.TIMESTAMP_TYPE_MASK + def _encode_self(self, recalc_crc=True): - message = Message.SCHEMA.encode( - (self.crc, self.magic, self.attributes, self.key, self.value) - ) + version = self.magic + if version == 1: + fields = (self.crc, self.magic, self.attributes, self.timestamp, self.key, self.value) + elif version == 0: + fields = (self.crc, self.magic, self.attributes, self.key, self.value) + else: + raise ValueError('Unrecognized message version: %s' % version) + message = Message.SCHEMAS[version].encode(fields) if not recalc_crc: return message self.crc = crc32(message[4:]) - return self.SCHEMA.fields[0].encode(self.crc) + message[4:] + crc_field = self.SCHEMAS[version].fields[0] + return crc_field.encode(self.crc) + message[4:] @classmethod def decode(cls, data): if isinstance(data, bytes): data = io.BytesIO(data) - fields = [field.decode(data) for field in cls.SCHEMA.fields] - return cls(fields[4], key=fields[3], - magic=fields[1], attributes=fields[2], crc=fields[0]) + # Partial decode required to determine message version + base_fields = cls.SCHEMAS[0].fields[0:3] + crc, magic, attributes = [field.decode(data) for field in base_fields] + remaining = cls.SCHEMAS[magic].fields[3:] + fields = [field.decode(data) for field in remaining] + if magic == 1: + timestamp = fields[0] + else: + timestamp = None + return cls(fields[-1], key=fields[-2], + magic=magic, attributes=attributes, crc=crc, + timestamp=timestamp) def validate_crc(self): raw_msg = self._encode_self(recalc_crc=False) @@ -90,8 +134,7 @@ class PartialMessage(bytes): class MessageSet(AbstractType): ITEM = Schema( ('offset', Int64), - ('message_size', Int32), - ('message', Message.SCHEMA) + ('message', Bytes) ) HEADER_SIZE = 12 # offset + message_size @@ -105,20 +148,13 @@ class MessageSet(AbstractType): return items.read(size + 4) encoded_values = [] - for (offset, message_size, message) in items: - if isinstance(message, Message): - encoded_message = message.encode() - else: - encoded_message = cls.ITEM.fields[2].encode(message) - if recalc_message_size: - message_size = len(encoded_message) - encoded_values.append(cls.ITEM.fields[0].encode(offset)) - encoded_values.append(cls.ITEM.fields[1].encode(message_size)) - encoded_values.append(encoded_message) + for (offset, message) in items: + encoded_values.append(Int64.encode(offset)) + encoded_values.append(Bytes.encode(message)) encoded = b''.join(encoded_values) if not size: return encoded - return Int32.encode(len(encoded)) + encoded + return Bytes.encode(encoded) @classmethod def decode(cls, data, bytes_to_read=None): @@ -131,30 +167,18 @@ class MessageSet(AbstractType): bytes_to_read = Int32.decode(data) items = [] - # We need at least 8 + 4 + 14 bytes to read offset + message size + message - # (14 bytes is a message w/ null key and null value) - while bytes_to_read >= 26: - offset = Int64.decode(data) - bytes_to_read -= 8 - - message_size = Int32.decode(data) - bytes_to_read -= 4 - - # if FetchRequest max_bytes is smaller than the available message set - # the server returns partial data for the final message - if message_size > bytes_to_read: + # if FetchRequest max_bytes is smaller than the available message set + # the server returns partial data for the final message + while bytes_to_read: + try: + offset = Int64.decode(data) + msg_bytes = Bytes.decode(data) + bytes_to_read -= 8 + 4 + len(msg_bytes) + items.append((offset, len(msg_bytes), Message.decode(msg_bytes))) + except ValueError: + # PartialMessage to signal that max_bytes may be too small + items.append((None, None, PartialMessage())) break - - message = Message.decode(data) - bytes_to_read -= message_size - - items.append((offset, message_size, message)) - - # If any bytes are left over, clear them from the buffer - # and append a PartialMessage to signal that max_bytes may be too small - if bytes_to_read: - items.append((None, None, PartialMessage(data.read(bytes_to_read)))) - return items @classmethod @@ -164,4 +188,4 @@ class MessageSet(AbstractType): decoded = cls.decode(messages) messages.seek(offset) messages = decoded - return '[' + ', '.join([cls.ITEM.repr(m) for m in messages]) + ']' + return str([cls.ITEM.repr(m) for m in messages]) |