diff options
author | Dana Powers <dana.powers@gmail.com> | 2016-07-17 09:40:08 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2016-07-17 10:19:50 -0700 |
commit | 4be35f5d0d5781cc003b55949b81834c8401abbd (patch) | |
tree | 1ae65d702971d6e5a4290401ac74d42b673ae986 | |
parent | 87648d74f49dafb6146bb61c40d8d2d44146ff8b (diff) | |
download | kafka-python-KAFKA-3196.tar.gz |
KAFKA-3196: Add checksum and size to RecordMetadata and ConsumerRecord (tag: KAFKA-3196)
-rw-r--r-- | kafka/consumer/fetcher.py | 11 | ||||
-rw-r--r-- | kafka/producer/buffer.py | 2 | ||||
-rw-r--r-- | kafka/producer/future.py | 29 | ||||
-rw-r--r-- | kafka/producer/kafka.py | 10 | ||||
-rw-r--r-- | kafka/producer/record_accumulator.py | 5 |
5 files changed, 38 insertions, 19 deletions
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index d615848..9fd3457 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -20,7 +20,8 @@ log = logging.getLogger(__name__) ConsumerRecord = collections.namedtuple("ConsumerRecord", - ["topic", "partition", "offset", "timestamp", "timestamp_type", "key", "value"]) + ["topic", "partition", "offset", "timestamp", "timestamp_type", + "key", "value", "checksum", "serialized_key_size", "serialized_value_size"]) class NoOffsetForPartitionError(Errors.KafkaError): @@ -410,13 +411,17 @@ class Fetcher(six.Iterator): key, value = self._deserialize(inner_msg) yield ConsumerRecord(tp.topic, tp.partition, inner_offset, inner_timestamp, msg.timestamp_type, - key, value) + key, value, inner_msg.crc, + len(inner_msg.key) if inner_msg.key is not None else -1, + len(inner_msg.value) if inner_msg.value is not None else -1) else: key, value = self._deserialize(msg) yield ConsumerRecord(tp.topic, tp.partition, offset, msg.timestamp, msg.timestamp_type, - key, value) + key, value, msg.crc, + len(msg.key) if msg.key is not None else -1, + len(msg.value) if msg.value is not None else -1) # If unpacking raises StopIteration, it is erroneously # caught by the generator. We want all exceptions to be raised diff --git a/kafka/producer/buffer.py b/kafka/producer/buffer.py index de5f0e7..0c49828 100644 --- a/kafka/producer/buffer.py +++ b/kafka/producer/buffer.py @@ -59,7 +59,7 @@ class MessageSetBuffer(object): self._final_size = None def append(self, offset, message): - """Apend a Message to the MessageSet. + """Append a Message to the MessageSet. 
Arguments: offset (int): offset of the message diff --git a/kafka/producer/future.py b/kafka/producer/future.py index 041e3a2..bc50d0d 100644 --- a/kafka/producer/future.py +++ b/kafka/producer/future.py @@ -29,22 +29,29 @@ class FutureProduceResult(Future): class FutureRecordMetadata(Future): - def __init__(self, produce_future, relative_offset, timestamp_ms): + def __init__(self, produce_future, relative_offset, timestamp_ms, checksum, serialized_key_size, serialized_value_size): super(FutureRecordMetadata, self).__init__() self._produce_future = produce_future - self.relative_offset = relative_offset - self.timestamp_ms = timestamp_ms + # packing args as a tuple is a minor speed optimization + self.args = (relative_offset, timestamp_ms, checksum, serialized_key_size, serialized_value_size) produce_future.add_callback(self._produce_success) produce_future.add_errback(self.failure) def _produce_success(self, offset_and_timestamp): - offset, timestamp_ms = offset_and_timestamp - if timestamp_ms is None: - timestamp_ms = self.timestamp_ms - if offset != -1 and self.relative_offset is not None: - offset += self.relative_offset + offset, produce_timestamp_ms = offset_and_timestamp + + # Unpacking from args tuple is minor speed optimization + (relative_offset, timestamp_ms, checksum, + serialized_key_size, serialized_value_size) = self.args + + if produce_timestamp_ms is not None: + timestamp_ms = produce_timestamp_ms + if offset != -1 and relative_offset is not None: + offset += relative_offset tp = self._produce_future.topic_partition - metadata = RecordMetadata(tp[0], tp[1], tp, offset, timestamp_ms) + metadata = RecordMetadata(tp[0], tp[1], tp, offset, timestamp_ms, + checksum, serialized_key_size, + serialized_value_size) self.success(metadata) def get(self, timeout=None): @@ -57,4 +64,6 @@ class FutureRecordMetadata(Future): return self.value -RecordMetadata = collections.namedtuple('RecordMetadata', 'topic partition topic_partition offset timestamp') 
+RecordMetadata = collections.namedtuple( + 'RecordMetadata', ['topic', 'partition', 'topic_partition', 'offset', 'timestamp', + 'checksum', 'serialized_key_size', 'serialized_value_size']) diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py index 6db4d13..c4d1a36 100644 --- a/kafka/producer/kafka.py +++ b/kafka/producer/kafka.py @@ -457,6 +457,7 @@ class KafkaProducer(object): assert value is not None or self.config['api_version'] >= (0, 8, 1), ( 'Null messages require kafka >= 0.8.1') assert not (value is None and key is None), 'Need at least one: key or value' + key_bytes = value_bytes = None try: # first make sure the metadata for the topic is # available @@ -497,10 +498,11 @@ class KafkaProducer(object): except Exception as e: log.debug("Exception occurred during message send: %s", e) return FutureRecordMetadata( - FutureProduceResult( - TopicPartition(topic, partition)), - -1, None - ).failure(e) + FutureProduceResult(TopicPartition(topic, partition)), + -1, None, None, + len(key_bytes) if key_bytes is not None else -1, + len(value_bytes) if value_bytes is not None else -1 + ).failure(e) def flush(self, timeout=None): """ diff --git a/kafka/producer/record_accumulator.py b/kafka/producer/record_accumulator.py index 7ea579a..0b6fb0a 100644 --- a/kafka/producer/record_accumulator.py +++ b/kafka/producer/record_accumulator.py @@ -57,10 +57,13 @@ class RecordBatch(object): msg = Message(value, key=key, magic=self.message_version) record_size = self.records.append(self.record_count, msg) + checksum = msg.crc # crc is recalculated during records.append() self.max_record_size = max(self.max_record_size, record_size) self.last_append = time.time() future = FutureRecordMetadata(self.produce_future, self.record_count, - timestamp_ms) + timestamp_ms, checksum, + len(key) if key is not None else -1, + len(value) if value is not None else -1) self.record_count += 1 return future |