summaryrefslogtreecommitdiff
path: root/kafka/client_async.py
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2019-04-02 21:27:49 -0700
committerGitHub <noreply@github.com>2019-04-02 21:27:49 -0700
commit91d31494d02ea636a991abb4dfb25dd904eefd45 (patch)
tree5fa58d3648afc1335cc03facf08255da85416267 /kafka/client_async.py
parent27cd93be3e7f2e3f3baca04d2126cf3bb6374668 (diff)
downloadkafka-python-91d31494d02ea636a991abb4dfb25dd904eefd45.tar.gz
Do not call state_change_callback with lock (#1775)
Diffstat (limited to 'kafka/client_async.py')
-rw-r--r--kafka/client_async.py16
1 files changed, 8 insertions, 8 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py
index a86ab55..77efac8 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -260,16 +260,16 @@ class KafkaClient(object):
conn = self._conns[node_id]
return conn.disconnected() and not conn.blacked_out()
- def _conn_state_change(self, node_id, conn):
+ def _conn_state_change(self, node_id, sock, conn):
with self._lock:
if conn.connecting():
# SSL connections can enter this state 2x (second during Handshake)
if node_id not in self._connecting:
self._connecting.add(node_id)
try:
- self._selector.register(conn._sock, selectors.EVENT_WRITE)
+ self._selector.register(sock, selectors.EVENT_WRITE)
except KeyError:
- self._selector.modify(conn._sock, selectors.EVENT_WRITE)
+ self._selector.modify(sock, selectors.EVENT_WRITE)
if self.cluster.is_bootstrap(node_id):
self._last_bootstrap = time.time()
@@ -280,9 +280,9 @@ class KafkaClient(object):
self._connecting.remove(node_id)
try:
- self._selector.modify(conn._sock, selectors.EVENT_READ, conn)
+ self._selector.modify(sock, selectors.EVENT_READ, conn)
except KeyError:
- self._selector.register(conn._sock, selectors.EVENT_READ, conn)
+ self._selector.register(sock, selectors.EVENT_READ, conn)
if self._sensors:
self._sensors.connection_created.record()
@@ -298,11 +298,11 @@ class KafkaClient(object):
self._conns.pop(node_id).close()
# Connection failures imply that our metadata is stale, so let's refresh
- elif conn.state is ConnectionStates.DISCONNECTING:
+ elif conn.state is ConnectionStates.DISCONNECTED:
if node_id in self._connecting:
self._connecting.remove(node_id)
try:
- self._selector.unregister(conn._sock)
+ self._selector.unregister(sock)
except KeyError:
pass
@@ -369,7 +369,7 @@ class KafkaClient(object):
log.debug("Initiating connection to node %s at %s:%s",
node_id, broker.host, broker.port)
host, port, afi = get_ip_port_afi(broker.host)
- cb = functools.partial(WeakMethod(self._conn_state_change), node_id)
+ cb = WeakMethod(self._conn_state_change)
conn = BrokerConnection(host, broker.port, afi,
state_change_callback=cb,
node_id=node_id,