diff options
Diffstat (limited to 'kafka/client_async.py')
-rw-r--r-- | kafka/client_async.py | 13 |
1 files changed, 12 insertions, 1 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py index e1b10b3..0b08415 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -324,8 +324,19 @@ class KafkaClient(object): def _maybe_connect(self, node_id): """Idempotent non-blocking connection attempt to the given node id.""" + broker = self.cluster.broker_metadata(node_id) + + # If broker metadata indicates that a node's host/port has changed, remove it + if node_id in self._conns and broker is not None: + conn = self._conns[node_id] + host, _, __ = get_ip_port_afi(broker.host) + if conn.host != host or conn.port != broker.port: + log.debug("Closing connection to decommissioned node %s at %s:%s", + node_id, conn.host, conn.port) + conn.close() + self._conns.pop(node_id) + if node_id not in self._conns: - broker = self.cluster.broker_metadata(node_id) assert broker, 'Broker id %s not in current metadata' % node_id log.debug("Initiating connection to node %s at %s:%s", |