summaryrefslogtreecommitdiff
path: root/kafka/protocol/message.py
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2016-05-22 00:31:16 -0700
committerDana Powers <dana.powers@gmail.com>2016-05-22 09:51:37 -0700
commit795cb9b29fa05d4425f807f54dfa639c125fc0dd (patch)
tree7fba03e95f26185c126aa95d1acdd2af5d2ad925 /kafka/protocol/message.py
parent7f4a9361ea168a0e1073801d0b86868de47d1de2 (diff)
downloadkafka-python-795cb9b29fa05d4425f807f54dfa639c125fc0dd.tar.gz
KAFKA-3025: Message v1 -- add timetamp and use relative offset in compressed messagesets
Diffstat (limited to 'kafka/protocol/message.py')
-rw-r--r--kafka/protocol/message.py78
1 files changed, 61 insertions, 17 deletions
diff --git a/kafka/protocol/message.py b/kafka/protocol/message.py
index 8458ac5..473ca56 100644
--- a/kafka/protocol/message.py
+++ b/kafka/protocol/message.py
@@ -1,4 +1,5 @@
import io
+import time
from ..codec import (has_gzip, has_snappy, has_lz4,
gzip_decode, snappy_decode, lz4_decode)
@@ -11,22 +12,39 @@ from ..util import crc32
class Message(Struct):
- SCHEMA = Schema(
- ('crc', Int32),
- ('magic', Int8),
- ('attributes', Int8),
- ('key', Bytes),
- ('value', Bytes)
- )
- CODEC_MASK = 0x03
+ SCHEMAS = [
+ Schema(
+ ('crc', Int32),
+ ('magic', Int8),
+ ('attributes', Int8),
+ ('key', Bytes),
+ ('value', Bytes)),
+ Schema(
+ ('crc', Int32),
+ ('magic', Int8),
+ ('attributes', Int8),
+ ('timestamp', Int64),
+ ('key', Bytes),
+ ('value', Bytes)),
+ ]
+ SCHEMA = SCHEMAS[1]
+ CODEC_MASK = 0x07
CODEC_GZIP = 0x01
CODEC_SNAPPY = 0x02
CODEC_LZ4 = 0x03
- HEADER_SIZE = 14 # crc(4), magic(1), attributes(1), key+value size(4*2)
+ TIMESTAMP_TYPE_MASK = 0x08
+ HEADER_SIZE = 22 # crc(4), magic(1), attributes(1), timestamp(8), key+value size(4*2)
- def __init__(self, value, key=None, magic=0, attributes=0, crc=0):
+ def __init__(self, value, key=None, magic=0, attributes=0, crc=0,
+ timestamp=None):
assert value is None or isinstance(value, bytes), 'value must be bytes'
assert key is None or isinstance(key, bytes), 'key must be bytes'
+ assert magic > 0 or timestamp is None, 'timestamp not supported in v0'
+
+ # Default timestamp to now for v1 messages
+ if magic > 0 and timestamp is None:
+ timestamp = int(time.time() * 1000)
+ self.timestamp = timestamp
self.crc = crc
self.magic = magic
self.attributes = attributes
@@ -34,22 +52,48 @@ class Message(Struct):
self.value = value
self.encode = self._encode_self
+ @property
+ def timestamp_type(self):
+ """0 for CreateTime; 1 for LogAppendTime; None if unsupported.
+
+ Value is determined by broker; produced messages should always set to 0
+ Requires Kafka >= 0.10 / message version >= 1
+ """
+ if self.magic == 0:
+ return None
+ return self.attributes & self.TIMESTAMP_TYPE_MASK
+
def _encode_self(self, recalc_crc=True):
- message = Message.SCHEMA.encode(
- (self.crc, self.magic, self.attributes, self.key, self.value)
- )
+ version = self.magic
+ if version == 1:
+ fields = (self.crc, self.magic, self.attributes, self.timestamp, self.key, self.value)
+ elif version == 0:
+ fields = (self.crc, self.magic, self.attributes, self.key, self.value)
+ else:
+ raise ValueError('Unrecognized message version: %s' % version)
+ message = Message.SCHEMAS[version].encode(fields)
if not recalc_crc:
return message
self.crc = crc32(message[4:])
- return self.SCHEMA.fields[0].encode(self.crc) + message[4:]
+ crc_field = self.SCHEMAS[version].fields[0]
+ return crc_field.encode(self.crc) + message[4:]
@classmethod
def decode(cls, data):
if isinstance(data, bytes):
data = io.BytesIO(data)
- fields = [field.decode(data) for field in cls.SCHEMA.fields]
- return cls(fields[4], key=fields[3],
- magic=fields[1], attributes=fields[2], crc=fields[0])
+ # Partial decode required to determine message version
+ base_fields = cls.SCHEMAS[0].fields[0:3]
+ crc, magic, attributes = [field.decode(data) for field in base_fields]
+ remaining = cls.SCHEMAS[magic].fields[3:]
+ fields = [field.decode(data) for field in remaining]
+ if magic == 1:
+ timestamp = fields[0]
+ else:
+ timestamp = None
+ return cls(fields[-1], key=fields[-2],
+ magic=magic, attributes=attributes, crc=crc,
+ timestamp=timestamp)
def validate_crc(self):
raw_msg = self._encode_self(recalc_crc=False)