summaryrefslogtreecommitdiff
path: root/kafka/client_async.py
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2019-09-28 15:58:17 -0700
committerDana Powers <dana.powers@gmail.com>2019-09-28 15:58:17 -0700
commit2e6a43839bde28c3eaeeb010bcdfb936ae54730f (patch)
treece7248ca21b3e167b022e1d70e76a21066a0a87e /kafka/client_async.py
parent61fa0b27685c2d4e67d1b6575ca6797f36eb1bfa (diff)
downloadkafka-python-connection_delay_inf.tar.gz
Rely on selector to detect completed connection attemptsconnection_delay_inf
Diffstat (limited to 'kafka/client_async.py')
-rw-r--r--kafka/client_async.py7
1 files changed, 5 insertions, 2 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py
index 96c0647..975202e 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -267,9 +267,9 @@ class KafkaClient(object):
if node_id not in self._connecting:
self._connecting.add(node_id)
try:
- self._selector.register(sock, selectors.EVENT_WRITE)
+ self._selector.register(sock, selectors.EVENT_WRITE, conn)
except KeyError:
- self._selector.modify(sock, selectors.EVENT_WRITE)
+ self._selector.modify(sock, selectors.EVENT_WRITE, conn)
if self.cluster.is_bootstrap(node_id):
self._last_bootstrap = time.time()
@@ -624,6 +624,9 @@ class KafkaClient(object):
self._clear_wake_fd()
continue
elif not (events & selectors.EVENT_READ):
+ conn = key.data
+ if conn.node_id in self._connecting:
+ self._maybe_connect(conn.node_id)
continue
conn = key.data
processed.add(conn)