summaryrefslogtreecommitdiff
path: root/kafka/consumer/group.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/consumer/group.py')
-rw-r--r--kafka/consumer/group.py15
1 files changed, 11 insertions, 4 deletions
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index cde956c..89a0292 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -525,7 +525,7 @@ class KafkaConsumer(six.Iterator):
offsets = self._subscription.all_consumed_offsets()
self._coordinator.commit_offsets_sync(offsets)
- def committed(self, partition):
+ def committed(self, partition, metadata=False):
"""Get the last committed offset for the given partition.
This offset will be used as the position for the consumer
@@ -537,9 +537,11 @@ class KafkaConsumer(six.Iterator):
Arguments:
partition (TopicPartition): The partition to check.
+ metadata (bool, optional): If True, return OffsetAndMetadata struct
+ instead of offset int. Default: False.
Returns:
- The last committed offset, or None if there was no prior commit.
+ The last committed offset (int or OffsetAndMetadata), or None if there was no prior commit.
"""
assert self.config['api_version'] >= (0, 8, 1), 'Requires >= Kafka 0.8.1'
assert self.config['group_id'] is not None, 'Requires group_id'
@@ -553,10 +555,15 @@ class KafkaConsumer(six.Iterator):
else:
commit_map = self._coordinator.fetch_committed_offsets([partition])
if partition in commit_map:
- committed = commit_map[partition].offset
+ committed = commit_map[partition]
else:
committed = None
- return committed
+
+ if committed is not None:
+ if metadata:
+ return committed
+ else:
+ return committed.offset
def _fetch_all_topic_metadata(self):
"""A blocking call that fetches topic metadata for all topics in the