diff options
author | Dana Powers <dana.powers@gmail.com> | 2019-12-29 12:02:10 -0800 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2019-12-29 12:02:10 -0800 |
commit | 8f8deea7731e26b4cdee3ce5ae8bfdd8e3f57d3a (patch) | |
tree | 40bdde2881753565601a8754870e26f71a164af1 /kafka/consumer/group.py | |
parent | cf28da8420b007d836b0db9d865d74fee89cdbcd (diff) | |
download | kafka-python-issue-1082-offset-and-metadata.tar.gz |
Optionally return OffsetAndMetadata from consumer.committed(tp)issue-1082-offset-and-metadata
Diffstat (limited to 'kafka/consumer/group.py')
-rw-r--r-- | kafka/consumer/group.py | 15 |
1 files changed, 11 insertions, 4 deletions
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index cde956c..89a0292 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -525,7 +525,7 @@ class KafkaConsumer(six.Iterator): offsets = self._subscription.all_consumed_offsets() self._coordinator.commit_offsets_sync(offsets) - def committed(self, partition): + def committed(self, partition, metadata=False): """Get the last committed offset for the given partition. This offset will be used as the position for the consumer @@ -537,9 +537,11 @@ class KafkaConsumer(six.Iterator): Arguments: partition (TopicPartition): The partition to check. + metadata (bool, optional): If True, return OffsetAndMetadata struct + instead of offset int. Default: False. Returns: - The last committed offset, or None if there was no prior commit. + The last committed offset (int or OffsetAndMetadata), or None if there was no prior commit. """ assert self.config['api_version'] >= (0, 8, 1), 'Requires >= Kafka 0.8.1' assert self.config['group_id'] is not None, 'Requires group_id' @@ -553,10 +555,15 @@ class KafkaConsumer(six.Iterator): else: commit_map = self._coordinator.fetch_committed_offsets([partition]) if partition in commit_map: - committed = commit_map[partition].offset + committed = commit_map[partition] else: committed = None - return committed + + if committed is not None: + if metadata: + return committed + else: + return committed.offset def _fetch_all_topic_metadata(self): """A blocking call that fetches topic metadata for all topics in the |