summaryrefslogtreecommitdiff
path: root/kafka/client_async.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/client_async.py')
-rw-r--r--kafka/client_async.py7
1 files changed, 5 insertions, 2 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py
index 96c0647..975202e 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -267,9 +267,9 @@ class KafkaClient(object):
if node_id not in self._connecting:
self._connecting.add(node_id)
try:
- self._selector.register(sock, selectors.EVENT_WRITE)
+ self._selector.register(sock, selectors.EVENT_WRITE, conn)
except KeyError:
- self._selector.modify(sock, selectors.EVENT_WRITE)
+ self._selector.modify(sock, selectors.EVENT_WRITE, conn)
if self.cluster.is_bootstrap(node_id):
self._last_bootstrap = time.time()
@@ -624,6 +624,9 @@ class KafkaClient(object):
self._clear_wake_fd()
continue
elif not (events & selectors.EVENT_READ):
+ conn = key.data
+ if conn.node_id in self._connecting:
+ self._maybe_connect(conn.node_id)
continue
conn = key.data
processed.add(conn)