summaryrefslogtreecommitdiff
path: root/kafka
diff options
context:
space:
mode:
Diffstat (limited to 'kafka')
-rw-r--r--kafka/conn.py23
1 files changed, 10 insertions, 13 deletions
diff --git a/kafka/conn.py b/kafka/conn.py
index 4bbd744..e88499c 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -215,9 +215,8 @@ class BrokerConnection(object):
self.host = host
self.port = port
self.afi = afi
- self._sock_ip = host
- self._sock_port = port
self._sock_afi = afi
+ self._sock_addr = None
self.in_flight_requests = collections.deque()
self._api_versions = None
@@ -279,13 +278,12 @@ class BrokerConnection(object):
return False
return True
- def _next_afi_host_port(self):
+ def _next_afi_sockaddr(self):
if not self._gai:
if not self._dns_lookup():
return
afi, _, __, ___, sockaddr = self._gai.pop(0)
- host, port = sockaddr[:2]
- return (afi, host, port)
+ return (afi, sockaddr)
def connect_blocking(self, timeout=float('inf')):
if self.connected():
@@ -327,13 +325,13 @@ class BrokerConnection(object):
"""Attempt to connect and return ConnectionState"""
if self.state is ConnectionStates.DISCONNECTED and not self.blacked_out():
self.last_attempt = time.time()
- next_lookup = self._next_afi_host_port()
+ next_lookup = self._next_afi_sockaddr()
if not next_lookup:
self.close(Errors.ConnectionError('DNS failure'))
return
else:
log.debug('%s: creating new socket', self)
- self._sock_afi, self._sock_ip, self._sock_port = next_lookup
+ self._sock_afi, self._sock_addr = next_lookup
self._sock = socket.socket(self._sock_afi, socket.SOCK_STREAM)
for option in self.config['socket_options']:
@@ -348,9 +346,8 @@ class BrokerConnection(object):
# so we need to double check that we are still connecting before
if self.connecting():
self.config['state_change_callback'](self)
- log.info('%s: connecting to %s:%d [%s:%d %s]', self, self.host,
- self.port, self._sock_ip, self._sock_port,
- AFI_NAMES[self._sock_afi])
+ log.info('%s: connecting to %s:%d [%s %s]', self, self.host,
+ self.port, self._sock_addr, AFI_NAMES[self._sock_afi])
if self.state is ConnectionStates.CONNECTING:
# in non-blocking mode, use repeated calls to socket.connect_ex
@@ -358,7 +355,7 @@ class BrokerConnection(object):
request_timeout = self.config['request_timeout_ms'] / 1000.0
ret = None
try:
- ret = self._sock.connect_ex((self._sock_ip, self._sock_port))
+ ret = self._sock.connect_ex(self._sock_addr)
except socket.error as err:
ret = err.errno
@@ -1009,9 +1006,9 @@ class BrokerConnection(object):
return version
def __str__(self):
- return "<BrokerConnection node_id=%s host=%s:%d %s [%s:%d %s]>" % (
+ return "<BrokerConnection node_id=%s host=%s:%d %s [%s %s]>" % (
self.node_id, self.host, self.port, self.state,
- self._sock_ip, self._sock_port, AFI_NAMES[self._sock_afi])
+ AFI_NAMES[self._sock_afi], self._sock_addr)
class BrokerConnectionMetrics(object):