summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2016-04-07 23:53:22 -0700
committerDana Powers <dana.powers@gmail.com>2016-04-07 23:54:30 -0700
commit1f5ed3816ca63d7112a942c8dd44d27950730d4e (patch)
tree4173c8b24aee462b2420d7844761413a633f918d
parent2f1aee387e25d7466df3518f7ec73d0649145462 (diff)
downloadkafka-python-1f5ed3816ca63d7112a942c8dd44d27950730d4e.tar.gz
Apply new _get_conn connect logic in KafkaClient.check_version
-rw-r--r--kafka/client_async.py26
1 files changed, 17 insertions, 9 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py
index cfc89fc..8943436 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -598,18 +598,26 @@ class KafkaClient(object):
raise Errors.NoBrokersAvailable()
def connect(node_id):
- timeout_at = time.time() + timeout
- # brokers < 0.9 do not return any broker metadata if there are no topics
- # so we're left with a single bootstrap connection
- while not self.ready(node_id):
- if time.time() >= timeout_at:
- raise Errors.NodeNotReadyError(node_id)
- time.sleep(0.025)
-
+ self._maybe_connect(node_id)
+ conn = self._conns[node_id]
# Monkeypatch the connection request timeout
# Generally this timeout should not get triggered
# but in case it does, we want it to be reasonably short
- self._conns[node_id].config['request_timeout_ms'] = timeout * 1000
+ conn.config['request_timeout_ms'] = timeout * 1000
+ if conn.connected():
+ return
+
+ timeout_at = time.time() + timeout
+ # brokers < 0.9 do not return any broker metadata if there are no topics
+ # so we're left with a single bootstrap connection
+ while time.time() < timeout_at and conn.connecting():
+ if conn.connect() is ConnectionStates.CONNECTED:
+ break
+ else:
+ time.sleep(0.05)
+ else:
+ conn.close()
+ raise Errors.NodeNotReadyError(node_id)
# kafka kills the connection when it doesnt recognize an API request
# so we can send a test request and then follow immediately with a