summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--kafka/client_async.py13
1 files changed, 12 insertions, 1 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py
index e1b10b3..0b08415 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -324,8 +324,19 @@ class KafkaClient(object):
def _maybe_connect(self, node_id):
"""Idempotent non-blocking connection attempt to the given node id."""
+ broker = self.cluster.broker_metadata(node_id)
+
+ # If broker metadata indicates that a node's host/port has changed, remove it
+ if node_id in self._conns and broker is not None:
+ conn = self._conns[node_id]
+ host, _, __ = get_ip_port_afi(broker.host)
+ if conn.host != host or conn.port != broker.port:
+ log.debug("Closing connection to decommissioned node %s at %s:%s",
+ node_id, conn.host, conn.port)
+ conn.close()
+ self._conns.pop(node_id)
+
if node_id not in self._conns:
- broker = self.cluster.broker_metadata(node_id)
assert broker, 'Broker id %s not in current metadata' % node_id
log.debug("Initiating connection to node %s at %s:%s",