summaryrefslogtreecommitdiff
path: root/kafka/coordinator/abstract.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/coordinator/abstract.py')
-rw-r--r--kafka/coordinator/abstract.py18
1 files changed, 12 insertions, 6 deletions
diff --git a/kafka/coordinator/abstract.py b/kafka/coordinator/abstract.py
index 2dc8269..b0413d5 100644
--- a/kafka/coordinator/abstract.py
+++ b/kafka/coordinator/abstract.py
@@ -230,7 +230,9 @@ class AbstractCoordinator(object):
This function handles both JoinGroup and SyncGroup, delegating to
_perform_assignment() if elected leader by the coordinator.
- @return Future() of the assignment returned from the group leader
+ Returns:
+ Future: resolves to the encoded-bytes assignment returned from the
+ group leader
"""
if self.coordinator_unknown():
e = Errors.GroupCoordinatorNotAvailableError(self.coordinator_id)
@@ -323,6 +325,12 @@ class AbstractCoordinator(object):
"""
Perform leader synchronization and send back the assignment
for the group via SyncGroupRequest
+
+ Arguments:
+ response (JoinResponse): broker response to parse
+
+ Returns:
+ Future: resolves to member assignment encoded-bytes
"""
try:
group_assignment = self._perform_assignment(response.leader_id,
@@ -391,10 +399,8 @@ class AbstractCoordinator(object):
def _send_group_metadata_request(self):
"""Discover the current coordinator for the group.
- Sends a GroupMetadata request to one of the brokers. The returned future
- should be polled to get the result of the request.
-
- @return future indicating the completion of the metadata request
+ Returns:
+ Future: resolves to the node id of the coordinator
"""
node_id = self._client.least_loaded_node()
if node_id is None or not self._client.ready(node_id):
@@ -477,7 +483,7 @@ class AbstractCoordinator(object):
log.error("LeaveGroup request failed: %s", error_type())
def _send_heartbeat_request(self):
- """Send a heartbeat request now (visible only for testing)."""
+ """Send a heartbeat request"""
request = HeartbeatRequest(self.group_id, self.generation, self.member_id)
future = Future()
_f = self._client.send(self.coordinator_id, request)