summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--kafka/coordinator/base.py23
1 files changed, 23 insertions, 0 deletions
diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py
index 68b1bda..ab259dd 100644
--- a/kafka/coordinator/base.py
+++ b/kafka/coordinator/base.py
@@ -286,6 +286,10 @@ class BaseCoordinator(object):
e = Errors.GroupCoordinatorNotAvailableError(self.coordinator_id)
return Future().failure(e)
+ elif not self._client.ready(self.coordinator_id, metadata_priority=False):
+ e = Errors.NodeNotReadyError(self.coordinator_id)
+ return Future().failure(e)
+
# send a join group request to the coordinator
log.info("(Re-)joining group %s", self.group_id)
request = JoinGroupRequest[0](
@@ -416,6 +420,13 @@ class BaseCoordinator(object):
if self.coordinator_unknown():
e = Errors.GroupCoordinatorNotAvailableError(self.coordinator_id)
return Future().failure(e)
+
+ # We assume that coordinator is ready if we're sending SyncGroup
+ # as it typically follows a successful JoinGroup
+ # Also note that if client.ready() enforces a metadata priority policy,
+ # we can get into an infinite loop if the leader assignment process
+ # itself requests a metadata update
+
future = Future()
_f = self._client.send(self.coordinator_id, request)
_f.add_callback(self._handle_sync_group_response, future, time.time())
@@ -467,6 +478,10 @@ class BaseCoordinator(object):
if node_id is None:
return Future().failure(Errors.NoBrokersAvailable())
+ elif not self._client.ready(node_id, metadata_priority=False):
+ e = Errors.NodeNotReadyError(node_id)
+ return Future().failure(e)
+
log.debug("Sending group coordinator request for group %s to broker %s",
self.group_id, node_id)
request = GroupCoordinatorRequest[0](self.group_id)
@@ -553,6 +568,14 @@ class BaseCoordinator(object):
def _send_heartbeat_request(self):
"""Send a heartbeat request"""
+ if self.coordinator_unknown():
+ e = Errors.GroupCoordinatorNotAvailableError(self.coordinator_id)
+ return Future().failure(e)
+
+ elif not self._client.ready(self.coordinator_id, metadata_priority=False):
+ e = Errors.NodeNotReadyError(self.coordinator_id)
+ return Future().failure(e)
+
request = HeartbeatRequest[0](self.group_id, self.generation, self.member_id)
log.debug("Heartbeat: %s[%s] %s", request.group, request.generation_id, request.member_id) # pylint: disable-msg=no-member
future = Future()