author    Dana Powers <dana.powers@gmail.com>  2016-07-17 09:40:08 -0700
committer Dana Powers <dana.powers@gmail.com>  2016-07-17 10:19:50 -0700
commit    4be35f5d0d5781cc003b55949b81834c8401abbd (patch)
tree      1ae65d702971d6e5a4290401ac74d42b673ae986
parent    87648d74f49dafb6146bb61c40d8d2d44146ff8b (diff)
KAFKA-3196: Add checksum and size to RecordMetadata and ConsumerRecord
 kafka/consumer/fetcher.py            | 11
 kafka/producer/buffer.py             |  2
 kafka/producer/future.py             | 29
 kafka/producer/kafka.py              | 10
 kafka/producer/record_accumulator.py |  5
 5 files changed, 38 insertions(+), 19 deletions(-)
diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py
index d615848..9fd3457 100644
--- a/kafka/consumer/fetcher.py
+++ b/kafka/consumer/fetcher.py
@@ -20,7 +20,8 @@ log = logging.getLogger(__name__)
ConsumerRecord = collections.namedtuple("ConsumerRecord",
- ["topic", "partition", "offset", "timestamp", "timestamp_type", "key", "value"])
+ ["topic", "partition", "offset", "timestamp", "timestamp_type",
+ "key", "value", "checksum", "serialized_key_size", "serialized_value_size"])
class NoOffsetForPartitionError(Errors.KafkaError):
@@ -410,13 +411,17 @@ class Fetcher(six.Iterator):
key, value = self._deserialize(inner_msg)
yield ConsumerRecord(tp.topic, tp.partition, inner_offset,
inner_timestamp, msg.timestamp_type,
- key, value)
+ key, value, inner_msg.crc,
+ len(inner_msg.key) if inner_msg.key is not None else -1,
+ len(inner_msg.value) if inner_msg.value is not None else -1)
else:
key, value = self._deserialize(msg)
yield ConsumerRecord(tp.topic, tp.partition, offset,
msg.timestamp, msg.timestamp_type,
- key, value)
+ key, value, msg.crc,
+ len(msg.key) if msg.key is not None else -1,
+ len(msg.value) if msg.value is not None else -1)
# If unpacking raises StopIteration, it is erroneously
# caught by the generator. We want all exceptions to be raised
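
On the consumer side, the three new ConsumerRecord fields surface on every record yielded by the iterator. A minimal sketch, assuming a broker on localhost:9092 and a pre-existing topic named 'my-topic' (both placeholders):

from kafka import KafkaConsumer

consumer = KafkaConsumer('my-topic', bootstrap_servers='localhost:9092')

for record in consumer:
    # checksum is the message crc; the sizes report -1 for a null key/value
    print(record.checksum,
          record.serialized_key_size,
          record.serialized_value_size)
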
diff --git a/kafka/producer/buffer.py b/kafka/producer/buffer.py
index de5f0e7..0c49828 100644
--- a/kafka/producer/buffer.py
+++ b/kafka/producer/buffer.py
@@ -59,7 +59,7 @@ class MessageSetBuffer(object):
self._final_size = None
def append(self, offset, message):
- """Apend a Message to the MessageSet.
+ """Append a Message to the MessageSet.
Arguments:
offset (int): offset of the message
diff --git a/kafka/producer/future.py b/kafka/producer/future.py
index 041e3a2..bc50d0d 100644
--- a/kafka/producer/future.py
+++ b/kafka/producer/future.py
@@ -29,22 +29,29 @@ class FutureProduceResult(Future):
class FutureRecordMetadata(Future):
- def __init__(self, produce_future, relative_offset, timestamp_ms):
+ def __init__(self, produce_future, relative_offset, timestamp_ms, checksum, serialized_key_size, serialized_value_size):
super(FutureRecordMetadata, self).__init__()
self._produce_future = produce_future
- self.relative_offset = relative_offset
- self.timestamp_ms = timestamp_ms
+ # packing args as a tuple is a minor speed optimization
+ self.args = (relative_offset, timestamp_ms, checksum, serialized_key_size, serialized_value_size)
produce_future.add_callback(self._produce_success)
produce_future.add_errback(self.failure)
def _produce_success(self, offset_and_timestamp):
- offset, timestamp_ms = offset_and_timestamp
- if timestamp_ms is None:
- timestamp_ms = self.timestamp_ms
- if offset != -1 and self.relative_offset is not None:
- offset += self.relative_offset
+ offset, produce_timestamp_ms = offset_and_timestamp
+
+ # Unpacking from args tuple is minor speed optimization
+ (relative_offset, timestamp_ms, checksum,
+ serialized_key_size, serialized_value_size) = self.args
+
+ if produce_timestamp_ms is not None:
+ timestamp_ms = produce_timestamp_ms
+ if offset != -1 and relative_offset is not None:
+ offset += relative_offset
tp = self._produce_future.topic_partition
- metadata = RecordMetadata(tp[0], tp[1], tp, offset, timestamp_ms)
+ metadata = RecordMetadata(tp[0], tp[1], tp, offset, timestamp_ms,
+ checksum, serialized_key_size,
+ serialized_value_size)
self.success(metadata)
def get(self, timeout=None):
@@ -57,4 +64,6 @@ class FutureRecordMetadata(Future):
return self.value
-RecordMetadata = collections.namedtuple('RecordMetadata', 'topic partition topic_partition offset timestamp')
+RecordMetadata = collections.namedtuple(
+ 'RecordMetadata', ['topic', 'partition', 'topic_partition', 'offset', 'timestamp',
+ 'checksum', 'serialized_key_size', 'serialized_value_size'])
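
For reference, the widened RecordMetadata shape can be inspected directly; a quick sketch of what the namedtuple above now exposes:

from kafka.producer.future import RecordMetadata

print(RecordMetadata._fields)
# ('topic', 'partition', 'topic_partition', 'offset', 'timestamp',
#  'checksum', 'serialized_key_size', 'serialized_value_size')
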
diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py
index 6db4d13..c4d1a36 100644
--- a/kafka/producer/kafka.py
+++ b/kafka/producer/kafka.py
@@ -457,6 +457,7 @@ class KafkaProducer(object):
assert value is not None or self.config['api_version'] >= (0, 8, 1), (
'Null messages require kafka >= 0.8.1')
assert not (value is None and key is None), 'Need at least one: key or value'
+ key_bytes = value_bytes = None
try:
# first make sure the metadata for the topic is
# available
@@ -497,10 +498,11 @@ class KafkaProducer(object):
except Exception as e:
log.debug("Exception occurred during message send: %s", e)
return FutureRecordMetadata(
- FutureProduceResult(
- TopicPartition(topic, partition)),
- -1, None
- ).failure(e)
+ FutureProduceResult(TopicPartition(topic, partition)),
+ -1, None, None,
+ len(key_bytes) if key_bytes is not None else -1,
+ len(value_bytes) if value_bytes is not None else -1
+ ).failure(e)
def flush(self, timeout=None):
"""
diff --git a/kafka/producer/record_accumulator.py b/kafka/producer/record_accumulator.py
index 7ea579a..0b6fb0a 100644
--- a/kafka/producer/record_accumulator.py
+++ b/kafka/producer/record_accumulator.py
@@ -57,10 +57,13 @@ class RecordBatch(object):
msg = Message(value, key=key, magic=self.message_version)
record_size = self.records.append(self.record_count, msg)
+ checksum = msg.crc # crc is recalculated during records.append()
self.max_record_size = max(self.max_record_size, record_size)
self.last_append = time.time()
future = FutureRecordMetadata(self.produce_future, self.record_count,
- timestamp_ms)
+ timestamp_ms, checksum,
+ len(key) if key is not None else -1,
+ len(value) if value is not None else -1)
self.record_count += 1
return future
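
The ordering in the hunk above matters: a Message's crc is only (re)computed when the message is encoded, which happens inside records.append(), so the checksum must be read after the append rather than at construction time. A standalone sketch of that behavior using the library's Message class (magic=0 keeps the example version-independent):

from kafka.protocol.message import Message

msg = Message(b'value', key=b'key', magic=0)
msg.encode()    # encoding recalculates msg.crc, as records.append() does
print(msg.crc)  # now matches the checksum later reported in RecordMetadata
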