summaryrefslogtreecommitdiff
path: root/kafka/coordinator/consumer.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/coordinator/consumer.py')
-rw-r--r--kafka/coordinator/consumer.py34
1 files changed, 21 insertions, 13 deletions
diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py
index ef5d2c6..474c0e0 100644
--- a/kafka/coordinator/consumer.py
+++ b/kafka/coordinator/consumer.py
@@ -217,9 +217,10 @@ class ConsumerCoordinator(AbstractCoordinator):
self._subscription.mark_for_reassignment()
def need_rejoin(self):
- """
- Check whether the group should be rejoined (e.g. if metadata changes)
- @return True if it should, False otherwise
+ """Check whether the group should be rejoined
+
+ Returns:
+ bool: True if consumer should rejoin group, False otherwise
"""
return (self._subscription.partitions_auto_assigned() and
(super(ConsumerCoordinator, self).need_rejoin() or
@@ -236,12 +237,13 @@ class ConsumerCoordinator(AbstractCoordinator):
self._subscription.needs_fetch_committed_offsets = False
def fetch_committed_offsets(self, partitions):
- """
- Fetch the current committed offsets from the coordinator for a set of
- partitions.
+ """Fetch the current committed offsets for specified partitions
- @param partitions The partitions to fetch offsets for
- @return dict of {TopicPartition: OffsetMetadata}
+ Arguments:
+ partitions (list of TopicPartition): partitions to fetch
+
+ Returns:
+ dict: {TopicPartition: OffsetAndMetadata}
"""
while True:
self.ensure_coordinator_known()
@@ -330,9 +332,12 @@ class ConsumerCoordinator(AbstractCoordinator):
polled in the case of a synchronous commit or ignored in the
asynchronous case.
- @param offsets dict of {TopicPartition: OffsetAndMetadata} that should
- be committed
- @return Future indicating whether the commit was successful or not
+ Arguments:
+ offsets (dict of {TopicPartition: OffsetAndMetadata}): what should
+ be committed
+
+ Returns:
+ Future: indicating whether the commit was successful or not
"""
if self.coordinator_unknown():
return Future().failure(Errors.GroupCoordinatorNotAvailableError)
@@ -443,8 +448,11 @@ class ConsumerCoordinator(AbstractCoordinator):
This is a non-blocking call. The returned future can be polled to get
the actual offsets returned from the broker.
- @param partitions list of TopicPartitions
- @return Future of committed offsets dict: {TopicPartition: offset}
+ Arguments:
+ partitions (list of TopicPartition): the partitions to fetch
+
+ Returns:
+ Future: resolves to dict of offsets: {TopicPartition: int}
"""
if self.coordinator_unknown():
return Future().failure(Errors.GroupCoordinatorNotAvailableError)