diff options
author | Dana Powers <dana.powers@gmail.com> | 2016-04-07 23:53:22 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2016-04-07 23:54:30 -0700 |
commit | 1f5ed3816ca63d7112a942c8dd44d27950730d4e (patch) | |
tree | 4173c8b24aee462b2420d7844761413a633f918d | |
parent | 2f1aee387e25d7466df3518f7ec73d0649145462 (diff) | |
download | kafka-python-1f5ed3816ca63d7112a942c8dd44d27950730d4e.tar.gz |
Apply new _get_conn connect logic in KafkaClient.check_version
-rw-r--r-- | kafka/client_async.py | 26 |
1 files changed, 17 insertions, 9 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py index cfc89fc..8943436 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -598,18 +598,26 @@ class KafkaClient(object): raise Errors.NoBrokersAvailable() def connect(node_id): - timeout_at = time.time() + timeout - # brokers < 0.9 do not return any broker metadata if there are no topics - # so we're left with a single bootstrap connection - while not self.ready(node_id): - if time.time() >= timeout_at: - raise Errors.NodeNotReadyError(node_id) - time.sleep(0.025) - + self._maybe_connect(node_id) + conn = self._conns[node_id] # Monkeypatch the connection request timeout # Generally this timeout should not get triggered # but in case it does, we want it to be reasonably short - self._conns[node_id].config['request_timeout_ms'] = timeout * 1000 + conn.config['request_timeout_ms'] = timeout * 1000 + if conn.connected(): + return + + timeout_at = time.time() + timeout + # brokers < 0.9 do not return any broker metadata if there are no topics + # so we're left with a single bootstrap connection + while time.time() < timeout_at and conn.connecting(): + if conn.connect() is ConnectionStates.CONNECTED: + break + else: + time.sleep(0.05) + else: + conn.close() + raise Errors.NodeNotReadyError(node_id) # kafka kills the connection when it doesnt recognize an API request # so we can send a test request and then follow immediately with a |