diff options
author | Dana Powers <dana.powers@gmail.com> | 2016-05-22 00:31:16 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2016-05-22 09:51:37 -0700 |
commit | 795cb9b29fa05d4425f807f54dfa639c125fc0dd (patch) | |
tree | 7fba03e95f26185c126aa95d1acdd2af5d2ad925 /kafka/protocol/message.py | |
parent | 7f4a9361ea168a0e1073801d0b86868de47d1de2 (diff) | |
download | kafka-python-795cb9b29fa05d4425f807f54dfa639c125fc0dd.tar.gz |
KAFKA-3025: Message v1 -- add timestamp and use relative offset in compressed messagesets
Diffstat (limited to 'kafka/protocol/message.py')
-rw-r--r-- | kafka/protocol/message.py | 78 |
1 files changed, 61 insertions, 17 deletions
diff --git a/kafka/protocol/message.py b/kafka/protocol/message.py index 8458ac5..473ca56 100644 --- a/kafka/protocol/message.py +++ b/kafka/protocol/message.py @@ -1,4 +1,5 @@ import io +import time from ..codec import (has_gzip, has_snappy, has_lz4, gzip_decode, snappy_decode, lz4_decode) @@ -11,22 +12,39 @@ from ..util import crc32 class Message(Struct): - SCHEMA = Schema( - ('crc', Int32), - ('magic', Int8), - ('attributes', Int8), - ('key', Bytes), - ('value', Bytes) - ) - CODEC_MASK = 0x03 + SCHEMAS = [ + Schema( + ('crc', Int32), + ('magic', Int8), + ('attributes', Int8), + ('key', Bytes), + ('value', Bytes)), + Schema( + ('crc', Int32), + ('magic', Int8), + ('attributes', Int8), + ('timestamp', Int64), + ('key', Bytes), + ('value', Bytes)), + ] + SCHEMA = SCHEMAS[1] + CODEC_MASK = 0x07 CODEC_GZIP = 0x01 CODEC_SNAPPY = 0x02 CODEC_LZ4 = 0x03 - HEADER_SIZE = 14 # crc(4), magic(1), attributes(1), key+value size(4*2) + TIMESTAMP_TYPE_MASK = 0x08 + HEADER_SIZE = 22 # crc(4), magic(1), attributes(1), timestamp(8), key+value size(4*2) - def __init__(self, value, key=None, magic=0, attributes=0, crc=0): + def __init__(self, value, key=None, magic=0, attributes=0, crc=0, + timestamp=None): assert value is None or isinstance(value, bytes), 'value must be bytes' assert key is None or isinstance(key, bytes), 'key must be bytes' + assert magic > 0 or timestamp is None, 'timestamp not supported in v0' + + # Default timestamp to now for v1 messages + if magic > 0 and timestamp is None: + timestamp = int(time.time() * 1000) + self.timestamp = timestamp self.crc = crc self.magic = magic self.attributes = attributes @@ -34,22 +52,48 @@ class Message(Struct): self.value = value self.encode = self._encode_self + @property + def timestamp_type(self): + """0 for CreateTime; 1 for LogAppendTime; None if unsupported. 
+ + Value is determined by broker; produced messages should always set to 0 + Requires Kafka >= 0.10 / message version >= 1 + """ + if self.magic == 0: + return None + return self.attributes & self.TIMESTAMP_TYPE_MASK + def _encode_self(self, recalc_crc=True): - message = Message.SCHEMA.encode( - (self.crc, self.magic, self.attributes, self.key, self.value) - ) + version = self.magic + if version == 1: + fields = (self.crc, self.magic, self.attributes, self.timestamp, self.key, self.value) + elif version == 0: + fields = (self.crc, self.magic, self.attributes, self.key, self.value) + else: + raise ValueError('Unrecognized message version: %s' % version) + message = Message.SCHEMAS[version].encode(fields) if not recalc_crc: return message self.crc = crc32(message[4:]) - return self.SCHEMA.fields[0].encode(self.crc) + message[4:] + crc_field = self.SCHEMAS[version].fields[0] + return crc_field.encode(self.crc) + message[4:] @classmethod def decode(cls, data): if isinstance(data, bytes): data = io.BytesIO(data) - fields = [field.decode(data) for field in cls.SCHEMA.fields] - return cls(fields[4], key=fields[3], - magic=fields[1], attributes=fields[2], crc=fields[0]) + # Partial decode required to determine message version + base_fields = cls.SCHEMAS[0].fields[0:3] + crc, magic, attributes = [field.decode(data) for field in base_fields] + remaining = cls.SCHEMAS[magic].fields[3:] + fields = [field.decode(data) for field in remaining] + if magic == 1: + timestamp = fields[0] + else: + timestamp = None + return cls(fields[-1], key=fields[-2], + magic=magic, attributes=attributes, crc=crc, + timestamp=timestamp) def validate_crc(self): raw_msg = self._encode_self(recalc_crc=False) |