diff options
Diffstat (limited to 'kafka')
-rw-r--r-- | kafka/conn.py | 23 |
1 files changed, 10 insertions, 13 deletions
diff --git a/kafka/conn.py b/kafka/conn.py index 4bbd744..e88499c 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -215,9 +215,8 @@ class BrokerConnection(object): self.host = host self.port = port self.afi = afi - self._sock_ip = host - self._sock_port = port self._sock_afi = afi + self._sock_addr = None self.in_flight_requests = collections.deque() self._api_versions = None @@ -279,13 +278,12 @@ class BrokerConnection(object): return False return True - def _next_afi_host_port(self): + def _next_afi_sockaddr(self): if not self._gai: if not self._dns_lookup(): return afi, _, __, ___, sockaddr = self._gai.pop(0) - host, port = sockaddr[:2] - return (afi, host, port) + return (afi, sockaddr) def connect_blocking(self, timeout=float('inf')): if self.connected(): @@ -327,13 +325,13 @@ class BrokerConnection(object): """Attempt to connect and return ConnectionState""" if self.state is ConnectionStates.DISCONNECTED and not self.blacked_out(): self.last_attempt = time.time() - next_lookup = self._next_afi_host_port() + next_lookup = self._next_afi_sockaddr() if not next_lookup: self.close(Errors.ConnectionError('DNS failure')) return else: log.debug('%s: creating new socket', self) - self._sock_afi, self._sock_ip, self._sock_port = next_lookup + self._sock_afi, self._sock_addr = next_lookup self._sock = socket.socket(self._sock_afi, socket.SOCK_STREAM) for option in self.config['socket_options']: @@ -348,9 +346,8 @@ class BrokerConnection(object): # so we need to double check that we are still connecting before if self.connecting(): self.config['state_change_callback'](self) - log.info('%s: connecting to %s:%d [%s:%d %s]', self, self.host, - self.port, self._sock_ip, self._sock_port, - AFI_NAMES[self._sock_afi]) + log.info('%s: connecting to %s:%d [%s %s]', self, self.host, + self.port, self._sock_addr, AFI_NAMES[self._sock_afi]) if self.state is ConnectionStates.CONNECTING: # in non-blocking mode, use repeated calls to socket.connect_ex @@ -358,7 +355,7 @@ class BrokerConnection(object): request_timeout = self.config['request_timeout_ms'] / 1000.0 ret = None try: - ret = self._sock.connect_ex((self._sock_ip, self._sock_port)) + ret = self._sock.connect_ex(self._sock_addr) except socket.error as err: ret = err.errno @@ -1009,9 +1006,9 @@ class BrokerConnection(object): return version def __str__(self): - return "<BrokerConnection node_id=%s host=%s:%d %s [%s:%d %s]>" % ( + return "<BrokerConnection node_id=%s host=%s:%d %s [%s %s]>" % ( self.node_id, self.host, self.port, self.state, - self._sock_ip, self._sock_port, AFI_NAMES[self._sock_afi]) + AFI_NAMES[self._sock_afi], self._sock_addr) class BrokerConnectionMetrics(object): |