author     Dana Powers <dana.powers@gmail.com>    2016-05-22 10:28:56 -0700
committer  Dana Powers <dana.powers@gmail.com>    2016-05-22 10:28:56 -0700
commit     7941a2ac7ec6663f08c6291d92746eae9f792916 (patch)
tree       f3b75dcea569e28f1685500af53bff34514374b9 /kafka
parent     92f859d8da5c3f35ab3738ef2725fff05b6cf57f (diff)
parent     aa5bde6ac382966395f8f1466c46d55cf28c2cce (diff)
download   kafka-python-7941a2ac7ec6663f08c6291d92746eae9f792916.tar.gz
Merge pull request #693 from dpkp/message_format_v1
Message format v1 (KIP-31 / KIP-32)
Diffstat (limited to 'kafka')
-rw-r--r--  kafka/consumer/fetcher.py             |  24
-rw-r--r--  kafka/producer/buffer.py              |   6
-rw-r--r--  kafka/producer/future.py              |  18
-rw-r--r--  kafka/producer/kafka.py               |  15
-rw-r--r--  kafka/producer/record_accumulator.py  |  32
-rw-r--r--  kafka/producer/sender.py              |   9
-rw-r--r--  kafka/protocol/legacy.py              |   8
-rw-r--r--  kafka/protocol/message.py             | 132
-rw-r--r--  kafka/protocol/types.py               |  47
9 files changed, 186 insertions, 105 deletions
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
index 3a5e37e..bf59775 100644
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py
@@ -19,7 +19,7 @@ log = logging.getLogger(__name__)
 
 
 ConsumerRecord = collections.namedtuple("ConsumerRecord",
-    ["topic", "partition", "offset", "key", "value"])
+    ["topic", "partition", "offset", "timestamp", "timestamp_type", "key", "value"])
 
 
 class NoOffsetForPartitionError(Errors.KafkaError):
@@ -351,17 +351,33 @@ class Fetcher(six.Iterator):
                               position)
         return dict(drained)
 
-    def _unpack_message_set(self, tp, messages):
+    def _unpack_message_set(self, tp, messages, relative_offset=0):
         try:
             for offset, size, msg in messages:
                 if self.config['check_crcs'] and not msg.validate_crc():
                     raise Errors.InvalidMessageError(msg)
                 elif msg.is_compressed():
-                    for record in self._unpack_message_set(tp, msg.decompress()):
+                    mset = msg.decompress()
+                    # new format uses relative offsets for compressed messages
+                    if msg.magic > 0:
+                        last_offset, _, _ = mset[-1]
+                        relative = offset - last_offset
+                    else:
+                        relative = 0
+                    for record in self._unpack_message_set(tp, mset, relative):
                         yield record
                 else:
+                    # Message v1 adds timestamp
+                    if msg.magic > 0:
+                        timestamp = msg.timestamp
+                        timestamp_type = msg.timestamp_type
+                    else:
+                        timestamp = timestamp_type = None
                     key, value = self._deserialize(msg)
-                    yield ConsumerRecord(tp.topic, tp.partition, offset, key, value)
+                    yield ConsumerRecord(tp.topic, tp.partition,
+                                         offset + relative_offset,
+                                         timestamp, timestamp_type,
+                                         key, value)
 
         # If unpacking raises StopIteration, it is erroneously
         # caught by the generator. We want all exceptions to be raised
         # back to the user. See Issue 545
diff --git a/kafka/producer/buffer.py b/kafka/producer/buffer.py
index b2ac747..ba9b5db 100644
--- a/kafka/producer/buffer.py
+++ b/kafka/producer/buffer.py
@@ -29,7 +29,7 @@ class MessageSetBuffer(object):
         'snappy': (has_snappy, snappy_encode, Message.CODEC_SNAPPY),
         'lz4': (has_lz4, lz4_encode, Message.CODEC_LZ4),
     }
-    def __init__(self, buf, batch_size, compression_type=None):
+    def __init__(self, buf, batch_size, compression_type=None, message_version=0):
         if compression_type is not None:
             assert compression_type in self._COMPRESSORS, 'Unrecognized compression type'
             checker, encoder, attributes = self._COMPRESSORS[compression_type]
@@ -40,6 +40,7 @@
             self._compressor = None
             self._compression_attributes = None
 
+        self._message_version = message_version
         self._buffer = buf
         # Init MessageSetSize to 0 -- update on close
         self._buffer.seek(0)
@@ -85,7 +86,8 @@
             # TODO: avoid copies with bytearray / memoryview
             self._buffer.seek(4)
             msg = Message(self._compressor(self._buffer.read()),
-                          attributes=self._compression_attributes)
+                          attributes=self._compression_attributes,
+                          magic=self._message_version)
             encoded = msg.encode()
             self._buffer.seek(4)
             self._buffer.write(Int64.encode(0)) # offset 0 for wrapper msg
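For context, a minimal sketch (not part of this changeset) of the relative-offset arithmetic that _unpack_message_set now applies to compressed v1 message sets; the offsets below are made-up values:

    # In message format v1, the inner messages of a compressed wrapper carry
    # relative offsets 0..N-1 and the wrapper carries the absolute offset of
    # the last inner message, so absolute offsets are recovered like this:
    wrapper_offset = 1005                # offset stored on the compressed wrapper
    inner_offsets = [0, 1, 2, 3, 4, 5]   # offsets stored on the inner messages

    relative_offset = wrapper_offset - inner_offsets[-1]   # 1000
    absolute = [o + relative_offset for o in inner_offsets]
    assert absolute == [1000, 1001, 1002, 1003, 1004, 1005]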
diff --git a/kafka/producer/future.py b/kafka/producer/future.py
index 35520d8..acf4255 100644
--- a/kafka/producer/future.py
+++ b/kafka/producer/future.py
@@ -29,16 +29,21 @@ class FutureProduceResult(Future):
 
 
 class FutureRecordMetadata(Future):
-    def __init__(self, produce_future, relative_offset):
+    def __init__(self, produce_future, relative_offset, timestamp_ms):
         super(FutureRecordMetadata, self).__init__()
         self._produce_future = produce_future
         self.relative_offset = relative_offset
+        self.timestamp_ms = timestamp_ms
         produce_future.add_callback(self._produce_success)
         produce_future.add_errback(self.failure)
 
-    def _produce_success(self, base_offset):
+    def _produce_success(self, offset_and_timestamp):
+        base_offset, timestamp_ms = offset_and_timestamp
+        if timestamp_ms is None:
+            timestamp_ms = self.timestamp_ms
         self.success(RecordMetadata(self._produce_future.topic_partition,
-                                    base_offset, self.relative_offset))
+                                    base_offset, timestamp_ms,
+                                    self.relative_offset))
 
     def get(self, timeout=None):
         if not self.is_done and not self._produce_future.await(timeout):
@@ -51,12 +56,13 @@ class FutureRecordMetadata(Future):
 
 
 class RecordMetadata(collections.namedtuple(
-    'RecordMetadata', 'topic partition topic_partition offset')):
-    def __new__(cls, tp, base_offset, relative_offset=None):
+    'RecordMetadata', 'topic partition topic_partition offset timestamp')):
+    def __new__(cls, tp, base_offset, timestamp, relative_offset=None):
         offset = base_offset
         if relative_offset is not None and base_offset != -1:
             offset += relative_offset
-        return super(RecordMetadata, cls).__new__(cls, tp.topic, tp.partition, tp, offset)
+        return super(RecordMetadata, cls).__new__(cls, tp.topic, tp.partition,
+                                                  tp, offset, timestamp)
 
     def __str__(self):
         return 'RecordMetadata(topic=%s, partition=%s, offset=%s)' % (
diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py
index 7e8f625..7aa24b3 100644
--- a/kafka/producer/kafka.py
+++ b/kafka/producer/kafka.py
@@ -347,7 +347,7 @@ class KafkaProducer(object):
         max_wait = self.config['max_block_ms'] / 1000.0
         return self._wait_on_metadata(topic, max_wait)
 
-    def send(self, topic, value=None, key=None, partition=None):
+    def send(self, topic, value=None, key=None, partition=None, timestamp_ms=None):
         """Publish a message to a topic.
 
         Arguments:
@@ -368,6 +368,8 @@
                 partition (but if key is None, partition is chosen randomly).
                 Must be type bytes, or be serializable to bytes via configured
                 key_serializer.
+            timestamp_ms (int, optional): epoch milliseconds (from Jan 1 1970 UTC)
+                to use as the message timestamp. Defaults to current time.
 
         Returns:
             FutureRecordMetadata: resolves to RecordMetadata
@@ -396,8 +398,11 @@
             self._ensure_valid_record_size(message_size)
 
             tp = TopicPartition(topic, partition)
+            if timestamp_ms is None:
+                timestamp_ms = int(time.time() * 1000)
             log.debug("Sending (key=%s value=%s) to %s", key, value, tp)
-            result = self._accumulator.append(tp, key_bytes, value_bytes,
+            result = self._accumulator.append(tp, timestamp_ms,
+                                              key_bytes, value_bytes,
                                               self.config['max_block_ms'])
             future, batch_is_full, new_batch_created = result
             if batch_is_full or new_batch_created:
@@ -416,8 +421,10 @@
         except Exception as e:
             log.debug("Exception occurred during message send: %s", e)
             return FutureRecordMetadata(
-                FutureProduceResult(TopicPartition(topic, partition)),
-                -1).failure(e)
+                FutureProduceResult(
+                    TopicPartition(topic, partition)),
+                -1, None
+            ).failure(e)
 
     def flush(self, timeout=None):
         """
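A usage sketch for the new timestamp_ms parameter (the broker address and topic name are placeholders; with an older broker or CreateTime, the returned RecordMetadata.timestamp falls back to the client-supplied value):

    import time
    from kafka import KafkaProducer

    producer = KafkaProducer(bootstrap_servers='localhost:9092')  # placeholder address

    # timestamp_ms is optional; when omitted, send() stamps the record with
    # int(time.time() * 1000), as shown in the hunk above.
    future = producer.send('my-topic', value=b'hello',
                           timestamp_ms=int(time.time() * 1000))
    metadata = future.get(timeout=10)
    assert metadata.timestamp is not None   # new field on RecordMetadata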
diff --git a/kafka/producer/record_accumulator.py b/kafka/producer/record_accumulator.py
index 9eb0e95..4434b18 100644
--- a/kafka/producer/record_accumulator.py
+++ b/kafka/producer/record_accumulator.py
@@ -36,7 +36,7 @@ class AtomicInteger(object):
 
 
 class RecordBatch(object):
-    def __init__(self, tp, records):
+    def __init__(self, tp, records, message_version=0):
         self.record_count = 0
         #self.max_record_size = 0 # for metrics only
         now = time.time()
@@ -46,22 +46,25 @@
         self.last_attempt = now
         self.last_append = now
         self.records = records
+        self.message_version = message_version
         self.topic_partition = tp
         self.produce_future = FutureProduceResult(tp)
         self._retry = False
 
-    def try_append(self, key, value):
+    def try_append(self, timestamp_ms, key, value):
         if not self.records.has_room_for(key, value):
             return None
-        self.records.append(self.record_count, Message(value, key=key))
+        msg = Message(value, key=key, magic=self.message_version)
+        self.records.append(self.record_count, msg)
         # self.max_record_size = max(self.max_record_size, Record.record_size(key, value)) # for metrics only
         self.last_append = time.time()
-        future = FutureRecordMetadata(self.produce_future, self.record_count)
+        future = FutureRecordMetadata(self.produce_future, self.record_count,
+                                      timestamp_ms)
         self.record_count += 1
         return future
 
-    def done(self, base_offset=None, exception=None):
+    def done(self, base_offset=None, timestamp_ms=None, exception=None):
         log.debug("Produced messages to topic-partition %s with base offset"
                   " %s and error %s.", self.topic_partition, base_offset,
                   exception) # trace
@@ -69,7 +72,7 @@
             log.warning('Batch is already closed -- ignoring batch.done()')
             return
         elif exception is None:
-            self.produce_future.success(base_offset)
+            self.produce_future.success((base_offset, timestamp_ms))
         else:
             self.produce_future.failure(exception)
 
@@ -78,7 +81,7 @@
         if ((self.records.is_full() and request_timeout_ms < since_append_ms) or
             (request_timeout_ms < (since_append_ms + linger_ms))):
             self.records.close()
-            self.done(-1, Errors.KafkaTimeoutError(
+            self.done(-1, None, Errors.KafkaTimeoutError(
                 "Batch containing %s record(s) expired due to timeout while"
                 " requesting metadata from brokers for %s", self.record_count,
                 self.topic_partition))
@@ -137,6 +140,7 @@
         'compression_type': None,
         'linger_ms': 0,
         'retry_backoff_ms': 100,
+        'message_version': 0,
     }
 
     def __init__(self, **configs):
@@ -155,7 +159,7 @@ class RecordAccumulator(object):
                                           self.config['batch_size'])
         self._incomplete = IncompleteRecordBatches()
 
-    def append(self, tp, key, value, max_time_to_block_ms):
+    def append(self, tp, timestamp_ms, key, value, max_time_to_block_ms):
         """Add a record to the accumulator, return the append result.
 
         The append result will contain the future metadata, and flag for
@@ -164,6 +168,7 @@
         Arguments:
             tp (TopicPartition): The topic/partition to which this record is
                 being sent
+            timestamp_ms (int): The timestamp of the record (epoch ms)
            key (bytes): The key for the record
            value (bytes): The value for the record
            max_time_to_block_ms (int): The maximum time in milliseconds to
@@ -188,7 +193,7 @@
            dq = self._batches[tp]
            if dq:
                last = dq[-1]
-                future = last.try_append(key, value)
+                future = last.try_append(timestamp_ms, key, value)
                if future is not None:
                    batch_is_full = len(dq) > 1 or last.records.is_full()
                    return future, batch_is_full, False
@@ -211,7 +216,7 @@
            if dq:
                last = dq[-1]
-                future = last.try_append(key, value)
+                future = last.try_append(timestamp_ms, key, value)
                if future is not None:
                    # Somebody else found us a batch, return the one we
                    # waited for! Hopefully this doesn't happen often...
@@ -220,9 +225,10 @@
                    return future, batch_is_full, False
 
            records = MessageSetBuffer(buf, self.config['batch_size'],
-                                      self.config['compression_type'])
-            batch = RecordBatch(tp, records)
-            future = batch.try_append(key, value)
+                                      self.config['compression_type'],
+                                      self.config['message_version'])
+            batch = RecordBatch(tp, records, self.config['message_version'])
+            future = batch.try_append(timestamp_ms, key, value)
            if not future:
                raise Exception()
diff --git a/kafka/producer/sender.py b/kafka/producer/sender.py
index bf7c163..9c36c9b 100644
--- a/kafka/producer/sender.py
+++ b/kafka/producer/sender.py
@@ -163,7 +163,7 @@ class Sender(threading.Thread):
     def _failed_produce(self, batches, node_id, error):
         log.debug("Error sending produce request to node %d: %s", node_id, error) # trace
         for batch in batches:
-            self._complete_batch(batch, error, -1)
+            self._complete_batch(batch, error, -1, None)
 
     def _handle_produce_response(self, batches, response):
         """Handle a produce response."""
@@ -183,15 +183,16 @@
         else:
             # this is the acks = 0 case, just complete all requests
             for batch in batches:
-                self._complete_batch(batch, None, -1)
+                self._complete_batch(batch, None, -1, None)
 
-    def _complete_batch(self, batch, error, base_offset):
+    def _complete_batch(self, batch, error, base_offset, timestamp_ms=None):
         """Complete or retry the given batch of records.
 
         Arguments:
             batch (RecordBatch): The record batch
             error (Exception): The error (or None if none)
             base_offset (int): The base offset assigned to the records if successful
+            timestamp_ms (int, optional): The timestamp returned by the broker for this batch
         """
         # Standardize no-error to None
         if error is Errors.NoError:
@@ -210,7 +211,7 @@
             error = error(batch.topic_partition.topic)
 
         # tell the user the result of their request
-        batch.done(base_offset, error)
+        batch.done(base_offset, timestamp_ms, error)
         self._accumulator.deallocate(batch)
 
         if getattr(error, 'invalid_metadata', False):
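To see how the producer pieces fit together, a small sketch (not from this commit's tests; TopicPartition is replaced by a stand-in namedtuple): RecordBatch.done() now resolves the shared FutureProduceResult with a (base_offset, timestamp_ms) tuple, and each FutureRecordMetadata substitutes its client-side timestamp when the broker did not supply one:

    import collections
    from kafka.producer.future import FutureProduceResult, FutureRecordMetadata

    # Stand-in for kafka's TopicPartition; only .topic / .partition are used here
    TopicPartition = collections.namedtuple('TopicPartition', 'topic partition')

    tp = TopicPartition('my-topic', 0)
    produce_future = FutureProduceResult(tp)
    record_future = FutureRecordMetadata(produce_future, relative_offset=0,
                                         timestamp_ms=1463938136000)

    # Equivalent to RecordBatch.done(base_offset=500, timestamp_ms=None)
    produce_future.success((500, None))

    metadata = record_future.value
    assert metadata.offset == 500                 # base_offset + relative_offset
    assert metadata.timestamp == 1463938136000    # client-side fallback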
diff --git a/kafka/protocol/legacy.py b/kafka/protocol/legacy.py
index cd100d6..6ab2511 100644
--- a/kafka/protocol/legacy.py
+++ b/kafka/protocol/legacy.py
@@ -143,9 +143,11 @@ class KafkaProtocol(object):
                 topic, [(
                     partition,
-                    [(0, 0, kafka.protocol.message.Message(msg.value, key=msg.key,
-                                                           magic=msg.magic,
-                                                           attributes=msg.attributes))
+                    [(0,
+                      kafka.protocol.message.Message(
+                          msg.value, key=msg.key,
+                          magic=msg.magic, attributes=msg.attributes
+                      ).encode())
                      for msg in payload.messages])
                 for partition, payload in topic_payloads.items()])
             for topic, topic_payloads in group_by_topic_and_partition(payloads).items()])
diff --git a/kafka/protocol/message.py b/kafka/protocol/message.py
index ae261bf..473ca56 100644
--- a/kafka/protocol/message.py
+++ b/kafka/protocol/message.py
@@ -1,4 +1,5 @@
 import io
+import time
 
 from ..codec import (has_gzip, has_snappy, has_lz4,
                      gzip_decode, snappy_decode, lz4_decode)
@@ -11,22 +12,39 @@ from ..util import crc32
 
 
 class Message(Struct):
-    SCHEMA = Schema(
-        ('crc', Int32),
-        ('magic', Int8),
-        ('attributes', Int8),
-        ('key', Bytes),
-        ('value', Bytes)
-    )
-    CODEC_MASK = 0x03
+    SCHEMAS = [
+        Schema(
+            ('crc', Int32),
+            ('magic', Int8),
+            ('attributes', Int8),
+            ('key', Bytes),
+            ('value', Bytes)),
+        Schema(
+            ('crc', Int32),
+            ('magic', Int8),
+            ('attributes', Int8),
+            ('timestamp', Int64),
+            ('key', Bytes),
+            ('value', Bytes)),
+    ]
+    SCHEMA = SCHEMAS[1]
+    CODEC_MASK = 0x07
     CODEC_GZIP = 0x01
     CODEC_SNAPPY = 0x02
     CODEC_LZ4 = 0x03
-    HEADER_SIZE = 14 # crc(4), magic(1), attributes(1), key+value size(4*2)
+    TIMESTAMP_TYPE_MASK = 0x08
+    HEADER_SIZE = 22 # crc(4), magic(1), attributes(1), timestamp(8), key+value size(4*2)
 
-    def __init__(self, value, key=None, magic=0, attributes=0, crc=0):
+    def __init__(self, value, key=None, magic=0, attributes=0, crc=0,
+                 timestamp=None):
         assert value is None or isinstance(value, bytes), 'value must be bytes'
         assert key is None or isinstance(key, bytes), 'key must be bytes'
+        assert magic > 0 or timestamp is None, 'timestamp not supported in v0'
+
+        # Default timestamp to now for v1 messages
+        if magic > 0 and timestamp is None:
+            timestamp = int(time.time() * 1000)
+        self.timestamp = timestamp
         self.crc = crc
         self.magic = magic
         self.attributes = attributes
@@ -34,22 +52,48 @@
         self.value = value
         self.encode = self._encode_self
 
+    @property
+    def timestamp_type(self):
+        """0 for CreateTime; 1 for LogAppendTime; None if unsupported.
+
+        Value is determined by broker; produced messages should always set to 0
+        Requires Kafka >= 0.10 / message version >= 1
+        """
+        if self.magic == 0:
+            return None
+        return self.attributes & self.TIMESTAMP_TYPE_MASK
+
     def _encode_self(self, recalc_crc=True):
-        message = Message.SCHEMA.encode(
-            (self.crc, self.magic, self.attributes, self.key, self.value)
-        )
+        version = self.magic
+        if version == 1:
+            fields = (self.crc, self.magic, self.attributes, self.timestamp, self.key, self.value)
+        elif version == 0:
+            fields = (self.crc, self.magic, self.attributes, self.key, self.value)
+        else:
+            raise ValueError('Unrecognized message version: %s' % version)
+        message = Message.SCHEMAS[version].encode(fields)
         if not recalc_crc:
             return message
         self.crc = crc32(message[4:])
-        return self.SCHEMA.fields[0].encode(self.crc) + message[4:]
+        crc_field = self.SCHEMAS[version].fields[0]
+        return crc_field.encode(self.crc) + message[4:]
 
     @classmethod
     def decode(cls, data):
         if isinstance(data, bytes):
             data = io.BytesIO(data)
-        fields = [field.decode(data) for field in cls.SCHEMA.fields]
-        return cls(fields[4], key=fields[3],
-                   magic=fields[1], attributes=fields[2], crc=fields[0])
+        # Partial decode required to determine message version
+        base_fields = cls.SCHEMAS[0].fields[0:3]
+        crc, magic, attributes = [field.decode(data) for field in base_fields]
+        remaining = cls.SCHEMAS[magic].fields[3:]
+        fields = [field.decode(data) for field in remaining]
+        if magic == 1:
+            timestamp = fields[0]
+        else:
+            timestamp = None
+        return cls(fields[-1], key=fields[-2],
+                   magic=magic, attributes=attributes, crc=crc,
+                   timestamp=timestamp)
 
     def validate_crc(self):
         raw_msg = self._encode_self(recalc_crc=False)
@@ -90,8 +134,7 @@ class PartialMessage(bytes):
 
 class MessageSet(AbstractType):
     ITEM = Schema(
         ('offset', Int64),
-        ('message_size', Int32),
-        ('message', Message.SCHEMA)
+        ('message', Bytes)
     )
     HEADER_SIZE = 12 # offset + message_size
@@ -105,20 +148,13 @@
             return items.read(size + 4)
 
         encoded_values = []
-        for (offset, message_size, message) in items:
-            if isinstance(message, Message):
-                encoded_message = message.encode()
-            else:
-                encoded_message = cls.ITEM.fields[2].encode(message)
-            if recalc_message_size:
-                message_size = len(encoded_message)
-            encoded_values.append(cls.ITEM.fields[0].encode(offset))
-            encoded_values.append(cls.ITEM.fields[1].encode(message_size))
-            encoded_values.append(encoded_message)
+        for (offset, message) in items:
+            encoded_values.append(Int64.encode(offset))
+            encoded_values.append(Bytes.encode(message))
         encoded = b''.join(encoded_values)
         if not size:
             return encoded
-        return Int32.encode(len(encoded)) + encoded
+        return Bytes.encode(encoded)
 
     @classmethod
     def decode(cls, data, bytes_to_read=None):
@@ -131,30 +167,18 @@
             bytes_to_read = Int32.decode(data)
 
         items = []
-        # We need at least 8 + 4 + 14 bytes to read offset + message size + message
-        # (14 bytes is a message w/ null key and null value)
-        while bytes_to_read >= 26:
-            offset = Int64.decode(data)
-            bytes_to_read -= 8
-
-            message_size = Int32.decode(data)
-            bytes_to_read -= 4
-
-            # if FetchRequest max_bytes is smaller than the available message set
-            # the server returns partial data for the final message
-            if message_size > bytes_to_read:
+        # if FetchRequest max_bytes is smaller than the available message set
+        # the server returns partial data for the final message
+        while bytes_to_read:
+            try:
+                offset = Int64.decode(data)
+                msg_bytes = Bytes.decode(data)
+                bytes_to_read -= 8 + 4 + len(msg_bytes)
+                items.append((offset, len(msg_bytes), Message.decode(msg_bytes)))
+            except ValueError:
+                # PartialMessage to signal that max_bytes may be too small
+                items.append((None, None, PartialMessage()))
                 break
-
-            message = Message.decode(data)
-            bytes_to_read -= message_size
-
-            items.append((offset, message_size, message))
-
-        # If any bytes are left over, clear them from the buffer
-        # and append a PartialMessage to signal that max_bytes may be too small
-        if bytes_to_read:
-            items.append((None, None, PartialMessage(data.read(bytes_to_read))))
-
         return items
 
     @classmethod
@@ -164,4 +188,4 @@
             decoded = cls.decode(messages)
             messages.seek(offset)
             messages = decoded
-        return '[' + ', '.join([cls.ITEM.repr(m) for m in messages]) + ']'
+        return str([cls.ITEM.repr(m) for m in messages])
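A quick sketch of the two wire formats handled above (values are illustrative; decode() reads the magic byte before choosing a schema):

    import time
    from kafka.protocol.message import Message

    # v0: no timestamp field on the wire
    v0 = Message(b'value', key=b'key', magic=0)
    assert v0.timestamp is None and v0.timestamp_type is None

    # v1: timestamp is epoch milliseconds; defaults to "now" when not supplied
    v1 = Message(b'value', key=b'key', magic=1,
                 timestamp=int(time.time() * 1000))
    decoded = Message.decode(v1.encode())
    assert decoded.magic == 1
    assert decoded.timestamp == v1.timestamp
    assert decoded.validate_crc()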
diff --git a/kafka/protocol/types.py b/kafka/protocol/types.py
index 01799bb..18aaca1 100644
--- a/kafka/protocol/types.py
+++ b/kafka/protocol/types.py
@@ -1,52 +1,63 @@
 from __future__ import absolute_import
 
-from struct import pack, unpack
+from struct import pack, unpack, error
 
 from .abstract import AbstractType
 
 
+def _pack(f, value):
+    try:
+        return pack(f, value)
+    except error:
+        raise ValueError(error)
+
+
+def _unpack(f, data):
+    try:
+        (value,) = unpack(f, data)
+        return value
+    except error:
+        raise ValueError(error)
+
+
 class Int8(AbstractType):
     @classmethod
     def encode(cls, value):
-        return pack('>b', value)
+        return _pack('>b', value)
 
     @classmethod
     def decode(cls, data):
-        (value,) = unpack('>b', data.read(1))
-        return value
+        return _unpack('>b', data.read(1))
 
 
 class Int16(AbstractType):
     @classmethod
     def encode(cls, value):
-        return pack('>h', value)
+        return _pack('>h', value)
 
     @classmethod
     def decode(cls, data):
-        (value,) = unpack('>h', data.read(2))
-        return value
+        return _unpack('>h', data.read(2))
 
 
 class Int32(AbstractType):
     @classmethod
     def encode(cls, value):
-        return pack('>i', value)
+        return _pack('>i', value)
 
     @classmethod
     def decode(cls, data):
-        (value,) = unpack('>i', data.read(4))
-        return value
+        return _unpack('>i', data.read(4))
 
 
 class Int64(AbstractType):
     @classmethod
     def encode(cls, value):
-        return pack('>q', value)
+        return _pack('>q', value)
 
     @classmethod
     def decode(cls, data):
-        (value,) = unpack('>q', data.read(8))
-        return value
+        return _unpack('>q', data.read(8))
 
 
 class String(AbstractType):
@@ -63,7 +74,10 @@
         length = Int16.decode(data)
         if length < 0:
             return None
-        return data.read(length).decode(self.encoding)
+        value = data.read(length)
+        if len(value) != length:
+            raise ValueError('Buffer underrun decoding string')
+        return value.decode(self.encoding)
 
 
 class Bytes(AbstractType):
@@ -79,7 +93,10 @@
         length = Int32.decode(data)
         if length < 0:
             return None
-        return data.read(length)
+        value = data.read(length)
+        if len(value) != length:
+            raise ValueError('Buffer underrun decoding Bytes')
+        return value
 
 
 class Schema(AbstractType):
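The practical effect of the types.py changes, sketched below: a truncated buffer now surfaces as ValueError (which MessageSet.decode above turns into a trailing PartialMessage) rather than a bare struct.error:

    import io
    from kafka.protocol.types import Bytes, Int64

    # Value claims to be 5 bytes long, but only 3 bytes are available
    buf = io.BytesIO(b'\x00\x00\x00\x05abc')
    try:
        Bytes.decode(buf)
    except ValueError:
        pass  # buffer underrun -- caller may treat this as a partial message

    # Fixed-width ints behave the same way when the buffer runs dry
    try:
        Int64.decode(io.BytesIO(b'\x01\x02'))  # needs 8 bytes
    except ValueError:
        pass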