summaryrefslogtreecommitdiff
path: root/kafka/coordinator/consumer.py
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2019-12-29 16:06:24 -0800
committerGitHub <noreply@github.com>2019-12-29 16:06:24 -0800
commit2a86b23f477e5ed57aa987db97d11284a37d05a0 (patch)
tree391d4d2d99db38eb2f9869d76592851bbae57cf2 /kafka/coordinator/consumer.py
parent1a91a54688cb77fd77c342e719f24f346d5cee89 (diff)
downloadkafka-python-2a86b23f477e5ed57aa987db97d11284a37d05a0.tar.gz
Optionally return OffsetAndMetadata from consumer.committed(tp) (#1979)
Diffstat (limited to 'kafka/coordinator/consumer.py')
-rw-r--r--kafka/coordinator/consumer.py6
1 files changed, 3 insertions, 3 deletions
diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py
index 30337c3..fda80aa 100644
--- a/kafka/coordinator/consumer.py
+++ b/kafka/coordinator/consumer.py
@@ -387,7 +387,7 @@ class ConsumerCoordinator(BaseCoordinator):
for partition, offset in six.iteritems(offsets):
# verify assignment is still active
if self._subscription.is_assigned(partition):
- self._subscription.assignment[partition].committed = offset.offset
+ self._subscription.assignment[partition].committed = offset
self._subscription.needs_fetch_committed_offsets = False
def fetch_committed_offsets(self, partitions):
@@ -641,7 +641,7 @@ class ConsumerCoordinator(BaseCoordinator):
log.debug("Group %s committed offset %s for partition %s",
self.group_id, offset, tp)
if self._subscription.is_assigned(tp):
- self._subscription.assignment[tp].committed = offset.offset
+ self._subscription.assignment[tp].committed = offset
elif error_type is Errors.GroupAuthorizationFailedError:
log.error("Not authorized to commit offsets for group %s",
self.group_id)
@@ -704,7 +704,7 @@ class ConsumerCoordinator(BaseCoordinator):
partitions (list of TopicPartition): the partitions to fetch
Returns:
- Future: resolves to dict of offsets: {TopicPartition: int}
+ Future: resolves to dict of offsets: {TopicPartition: OffsetAndMetadata}
"""
assert self.config['api_version'] >= (0, 8, 1), 'Unsupported Broker API'
assert all(map(lambda k: isinstance(k, TopicPartition), partitions))