diff options
Diffstat (limited to 'kafka/producer/future.py')
-rw-r--r-- | kafka/producer/future.py | 29 |
1 files changed, 19 insertions, 10 deletions
diff --git a/kafka/producer/future.py b/kafka/producer/future.py index 041e3a2..bc50d0d 100644 --- a/kafka/producer/future.py +++ b/kafka/producer/future.py @@ -29,22 +29,29 @@ class FutureProduceResult(Future): class FutureRecordMetadata(Future): - def __init__(self, produce_future, relative_offset, timestamp_ms): + def __init__(self, produce_future, relative_offset, timestamp_ms, checksum, serialized_key_size, serialized_value_size): super(FutureRecordMetadata, self).__init__() self._produce_future = produce_future - self.relative_offset = relative_offset - self.timestamp_ms = timestamp_ms + # packing args as a tuple is a minor speed optimization + self.args = (relative_offset, timestamp_ms, checksum, serialized_key_size, serialized_value_size) produce_future.add_callback(self._produce_success) produce_future.add_errback(self.failure) def _produce_success(self, offset_and_timestamp): - offset, timestamp_ms = offset_and_timestamp - if timestamp_ms is None: - timestamp_ms = self.timestamp_ms - if offset != -1 and self.relative_offset is not None: - offset += self.relative_offset + offset, produce_timestamp_ms = offset_and_timestamp + + # Unpacking from args tuple is minor speed optimization + (relative_offset, timestamp_ms, checksum, + serialized_key_size, serialized_value_size) = self.args + + if produce_timestamp_ms is not None: + timestamp_ms = produce_timestamp_ms + if offset != -1 and relative_offset is not None: + offset += relative_offset tp = self._produce_future.topic_partition - metadata = RecordMetadata(tp[0], tp[1], tp, offset, timestamp_ms) + metadata = RecordMetadata(tp[0], tp[1], tp, offset, timestamp_ms, + checksum, serialized_key_size, + serialized_value_size) self.success(metadata) def get(self, timeout=None): @@ -57,4 +64,6 @@ class FutureRecordMetadata(Future): return self.value -RecordMetadata = collections.namedtuple('RecordMetadata', 'topic partition topic_partition offset timestamp') +RecordMetadata = collections.namedtuple( + 'RecordMetadata', ['topic', 'partition', 'topic_partition', 'offset', 'timestamp', + 'checksum', 'serialized_key_size', 'serialized_value_size']) |