summaryrefslogtreecommitdiff
path: root/kafka/producer/future.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/producer/future.py')
-rw-r--r--kafka/producer/future.py29
1 files changed, 19 insertions, 10 deletions
diff --git a/kafka/producer/future.py b/kafka/producer/future.py
index 041e3a2..bc50d0d 100644
--- a/kafka/producer/future.py
+++ b/kafka/producer/future.py
@@ -29,22 +29,29 @@ class FutureProduceResult(Future):
class FutureRecordMetadata(Future):
- def __init__(self, produce_future, relative_offset, timestamp_ms):
+ def __init__(self, produce_future, relative_offset, timestamp_ms, checksum, serialized_key_size, serialized_value_size):
super(FutureRecordMetadata, self).__init__()
self._produce_future = produce_future
- self.relative_offset = relative_offset
- self.timestamp_ms = timestamp_ms
+ # packing args as a tuple is a minor speed optimization
+ self.args = (relative_offset, timestamp_ms, checksum, serialized_key_size, serialized_value_size)
produce_future.add_callback(self._produce_success)
produce_future.add_errback(self.failure)
def _produce_success(self, offset_and_timestamp):
- offset, timestamp_ms = offset_and_timestamp
- if timestamp_ms is None:
- timestamp_ms = self.timestamp_ms
- if offset != -1 and self.relative_offset is not None:
- offset += self.relative_offset
+ offset, produce_timestamp_ms = offset_and_timestamp
+
+ # Unpacking from args tuple is minor speed optimization
+ (relative_offset, timestamp_ms, checksum,
+ serialized_key_size, serialized_value_size) = self.args
+
+ if produce_timestamp_ms is not None:
+ timestamp_ms = produce_timestamp_ms
+ if offset != -1 and relative_offset is not None:
+ offset += relative_offset
tp = self._produce_future.topic_partition
- metadata = RecordMetadata(tp[0], tp[1], tp, offset, timestamp_ms)
+ metadata = RecordMetadata(tp[0], tp[1], tp, offset, timestamp_ms,
+ checksum, serialized_key_size,
+ serialized_value_size)
self.success(metadata)
def get(self, timeout=None):
@@ -57,4 +64,6 @@ class FutureRecordMetadata(Future):
return self.value
-RecordMetadata = collections.namedtuple('RecordMetadata', 'topic partition topic_partition offset timestamp')
+RecordMetadata = collections.namedtuple(
+ 'RecordMetadata', ['topic', 'partition', 'topic_partition', 'offset', 'timestamp',
+ 'checksum', 'serialized_key_size', 'serialized_value_size'])