summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--kafka/client_async.py6
1 files changed, 5 insertions, 1 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py
index b6adb77..88d2099 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -261,6 +261,7 @@ class KafkaClient(object):
return conn.disconnected() and not conn.blacked_out()
def _conn_state_change(self, node_id, conn):
+ close_conns = []
with self._lock:
if conn.connecting():
# SSL connections can enter this state 2x (second during Handshake)
@@ -295,7 +296,7 @@ class KafkaClient(object):
else:
for node_id in list(self._conns.keys()):
if self.cluster.is_bootstrap(node_id):
- self._conns.pop(node_id).close()
+ close_conns.append(self._conns.pop(node_id))
# Connection failures imply that our metadata is stale, so let's refresh
elif conn.state is ConnectionStates.DISCONNECTING:
@@ -321,6 +322,9 @@ class KafkaClient(object):
log.warning("Node %s connection failed -- refreshing metadata", node_id)
self.cluster.request_update()
+ for conn in close_conns:
+ conn.close()
+
def maybe_connect(self, node_id, wakeup=True):
"""Queues a node for asynchronous connection during the next .poll()"""
if self._can_connect(node_id):