diff options
Diffstat (limited to 'kafka/coordinator/consumer.py')
-rw-r--r-- | kafka/coordinator/consumer.py | 34 |
1 files changed, 21 insertions, 13 deletions
diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py index ef5d2c6..474c0e0 100644 --- a/kafka/coordinator/consumer.py +++ b/kafka/coordinator/consumer.py @@ -217,9 +217,10 @@ class ConsumerCoordinator(AbstractCoordinator): self._subscription.mark_for_reassignment() def need_rejoin(self): - """ - Check whether the group should be rejoined (e.g. if metadata changes) - @return True if it should, False otherwise + """Check whether the group should be rejoined + + Returns: + bool: True if consumer should rejoin group, False otherwise """ return (self._subscription.partitions_auto_assigned() and (super(ConsumerCoordinator, self).need_rejoin() or @@ -236,12 +237,13 @@ class ConsumerCoordinator(AbstractCoordinator): self._subscription.needs_fetch_committed_offsets = False def fetch_committed_offsets(self, partitions): - """ - Fetch the current committed offsets from the coordinator for a set of - partitions. + """Fetch the current committed offsets for specified partitions - @param partitions The partitions to fetch offsets for - @return dict of {TopicPartition: OffsetMetadata} + Arguments: + partitions (list of TopicPartition): partitions to fetch + + Returns: + dict: {TopicPartition: OffsetAndMetadata} """ while True: self.ensure_coordinator_known() @@ -330,9 +332,12 @@ class ConsumerCoordinator(AbstractCoordinator): polled in the case of a synchronous commit or ignored in the asynchronous case. - @param offsets dict of {TopicPartition: OffsetAndMetadata} that should - be committed - @return Future indicating whether the commit was successful or not + Arguments: + offsets (dict of {TopicPartition: OffsetAndMetadata}): what should + be committed + + Returns: + Future: indicating whether the commit was successful or not """ if self.coordinator_unknown(): return Future().failure(Errors.GroupCoordinatorNotAvailableError) @@ -443,8 +448,11 @@ class ConsumerCoordinator(AbstractCoordinator): This is a non-blocking call. The returned future can be polled to get the actual offsets returned from the broker. - @param partitions list of TopicPartitions - @return Future of committed offsets dict: {TopicPartition: offset} + Arguments: + partitions (list of TopicPartition): the partitions to fetch + + Returns: + Future: resolves to dict of offsets: {TopicPartition: int} """ if self.coordinator_unknown(): return Future().failure(Errors.GroupCoordinatorNotAvailableError) |