summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--kafka/coordinator/consumer.py35
1 files changed, 19 insertions, 16 deletions
diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py
index 92a840d..f7e55f6 100644
--- a/kafka/coordinator/consumer.py
+++ b/kafka/coordinator/consumer.py
@@ -272,13 +272,16 @@ class ConsumerCoordinator(AbstractCoordinator):
super(ConsumerCoordinator, self).close()
def commit_offsets_async(self, offsets, callback=None):
- """
- @param offsets: dict of {TopicPartition: OffsetAndMetadata} to commit
- @param callback: called as callback(offsets, response), with response
- as either an Exception or a OffsetCommitResponse
- struct. This callback can be used to trigger custom
- actions when a commit request completes.
- @returns Future
+ """Commit specific offsets asynchronously.
+
+ Arguments:
+ offsets (dict {TopicPartition: OffsetAndMetadata}): what to commit
+ callback (callable, optional): called as callback(offsets, response)
+ response will be either an Exception or a OffsetCommitResponse
+ struct. This callback can be used to trigger custom actions when
+ a commit request completes.
+ Returns:
+ Future: indicating whether the commit was successful or not
"""
self._subscription.needs_fetch_committed_offsets = True
future = self._send_offset_commit_request(offsets)
@@ -286,15 +289,15 @@ class ConsumerCoordinator(AbstractCoordinator):
future.add_both(cb, offsets)
def commit_offsets_sync(self, offsets):
- """
- Commit offsets synchronously. This method will retry until the commit
- completes successfully or an unrecoverable error is encountered.
-
- @param offsets dict of {TopicPartition: OffsetAndMetadata} to commit
- @raises TopicAuthorizationError if the consumer is not authorized to the
- group or to any of the specified partitions
- @raises CommitFailedError if an unrecoverable error occurs before the
- commit can be completed
+ """Commit specific offsets synchronously.
+
+ This method will retry until the commit completes successfully or an
+ unrecoverable error is encountered.
+
+ Arguments:
+ offsets (dict {TopicPartition: OffsetAndMetadata}): what to commit
+
+ Raises error on failure
"""
if not offsets:
return