diff options
-rw-r--r-- | kafka/client_async.py | 6 |
1 files changed, 5 insertions, 1 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py index b6adb77..88d2099 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -261,6 +261,7 @@ class KafkaClient(object): return conn.disconnected() and not conn.blacked_out() def _conn_state_change(self, node_id, conn): + close_conns = [] with self._lock: if conn.connecting(): # SSL connections can enter this state 2x (second during Handshake) @@ -295,7 +296,7 @@ class KafkaClient(object): else: for node_id in list(self._conns.keys()): if self.cluster.is_bootstrap(node_id): - self._conns.pop(node_id).close() + close_conns.append(self._conns.pop(node_id)) # Connection failures imply that our metadata is stale, so let's refresh elif conn.state is ConnectionStates.DISCONNECTING: @@ -321,6 +322,9 @@ class KafkaClient(object): log.warning("Node %s connection failed -- refreshing metadata", node_id) self.cluster.request_update() + for conn in close_conns: + conn.close() + def maybe_connect(self, node_id, wakeup=True): """Queues a node for asynchronous connection during the next .poll()""" if self._can_connect(node_id): |