diff options
Diffstat (limited to 'kafka/client_async.py')
-rw-r--r-- | kafka/client_async.py | 22 |
1 files changed, 15 insertions, 7 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py index e51e3d4..e921fa4 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -163,21 +163,29 @@ class KafkaClient(object): host, port, afi = get_ip_port_afi(broker.host) self._conns[node_id] = BrokerConnection(host, broker.port, afi, **self.config) - state = self._conns[node_id].connect() - if self._conns[node_id].connecting(): - self._connecting.add(node_id) + conn = self._conns[node_id] + if conn.connected(): + return True + + conn.connect() + + if conn.connecting(): + if node_id not in self._connecting: + self._connecting.add(node_id) # Whether CONNECTED or DISCONNECTED, we need to remove from connecting elif node_id in self._connecting: - log.debug("Node %s connection state is %s", node_id, state) self._connecting.remove(node_id) + if conn.connected(): + log.debug("Node %s connected", node_id) + # Connection failures imply that our metadata is stale, so let's refresh - if state is ConnectionStates.DISCONNECTED: + elif conn.disconnected(): log.warning("Node %s connect failed -- refreshing metadata", node_id) self.cluster.request_update() - return self._conns[node_id].connected() + return conn.connected() def ready(self, node_id): """Check whether a node is connected and ok to send more requests. @@ -228,7 +236,7 @@ class KafkaClient(object): """ if node_id not in self._conns: return False - return self._conns[node_id].state is ConnectionStates.DISCONNECTED + return self._conns[node_id].disconnected() def connection_delay(self, node_id): """ |