summaryrefslogtreecommitdiff
path: root/kafka/conn.py
diff options
context:
space:
mode:
authorEvan Bender <evan.bender@percolate.com>2016-08-23 11:47:13 -0400
committerDana Powers <dana.powers@gmail.com>2016-11-20 15:15:14 -0800
commitcbe8a6a2ee9c3a054a7bbfeebc4d5f6b6c892943 (patch)
treead462fcfd81933aad794d484449dd92cf09df501 /kafka/conn.py
parentc4a6e1aa68fc48dd589ff64e1247d2886ccfa3fd (diff)
downloadkafka-python-cbe8a6a2ee9c3a054a7bbfeebc4d5f6b6c892943.tar.gz
When hostname lookup is necessary, do every connect (#812)
Fixes a bug where lookup was done only once for the whole life of the process -- if a broker's IP changed, client couldn't reconnect.
Diffstat (limited to 'kafka/conn.py')
-rw-r--r--kafka/conn.py14
1 files changed, 9 insertions, 5 deletions
diff --git a/kafka/conn.py b/kafka/conn.py
index 21607d9..a8751e9 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -157,6 +157,9 @@ class BrokerConnection(object):
self.hostname = host
self.port = port
self.afi = afi
+ self._init_host = host
+ self._init_port = port
+ self._init_afi = afi
self.in_flight_requests = collections.deque()
self.config = copy.copy(self.DEFAULT_CONFIG)
@@ -208,7 +211,7 @@ class BrokerConnection(object):
log.debug('%s: creating new socket', str(self))
# if self.afi is set to AF_UNSPEC, then we need to do a name
# resolution and try all available address families
- if self.afi == socket.AF_UNSPEC:
+ if self._init_afi == socket.AF_UNSPEC:
if self._gai is None:
# XXX: all DNS functions in Python are blocking. If we really
# want to be non-blocking here, we need to use a 3rd-party
@@ -216,14 +219,15 @@ class BrokerConnection(object):
# own thread. This will be subject to the default libc
# name resolution timeout (5s on most Linux boxes)
try:
- self._gai = socket.getaddrinfo(self.host, self.port,
+ self._gai = socket.getaddrinfo(self._init_host,
+ self._init_port,
socket.AF_UNSPEC,
socket.SOCK_STREAM)
except socket.gaierror as ex:
raise socket.gaierror('getaddrinfo failed for {0}:{1}, '
'exception was {2}. Is your advertised.listeners (called'
'advertised.host.name before Kafka 9) correct and resolvable?'.format(
- self.host, self.port, ex
+ self._init_host, self._init_port, ex
))
self._gai_index = 0
else:
@@ -233,7 +237,7 @@ class BrokerConnection(object):
while True:
if self._gai_index >= len(self._gai):
log.error('Unable to connect to any of the names for {0}:{1}'.format(
- self.host, self.port
+ self._init_host, self._init_port
))
self.close()
return
@@ -245,7 +249,7 @@ class BrokerConnection(object):
self.host, self.port = sockaddr[:2]
self._sock = socket.socket(afi, socket.SOCK_STREAM)
else:
- self._sock = socket.socket(self.afi, socket.SOCK_STREAM)
+ self._sock = socket.socket(self._init_afi, socket.SOCK_STREAM)
for option in self.config['socket_options']:
self._sock.setsockopt(*option)