summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2019-03-31 10:10:32 -0700
committerDana Powers <dana.powers@gmail.com>2019-03-31 16:46:25 -0700
commita6298c712a7a3c866f15c471413a73e9d1e250c0 (patch)
tree2ac5f822b406628b13b0709671051743ef644aee
parentb1effa24aca3a6bcf2268354caae12ee82d6b36d (diff)
downloadkafka-python-state_change_callback_no_lock_close.tar.gz
Dont call conn.close() with client _lock in state change callbackstate_change_callback_no_lock_close
-rw-r--r--kafka/client_async.py6
1 files changed, 5 insertions, 1 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py
index b6adb77..88d2099 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -261,6 +261,7 @@ class KafkaClient(object):
return conn.disconnected() and not conn.blacked_out()
def _conn_state_change(self, node_id, conn):
+ close_conns = []
with self._lock:
if conn.connecting():
# SSL connections can enter this state 2x (second during Handshake)
@@ -295,7 +296,7 @@ class KafkaClient(object):
else:
for node_id in list(self._conns.keys()):
if self.cluster.is_bootstrap(node_id):
- self._conns.pop(node_id).close()
+ close_conns.append(self._conns.pop(node_id))
# Connection failures imply that our metadata is stale, so let's refresh
elif conn.state is ConnectionStates.DISCONNECTING:
@@ -321,6 +322,9 @@ class KafkaClient(object):
log.warning("Node %s connection failed -- refreshing metadata", node_id)
self.cluster.request_update()
+ for conn in close_conns:
+ conn.close()
+
def maybe_connect(self, node_id, wakeup=True):
"""Queues a node for asynchronous connection during the next .poll()"""
if self._can_connect(node_id):