summaryrefslogtreecommitdiff
path: root/kafka/protocol/message.py
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2016-05-22 10:28:56 -0700
committerDana Powers <dana.powers@gmail.com>2016-05-22 10:28:56 -0700
commit7941a2ac7ec6663f08c6291d92746eae9f792916 (patch)
treef3b75dcea569e28f1685500af53bff34514374b9 /kafka/protocol/message.py
parent92f859d8da5c3f35ab3738ef2725fff05b6cf57f (diff)
parentaa5bde6ac382966395f8f1466c46d55cf28c2cce (diff)
downloadkafka-python-7941a2ac7ec6663f08c6291d92746eae9f792916.tar.gz
Merge pull request #693 from dpkp/message_format_v1
Message format v1 (KIP-31 / KIP-32)
Diffstat (limited to 'kafka/protocol/message.py')
-rw-r--r--kafka/protocol/message.py132
1 files changed, 78 insertions, 54 deletions
diff --git a/kafka/protocol/message.py b/kafka/protocol/message.py
index ae261bf..473ca56 100644
--- a/kafka/protocol/message.py
+++ b/kafka/protocol/message.py
@@ -1,4 +1,5 @@
import io
+import time
from ..codec import (has_gzip, has_snappy, has_lz4,
gzip_decode, snappy_decode, lz4_decode)
@@ -11,22 +12,39 @@ from ..util import crc32
class Message(Struct):
- SCHEMA = Schema(
- ('crc', Int32),
- ('magic', Int8),
- ('attributes', Int8),
- ('key', Bytes),
- ('value', Bytes)
- )
- CODEC_MASK = 0x03
+ SCHEMAS = [
+ Schema(
+ ('crc', Int32),
+ ('magic', Int8),
+ ('attributes', Int8),
+ ('key', Bytes),
+ ('value', Bytes)),
+ Schema(
+ ('crc', Int32),
+ ('magic', Int8),
+ ('attributes', Int8),
+ ('timestamp', Int64),
+ ('key', Bytes),
+ ('value', Bytes)),
+ ]
+ SCHEMA = SCHEMAS[1]
+ CODEC_MASK = 0x07
CODEC_GZIP = 0x01
CODEC_SNAPPY = 0x02
CODEC_LZ4 = 0x03
- HEADER_SIZE = 14 # crc(4), magic(1), attributes(1), key+value size(4*2)
+ TIMESTAMP_TYPE_MASK = 0x08
+ HEADER_SIZE = 22 # crc(4), magic(1), attributes(1), timestamp(8), key+value size(4*2)
- def __init__(self, value, key=None, magic=0, attributes=0, crc=0):
+ def __init__(self, value, key=None, magic=0, attributes=0, crc=0,
+ timestamp=None):
assert value is None or isinstance(value, bytes), 'value must be bytes'
assert key is None or isinstance(key, bytes), 'key must be bytes'
+ assert magic > 0 or timestamp is None, 'timestamp not supported in v0'
+
+ # Default timestamp to now for v1 messages
+ if magic > 0 and timestamp is None:
+ timestamp = int(time.time() * 1000)
+ self.timestamp = timestamp
self.crc = crc
self.magic = magic
self.attributes = attributes
@@ -34,22 +52,48 @@ class Message(Struct):
self.value = value
self.encode = self._encode_self
+ @property
+ def timestamp_type(self):
+ """0 for CreateTime; 1 for LogAppendTime; None if unsupported.
+
+ Value is determined by broker; produced messages should always set to 0
+ Requires Kafka >= 0.10 / message version >= 1
+ """
+ if self.magic == 0:
+ return None
+ return self.attributes & self.TIMESTAMP_TYPE_MASK
+
def _encode_self(self, recalc_crc=True):
- message = Message.SCHEMA.encode(
- (self.crc, self.magic, self.attributes, self.key, self.value)
- )
+ version = self.magic
+ if version == 1:
+ fields = (self.crc, self.magic, self.attributes, self.timestamp, self.key, self.value)
+ elif version == 0:
+ fields = (self.crc, self.magic, self.attributes, self.key, self.value)
+ else:
+ raise ValueError('Unrecognized message version: %s' % version)
+ message = Message.SCHEMAS[version].encode(fields)
if not recalc_crc:
return message
self.crc = crc32(message[4:])
- return self.SCHEMA.fields[0].encode(self.crc) + message[4:]
+ crc_field = self.SCHEMAS[version].fields[0]
+ return crc_field.encode(self.crc) + message[4:]
@classmethod
def decode(cls, data):
if isinstance(data, bytes):
data = io.BytesIO(data)
- fields = [field.decode(data) for field in cls.SCHEMA.fields]
- return cls(fields[4], key=fields[3],
- magic=fields[1], attributes=fields[2], crc=fields[0])
+ # Partial decode required to determine message version
+ base_fields = cls.SCHEMAS[0].fields[0:3]
+ crc, magic, attributes = [field.decode(data) for field in base_fields]
+ remaining = cls.SCHEMAS[magic].fields[3:]
+ fields = [field.decode(data) for field in remaining]
+ if magic == 1:
+ timestamp = fields[0]
+ else:
+ timestamp = None
+ return cls(fields[-1], key=fields[-2],
+ magic=magic, attributes=attributes, crc=crc,
+ timestamp=timestamp)
def validate_crc(self):
raw_msg = self._encode_self(recalc_crc=False)
@@ -90,8 +134,7 @@ class PartialMessage(bytes):
class MessageSet(AbstractType):
ITEM = Schema(
('offset', Int64),
- ('message_size', Int32),
- ('message', Message.SCHEMA)
+ ('message', Bytes)
)
HEADER_SIZE = 12 # offset + message_size
@@ -105,20 +148,13 @@ class MessageSet(AbstractType):
return items.read(size + 4)
encoded_values = []
- for (offset, message_size, message) in items:
- if isinstance(message, Message):
- encoded_message = message.encode()
- else:
- encoded_message = cls.ITEM.fields[2].encode(message)
- if recalc_message_size:
- message_size = len(encoded_message)
- encoded_values.append(cls.ITEM.fields[0].encode(offset))
- encoded_values.append(cls.ITEM.fields[1].encode(message_size))
- encoded_values.append(encoded_message)
+ for (offset, message) in items:
+ encoded_values.append(Int64.encode(offset))
+ encoded_values.append(Bytes.encode(message))
encoded = b''.join(encoded_values)
if not size:
return encoded
- return Int32.encode(len(encoded)) + encoded
+ return Bytes.encode(encoded)
@classmethod
def decode(cls, data, bytes_to_read=None):
@@ -131,30 +167,18 @@ class MessageSet(AbstractType):
bytes_to_read = Int32.decode(data)
items = []
- # We need at least 8 + 4 + 14 bytes to read offset + message size + message
- # (14 bytes is a message w/ null key and null value)
- while bytes_to_read >= 26:
- offset = Int64.decode(data)
- bytes_to_read -= 8
-
- message_size = Int32.decode(data)
- bytes_to_read -= 4
-
- # if FetchRequest max_bytes is smaller than the available message set
- # the server returns partial data for the final message
- if message_size > bytes_to_read:
+ # if FetchRequest max_bytes is smaller than the available message set
+ # the server returns partial data for the final message
+ while bytes_to_read:
+ try:
+ offset = Int64.decode(data)
+ msg_bytes = Bytes.decode(data)
+ bytes_to_read -= 8 + 4 + len(msg_bytes)
+ items.append((offset, len(msg_bytes), Message.decode(msg_bytes)))
+ except ValueError:
+ # PartialMessage to signal that max_bytes may be too small
+ items.append((None, None, PartialMessage()))
break
-
- message = Message.decode(data)
- bytes_to_read -= message_size
-
- items.append((offset, message_size, message))
-
- # If any bytes are left over, clear them from the buffer
- # and append a PartialMessage to signal that max_bytes may be too small
- if bytes_to_read:
- items.append((None, None, PartialMessage(data.read(bytes_to_read))))
-
return items
@classmethod
@@ -164,4 +188,4 @@ class MessageSet(AbstractType):
decoded = cls.decode(messages)
messages.seek(offset)
messages = decoded
- return '[' + ', '.join([cls.ITEM.repr(m) for m in messages]) + ']'
+ return str([cls.ITEM.repr(m) for m in messages])