summaryrefslogtreecommitdiff
path: root/kafka/client_async.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/client_async.py')
-rw-r--r--kafka/client_async.py30
1 files changed, 22 insertions, 8 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py
index c1bdd82..d608e6a 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -304,7 +304,10 @@ class KafkaClient(object):
# SSL connections can enter this state 2x (second during Handshake)
if node_id not in self._connecting:
self._connecting.add(node_id)
+ try:
self._selector.register(conn._sock, selectors.EVENT_WRITE)
+ except KeyError:
+ self._selector.modify(conn._sock, selectors.EVENT_WRITE)
elif conn.connected():
log.debug("Node %s connected", node_id)
@@ -312,10 +315,10 @@ class KafkaClient(object):
self._connecting.remove(node_id)
try:
- self._selector.unregister(conn._sock)
+ self._selector.modify(conn._sock, selectors.EVENT_READ, conn)
except KeyError:
- pass
- self._selector.register(conn._sock, selectors.EVENT_READ, conn)
+ self._selector.register(conn._sock, selectors.EVENT_READ, conn)
+
if self._sensors:
self._sensors.connection_created.record()
@@ -336,6 +339,7 @@ class KafkaClient(object):
self._selector.unregister(conn._sock)
except KeyError:
pass
+
if self._sensors:
self._sensors.connection_closed.record()
@@ -348,6 +352,17 @@ class KafkaClient(object):
log.warning("Node %s connection failed -- refreshing metadata", node_id)
self.cluster.request_update()
+ def maybe_connect(self, node_id):
+ """Queues a node for asynchronous connection during the next .poll()"""
+ if self._can_connect(node_id):
+ self._connecting.add(node_id)
+ # Wakeup signal is useful in case another thread is
+ # blocked waiting for incoming network traffic while holding
+ # the client lock in poll().
+ self.wakeup()
+ return True
+ return False
+
def _maybe_connect(self, node_id):
"""Idempotent non-blocking connection attempt to the given node id."""
with self._lock:
@@ -397,7 +412,7 @@ class KafkaClient(object):
Returns:
bool: True if we are ready to send to the given node
"""
- self._maybe_connect(node_id)
+ self.maybe_connect(node_id)
return self.is_ready(node_id, metadata_priority=metadata_priority)
def connected(self, node_id):
@@ -520,8 +535,8 @@ class KafkaClient(object):
Future: resolves to Response struct or Error
"""
if not self._can_send_request(node_id):
- if not self._maybe_connect(node_id):
- return Future().failure(Errors.NodeNotReadyError(node_id))
+ self.maybe_connect(node_id)
+ return Future().failure(Errors.NodeNotReadyError(node_id))
# conn.send will queue the request internally
# we will need to call send_pending_requests()
@@ -814,9 +829,8 @@ class KafkaClient(object):
# have such application level configuration, using request timeout instead.
return self.config['request_timeout_ms']
- if self._can_connect(node_id):
+ if self.maybe_connect(node_id):
log.debug("Initializing connection to node %s for metadata request", node_id)
- self._maybe_connect(node_id)
return self.config['reconnect_backoff_ms']
# connected but can't send more, OR connecting