diff options
-rw-r--r-- | kafka/coordinator/consumer.py | 35 |
1 files changed, 19 insertions, 16 deletions
diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py index 92a840d..f7e55f6 100644 --- a/kafka/coordinator/consumer.py +++ b/kafka/coordinator/consumer.py @@ -272,13 +272,16 @@ class ConsumerCoordinator(AbstractCoordinator): super(ConsumerCoordinator, self).close() def commit_offsets_async(self, offsets, callback=None): - """ - @param offsets: dict of {TopicPartition: OffsetAndMetadata} to commit - @param callback: called as callback(offsets, response), with response - as either an Exception or a OffsetCommitResponse - struct. This callback can be used to trigger custom - actions when a commit request completes. - @returns Future + """Commit specific offsets asynchronously. + + Arguments: + offsets (dict {TopicPartition: OffsetAndMetadata}): what to commit + callback (callable, optional): called as callback(offsets, response) + response will be either an Exception or a OffsetCommitResponse + struct. This callback can be used to trigger custom actions when + a commit request completes. + Returns: + Future: indicating whether the commit was successful or not """ self._subscription.needs_fetch_committed_offsets = True future = self._send_offset_commit_request(offsets) @@ -286,15 +289,15 @@ class ConsumerCoordinator(AbstractCoordinator): future.add_both(cb, offsets) def commit_offsets_sync(self, offsets): - """ - Commit offsets synchronously. This method will retry until the commit - completes successfully or an unrecoverable error is encountered. - - @param offsets dict of {TopicPartition: OffsetAndMetadata} to commit - @raises TopicAuthorizationError if the consumer is not authorized to the - group or to any of the specified partitions - @raises CommitFailedError if an unrecoverable error occurs before the - commit can be completed + """Commit specific offsets synchronously. + + This method will retry until the commit completes successfully or an + unrecoverable error is encountered. + + Arguments: + offsets (dict {TopicPartition: OffsetAndMetadata}): what to commit + + Raises error on failure """ if not offsets: return |