summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@rd.io>2015-12-29 17:01:41 -0800
committerDana Powers <dana.powers@rd.io>2015-12-29 17:05:08 -0800
commit2a2733d4fc725f04461a6c4d0ca0fc253f99caeb (patch)
tree440c6e4816a91b5290aabb36d3570e9daab5db43
parentb7d1ed3fb4644c3b255eea356b7de273b522d1f4 (diff)
downloadkafka-python-2a2733d4fc725f04461a6c4d0ca0fc253f99caeb.tar.gz
Improve various docstrings
-rw-r--r--kafka/consumer/subscription_state.py13
-rw-r--r--kafka/coordinator/abstract.py18
-rw-r--r--kafka/coordinator/consumer.py34
3 files changed, 43 insertions, 22 deletions
diff --git a/kafka/consumer/subscription_state.py b/kafka/consumer/subscription_state.py
index a562093..5330e9f 100644
--- a/kafka/consumer/subscription_state.py
+++ b/kafka/consumer/subscription_state.py
@@ -151,9 +151,16 @@ class SubscriptionState(object):
self.needs_partition_assignment = False
def assign_from_subscribed(self, assignments):
- """
- Change the assignment to the specified partitions returned from the coordinator,
- note this is different from {@link #assignFromUser(Collection)} which directly set the assignment from user inputs
+ """Update the assignment to the specified partitions
+
+ This method is called by the coordinator to dynamically assign
+ partitions based on the consumer's topic subscription. This is different
+ from assign_from_user() which directly sets the assignment from a
+ user-supplied TopicPartition list.
+
+ Arguments:
+ assignments (list of TopicPartition): partitions to assign to this
+ consumer instance.
"""
if self.subscription is None:
raise IllegalStateError(self._SUBSCRIPTION_EXCEPTION_MESSAGE)
diff --git a/kafka/coordinator/abstract.py b/kafka/coordinator/abstract.py
index 2dc8269..b0413d5 100644
--- a/kafka/coordinator/abstract.py
+++ b/kafka/coordinator/abstract.py
@@ -230,7 +230,9 @@ class AbstractCoordinator(object):
This function handles both JoinGroup and SyncGroup, delegating to
_perform_assignment() if elected leader by the coordinator.
- @return Future() of the assignment returned from the group leader
+ Returns:
+ Future: resolves to the encoded-bytes assignment returned from the
+ group leader
"""
if self.coordinator_unknown():
e = Errors.GroupCoordinatorNotAvailableError(self.coordinator_id)
@@ -323,6 +325,12 @@ class AbstractCoordinator(object):
"""
Perform leader synchronization and send back the assignment
for the group via SyncGroupRequest
+
+ Arguments:
+ response (JoinResponse): broker response to parse
+
+ Returns:
+ Future: resolves to member assignment encoded-bytes
"""
try:
group_assignment = self._perform_assignment(response.leader_id,
@@ -391,10 +399,8 @@ class AbstractCoordinator(object):
def _send_group_metadata_request(self):
"""Discover the current coordinator for the group.
- Sends a GroupMetadata request to one of the brokers. The returned future
- should be polled to get the result of the request.
-
- @return future indicating the completion of the metadata request
+ Returns:
+ Future: resolves to the node id of the coordinator
"""
node_id = self._client.least_loaded_node()
if node_id is None or not self._client.ready(node_id):
@@ -477,7 +483,7 @@ class AbstractCoordinator(object):
log.error("LeaveGroup request failed: %s", error_type())
def _send_heartbeat_request(self):
- """Send a heartbeat request now (visible only for testing)."""
+ """Send a heartbeat request"""
request = HeartbeatRequest(self.group_id, self.generation, self.member_id)
future = Future()
_f = self._client.send(self.coordinator_id, request)
diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py
index ef5d2c6..474c0e0 100644
--- a/kafka/coordinator/consumer.py
+++ b/kafka/coordinator/consumer.py
@@ -217,9 +217,10 @@ class ConsumerCoordinator(AbstractCoordinator):
self._subscription.mark_for_reassignment()
def need_rejoin(self):
- """
- Check whether the group should be rejoined (e.g. if metadata changes)
- @return True if it should, False otherwise
+ """Check whether the group should be rejoined
+
+ Returns:
+ bool: True if consumer should rejoin group, False otherwise
"""
return (self._subscription.partitions_auto_assigned() and
(super(ConsumerCoordinator, self).need_rejoin() or
@@ -236,12 +237,13 @@ class ConsumerCoordinator(AbstractCoordinator):
self._subscription.needs_fetch_committed_offsets = False
def fetch_committed_offsets(self, partitions):
- """
- Fetch the current committed offsets from the coordinator for a set of
- partitions.
+ """Fetch the current committed offsets for specified partitions
- @param partitions The partitions to fetch offsets for
- @return dict of {TopicPartition: OffsetMetadata}
+ Arguments:
+ partitions (list of TopicPartition): partitions to fetch
+
+ Returns:
+ dict: {TopicPartition: OffsetAndMetadata}
"""
while True:
self.ensure_coordinator_known()
@@ -330,9 +332,12 @@ class ConsumerCoordinator(AbstractCoordinator):
polled in the case of a synchronous commit or ignored in the
asynchronous case.
- @param offsets dict of {TopicPartition: OffsetAndMetadata} that should
- be committed
- @return Future indicating whether the commit was successful or not
+ Arguments:
+ offsets (dict of {TopicPartition: OffsetAndMetadata}): what should
+ be committed
+
+ Returns:
+ Future: indicating whether the commit was successful or not
"""
if self.coordinator_unknown():
return Future().failure(Errors.GroupCoordinatorNotAvailableError)
@@ -443,8 +448,11 @@ class ConsumerCoordinator(AbstractCoordinator):
This is a non-blocking call. The returned future can be polled to get
the actual offsets returned from the broker.
- @param partitions list of TopicPartitions
- @return Future of committed offsets dict: {TopicPartition: offset}
+ Arguments:
+ partitions (list of TopicPartition): the partitions to fetch
+
+ Returns:
+ Future: resolves to dict of offsets: {TopicPartition: int}
"""
if self.coordinator_unknown():
return Future().failure(Errors.GroupCoordinatorNotAvailableError)