diff options
author | Dana Powers <dana.powers@rd.io> | 2018-03-07 15:37:33 -0800 |
---|---|---|
committer | Dana Powers <dana.powers@rd.io> | 2018-03-08 19:31:09 -0500 |
commit | fc77174650fe8231d5b9855824eea0bdfd6b56d6 (patch) | |
tree | 276a7cef95e35efc4b89d85edf91bd9edf0baba7 | |
parent | 4abdb1baea2468408c36cc983dfef1e8b8f54654 (diff) | |
download | kafka-python-conn_immutable_host_port_afi.tar.gz |
Make BrokerConnection .host / .port / .afi immutable, use _sock_* attributes for current lookupsconn_immutable_host_port_afi
-rw-r--r-- | kafka/conn.py | 40 | ||||
-rw-r--r-- | test/test_conn.py | 29 |
2 files changed, 45 insertions, 24 deletions
diff --git a/kafka/conn.py b/kafka/conn.py index d778c31..798f85a 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -78,6 +78,14 @@ except ImportError: gssapi = None GSSError = None + +AFI_NAMES = { + socket.AF_UNSPEC: "unspecified", + socket.AF_INET: "IPv4", + socket.AF_INET6: "IPv6", +} + + class ConnectionStates(object): DISCONNECTING = '<disconnecting>' DISCONNECTED = '<disconnected>' @@ -204,13 +212,12 @@ class BrokerConnection(object): SASL_MECHANISMS = ('PLAIN', 'GSSAPI') def __init__(self, host, port, afi, **configs): - self.hostname = host self.host = host self.port = port self.afi = afi - self._init_host = host - self._init_port = port - self._init_afi = afi + self._sock_ip = host + self._sock_port = port + self._sock_afi = afi self.in_flight_requests = collections.deque() self._api_versions = None @@ -266,10 +273,10 @@ class BrokerConnection(object): def _next_afi_host_port(self): if not self._gai: - self._gai = dns_lookup(self._init_host, self._init_port, self._init_afi) + self._gai = dns_lookup(self.host, self.port, self.afi) if not self._gai: log.error('DNS lookup failed for %s:%i (%s)', - self._init_host, self._init_port, self._init_afi) + self.host, self.port, self.afi) return afi, _, __, ___, sockaddr = self._gai.pop(0) @@ -286,8 +293,8 @@ class BrokerConnection(object): return else: log.debug('%s: creating new socket', self) - self.afi, self.host, self.port = next_lookup - self._sock = socket.socket(self.afi, socket.SOCK_STREAM) + self._sock_afi, self._sock_ip, self._sock_port = next_lookup + self._sock = socket.socket(self._sock_afi, socket.SOCK_STREAM) for option in self.config['socket_options']: log.debug('%s: setting socket option %s', self, option) @@ -301,7 +308,9 @@ class BrokerConnection(object): # so we need to double check that we are still connecting before if self.connecting(): self.config['state_change_callback'](self) - log.info('%s: connecting to %s:%d', self, self.host, self.port) + log.info('%s: connecting to %s:%d [%s:%d %s]', self, self.host, + self.port, self._sock_ip, self._sock_port, + AFI_NAMES[self._sock_afi]) if self.state is ConnectionStates.CONNECTING: # in non-blocking mode, use repeated calls to socket.connect_ex @@ -309,7 +318,7 @@ class BrokerConnection(object): request_timeout = self.config['request_timeout_ms'] / 1000.0 ret = None try: - ret = self._sock.connect_ex((self.host, self.port)) + ret = self._sock.connect_ex((self._sock_ip, self._sock_port)) except socket.error as err: ret = err.errno @@ -400,7 +409,7 @@ class BrokerConnection(object): try: self._sock = self._ssl_context.wrap_socket( self._sock, - server_hostname=self.hostname, + server_hostname=self.host, do_handshake_on_connect=False) except ssl.SSLError as e: log.exception('%s: Failed to wrap socket in SSLContext!', self) @@ -524,7 +533,7 @@ class BrokerConnection(object): return future.success(True) def _try_authenticate_gssapi(self, future): - auth_id = self.config['sasl_kerberos_service_name'] + '@' + self.hostname + auth_id = self.config['sasl_kerberos_service_name'] + '@' + self.host gssapi_name = gssapi.Name( auth_id, name_type=gssapi.NameType.hostbased_service @@ -962,9 +971,10 @@ class BrokerConnection(object): self.config[key] = stashed[key] return version - def __repr__(self): - return "<BrokerConnection node_id=%s host=%s/%s port=%d>" % ( - self.node_id, self.hostname, self.host, self.port) + def __str__(self): + return "<BrokerConnection node_id=%s host=%s:%d %s [%s:%d %s]>" % ( + self.node_id, self.host, self.port, self.state, + self._sock_ip, self._sock_port, AFI_NAMES[self._sock_afi]) class BrokerConnectionMetrics(object): diff --git a/test/test_conn.py b/test/test_conn.py index f35cebe..44ee9ee 100644 --- a/test/test_conn.py +++ b/test/test_conn.py @@ -255,20 +255,26 @@ def test_lookup_on_connect(): hostname = 'example.org' port = 9092 conn = BrokerConnection(hostname, port, socket.AF_UNSPEC) - assert conn.host == conn.hostname == hostname + assert conn.host == hostname + assert conn.port == port + assert conn.afi == socket.AF_UNSPEC ip1 = '127.0.0.1' + afi1 = socket.AF_INET mock_return1 = [ - (2, 2, 17, '', (ip1, 9092)), + (afi1, socket.SOCK_STREAM, 6, '', (ip1, 9092)), ] with mock.patch("socket.getaddrinfo", return_value=mock_return1) as m: conn.connect() m.assert_called_once_with(hostname, port, 0, 1) conn.close() - assert conn.host == ip1 + assert conn._sock_ip == ip1 + assert conn._sock_port == 9092 + assert conn._sock_afi == afi1 - ip2 = '127.0.0.2' + ip2 = '::1' + afi2 = socket.AF_INET6 mock_return2 = [ - (2, 2, 17, '', (ip2, 9092)), + (afi2, socket.SOCK_STREAM, 6, '', (ip2, 9092)), ] with mock.patch("socket.getaddrinfo", return_value=mock_return2) as m: @@ -276,14 +282,16 @@ def test_lookup_on_connect(): conn.connect() m.assert_called_once_with(hostname, port, 0, 1) conn.close() - assert conn.host == ip2 + assert conn._sock_ip == ip2 + assert conn._sock_port == 9092 + assert conn._sock_afi == afi2 def test_relookup_on_failure(): hostname = 'example.org' port = 9092 conn = BrokerConnection(hostname, port, socket.AF_UNSPEC) - assert conn.host == conn.hostname == hostname + assert conn.host == hostname mock_return1 = [] with mock.patch("socket.getaddrinfo", return_value=mock_return1) as m: last_attempt = conn.last_attempt @@ -293,8 +301,9 @@ def test_relookup_on_failure(): assert conn.last_attempt > last_attempt ip2 = '127.0.0.2' + afi2 = socket.AF_INET mock_return2 = [ - (2, 2, 17, '', (ip2, 9092)), + (afi2, socket.SOCK_STREAM, 6, '', (ip2, 9092)), ] with mock.patch("socket.getaddrinfo", return_value=mock_return2) as m: @@ -302,4 +311,6 @@ def test_relookup_on_failure(): conn.connect() m.assert_called_once_with(hostname, port, 0, 1) conn.close() - assert conn.host == ip2 + assert conn._sock_ip == ip2 + assert conn._sock_port == 9092 + assert conn._sock_afi == afi2 |