diff options
author | Dana Powers <dana.powers@gmail.com> | 2019-04-02 21:27:49 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-04-02 21:27:49 -0700 |
commit | 91d31494d02ea636a991abb4dfb25dd904eefd45 (patch) | |
tree | 5fa58d3648afc1335cc03facf08255da85416267 /kafka/client_async.py | |
parent | 27cd93be3e7f2e3f3baca04d2126cf3bb6374668 (diff) | |
download | kafka-python-91d31494d02ea636a991abb4dfb25dd904eefd45.tar.gz |
Do not call state_change_callback with lock (#1775)
Diffstat (limited to 'kafka/client_async.py')
-rw-r--r-- | kafka/client_async.py | 16 |
1 files changed, 8 insertions, 8 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py index a86ab55..77efac8 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -260,16 +260,16 @@ class KafkaClient(object): conn = self._conns[node_id] return conn.disconnected() and not conn.blacked_out() - def _conn_state_change(self, node_id, conn): + def _conn_state_change(self, node_id, sock, conn): with self._lock: if conn.connecting(): # SSL connections can enter this state 2x (second during Handshake) if node_id not in self._connecting: self._connecting.add(node_id) try: - self._selector.register(conn._sock, selectors.EVENT_WRITE) + self._selector.register(sock, selectors.EVENT_WRITE) except KeyError: - self._selector.modify(conn._sock, selectors.EVENT_WRITE) + self._selector.modify(sock, selectors.EVENT_WRITE) if self.cluster.is_bootstrap(node_id): self._last_bootstrap = time.time() @@ -280,9 +280,9 @@ class KafkaClient(object): self._connecting.remove(node_id) try: - self._selector.modify(conn._sock, selectors.EVENT_READ, conn) + self._selector.modify(sock, selectors.EVENT_READ, conn) except KeyError: - self._selector.register(conn._sock, selectors.EVENT_READ, conn) + self._selector.register(sock, selectors.EVENT_READ, conn) if self._sensors: self._sensors.connection_created.record() @@ -298,11 +298,11 @@ class KafkaClient(object): self._conns.pop(node_id).close() # Connection failures imply that our metadata is stale, so let's refresh - elif conn.state is ConnectionStates.DISCONNECTING: + elif conn.state is ConnectionStates.DISCONNECTED: if node_id in self._connecting: self._connecting.remove(node_id) try: - self._selector.unregister(conn._sock) + self._selector.unregister(sock) except KeyError: pass @@ -369,7 +369,7 @@ class KafkaClient(object): log.debug("Initiating connection to node %s at %s:%s", node_id, broker.host, broker.port) host, port, afi = get_ip_port_afi(broker.host) - cb = functools.partial(WeakMethod(self._conn_state_change), node_id) + cb = WeakMethod(self._conn_state_change) conn = BrokerConnection(host, broker.port, afi, state_change_callback=cb, node_id=node_id, |