diff options
-rw-r--r-- | kafka/conn.py | 23 | ||||
-rw-r--r-- | test/test_conn.py | 27 |
2 files changed, 22 insertions, 28 deletions
diff --git a/kafka/conn.py b/kafka/conn.py index 4bbd744..e88499c 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -215,9 +215,8 @@ class BrokerConnection(object): self.host = host self.port = port self.afi = afi - self._sock_ip = host - self._sock_port = port self._sock_afi = afi + self._sock_addr = None self.in_flight_requests = collections.deque() self._api_versions = None @@ -279,13 +278,12 @@ class BrokerConnection(object): return False return True - def _next_afi_host_port(self): + def _next_afi_sockaddr(self): if not self._gai: if not self._dns_lookup(): return afi, _, __, ___, sockaddr = self._gai.pop(0) - host, port = sockaddr[:2] - return (afi, host, port) + return (afi, sockaddr) def connect_blocking(self, timeout=float('inf')): if self.connected(): @@ -327,13 +325,13 @@ class BrokerConnection(object): """Attempt to connect and return ConnectionState""" if self.state is ConnectionStates.DISCONNECTED and not self.blacked_out(): self.last_attempt = time.time() - next_lookup = self._next_afi_host_port() + next_lookup = self._next_afi_sockaddr() if not next_lookup: self.close(Errors.ConnectionError('DNS failure')) return else: log.debug('%s: creating new socket', self) - self._sock_afi, self._sock_ip, self._sock_port = next_lookup + self._sock_afi, self._sock_addr = next_lookup self._sock = socket.socket(self._sock_afi, socket.SOCK_STREAM) for option in self.config['socket_options']: @@ -348,9 +346,8 @@ class BrokerConnection(object): # so we need to double check that we are still connecting before if self.connecting(): self.config['state_change_callback'](self) - log.info('%s: connecting to %s:%d [%s:%d %s]', self, self.host, - self.port, self._sock_ip, self._sock_port, - AFI_NAMES[self._sock_afi]) + log.info('%s: connecting to %s:%d [%s %s]', self, self.host, + self.port, self._sock_addr, AFI_NAMES[self._sock_afi]) if self.state is ConnectionStates.CONNECTING: # in non-blocking mode, use repeated calls to socket.connect_ex @@ -358,7 +355,7 @@ class BrokerConnection(object): request_timeout = self.config['request_timeout_ms'] / 1000.0 ret = None try: - ret = self._sock.connect_ex((self._sock_ip, self._sock_port)) + ret = self._sock.connect_ex(self._sock_addr) except socket.error as err: ret = err.errno @@ -1009,9 +1006,9 @@ class BrokerConnection(object): return version def __str__(self): - return "<BrokerConnection node_id=%s host=%s:%d %s [%s:%d %s]>" % ( + return "<BrokerConnection node_id=%s host=%s:%d %s [%s %s]>" % ( self.node_id, self.host, self.port, self.state, - self._sock_ip, self._sock_port, AFI_NAMES[self._sock_afi]) + AFI_NAMES[self._sock_afi], self._sock_addr) class BrokerConnectionMetrics(object): diff --git a/test/test_conn.py b/test/test_conn.py index 44ee9ee..12a32ef 100644 --- a/test/test_conn.py +++ b/test/test_conn.py @@ -258,33 +258,31 @@ def test_lookup_on_connect(): assert conn.host == hostname assert conn.port == port assert conn.afi == socket.AF_UNSPEC - ip1 = '127.0.0.1' afi1 = socket.AF_INET + sockaddr1 = ('127.0.0.1', 9092) mock_return1 = [ - (afi1, socket.SOCK_STREAM, 6, '', (ip1, 9092)), + (afi1, socket.SOCK_STREAM, 6, '', sockaddr1), ] with mock.patch("socket.getaddrinfo", return_value=mock_return1) as m: conn.connect() m.assert_called_once_with(hostname, port, 0, 1) - conn.close() - assert conn._sock_ip == ip1 - assert conn._sock_port == 9092 assert conn._sock_afi == afi1 + assert conn._sock_addr == sockaddr1 + conn.close() - ip2 = '::1' afi2 = socket.AF_INET6 + sockaddr2 = ('::1', 9092, 0, 0) mock_return2 = [ - (afi2, socket.SOCK_STREAM, 6, '', (ip2, 9092)), + (afi2, socket.SOCK_STREAM, 6, '', sockaddr2), ] with mock.patch("socket.getaddrinfo", return_value=mock_return2) as m: conn.last_attempt = 0 conn.connect() m.assert_called_once_with(hostname, port, 0, 1) - conn.close() - assert conn._sock_ip == ip2 - assert conn._sock_port == 9092 assert conn._sock_afi == afi2 + assert conn._sock_addr == sockaddr2 + conn.close() def test_relookup_on_failure(): @@ -300,17 +298,16 @@ def test_relookup_on_failure(): assert conn.disconnected() assert conn.last_attempt > last_attempt - ip2 = '127.0.0.2' afi2 = socket.AF_INET + sockaddr2 = ('127.0.0.2', 9092) mock_return2 = [ - (afi2, socket.SOCK_STREAM, 6, '', (ip2, 9092)), + (afi2, socket.SOCK_STREAM, 6, '', sockaddr2), ] with mock.patch("socket.getaddrinfo", return_value=mock_return2) as m: conn.last_attempt = 0 conn.connect() m.assert_called_once_with(hostname, port, 0, 1) - conn.close() - assert conn._sock_ip == ip2 - assert conn._sock_port == 9092 assert conn._sock_afi == afi2 + assert conn._sock_addr == sockaddr2 + conn.close() |