diff options
Diffstat (limited to 'kafka/conn.py')
-rw-r--r-- | kafka/conn.py | 40 |
1 files changed, 25 insertions, 15 deletions
diff --git a/kafka/conn.py b/kafka/conn.py index d778c31..798f85a 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -78,6 +78,14 @@ except ImportError: gssapi = None GSSError = None + +AFI_NAMES = { + socket.AF_UNSPEC: "unspecified", + socket.AF_INET: "IPv4", + socket.AF_INET6: "IPv6", +} + + class ConnectionStates(object): DISCONNECTING = '<disconnecting>' DISCONNECTED = '<disconnected>' @@ -204,13 +212,12 @@ class BrokerConnection(object): SASL_MECHANISMS = ('PLAIN', 'GSSAPI') def __init__(self, host, port, afi, **configs): - self.hostname = host self.host = host self.port = port self.afi = afi - self._init_host = host - self._init_port = port - self._init_afi = afi + self._sock_ip = host + self._sock_port = port + self._sock_afi = afi self.in_flight_requests = collections.deque() self._api_versions = None @@ -266,10 +273,10 @@ class BrokerConnection(object): def _next_afi_host_port(self): if not self._gai: - self._gai = dns_lookup(self._init_host, self._init_port, self._init_afi) + self._gai = dns_lookup(self.host, self.port, self.afi) if not self._gai: log.error('DNS lookup failed for %s:%i (%s)', - self._init_host, self._init_port, self._init_afi) + self.host, self.port, self.afi) return afi, _, __, ___, sockaddr = self._gai.pop(0) @@ -286,8 +293,8 @@ class BrokerConnection(object): return else: log.debug('%s: creating new socket', self) - self.afi, self.host, self.port = next_lookup - self._sock = socket.socket(self.afi, socket.SOCK_STREAM) + self._sock_afi, self._sock_ip, self._sock_port = next_lookup + self._sock = socket.socket(self._sock_afi, socket.SOCK_STREAM) for option in self.config['socket_options']: log.debug('%s: setting socket option %s', self, option) @@ -301,7 +308,9 @@ class BrokerConnection(object): # so we need to double check that we are still connecting before if self.connecting(): self.config['state_change_callback'](self) - log.info('%s: connecting to %s:%d', self, self.host, self.port) + log.info('%s: connecting to %s:%d [%s:%d %s]', self, self.host, + self.port, self._sock_ip, self._sock_port, + AFI_NAMES[self._sock_afi]) if self.state is ConnectionStates.CONNECTING: # in non-blocking mode, use repeated calls to socket.connect_ex @@ -309,7 +318,7 @@ class BrokerConnection(object): request_timeout = self.config['request_timeout_ms'] / 1000.0 ret = None try: - ret = self._sock.connect_ex((self.host, self.port)) + ret = self._sock.connect_ex((self._sock_ip, self._sock_port)) except socket.error as err: ret = err.errno @@ -400,7 +409,7 @@ class BrokerConnection(object): try: self._sock = self._ssl_context.wrap_socket( self._sock, - server_hostname=self.hostname, + server_hostname=self.host, do_handshake_on_connect=False) except ssl.SSLError as e: log.exception('%s: Failed to wrap socket in SSLContext!', self) @@ -524,7 +533,7 @@ class BrokerConnection(object): return future.success(True) def _try_authenticate_gssapi(self, future): - auth_id = self.config['sasl_kerberos_service_name'] + '@' + self.hostname + auth_id = self.config['sasl_kerberos_service_name'] + '@' + self.host gssapi_name = gssapi.Name( auth_id, name_type=gssapi.NameType.hostbased_service @@ -962,9 +971,10 @@ class BrokerConnection(object): self.config[key] = stashed[key] return version - def __repr__(self): - return "<BrokerConnection node_id=%s host=%s/%s port=%d>" % ( - self.node_id, self.hostname, self.host, self.port) + def __str__(self): + return "<BrokerConnection node_id=%s host=%s:%d %s [%s:%d %s]>" % ( + self.node_id, self.host, self.port, self.state, + self._sock_ip, self._sock_port, AFI_NAMES[self._sock_afi]) class BrokerConnectionMetrics(object): |