summaryrefslogtreecommitdiff
path: root/kafka/conn.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/conn.py')
-rw-r--r--kafka/conn.py40
1 files changed, 25 insertions, 15 deletions
diff --git a/kafka/conn.py b/kafka/conn.py
index d778c31..798f85a 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -78,6 +78,14 @@ except ImportError:
gssapi = None
GSSError = None
+
+AFI_NAMES = {
+ socket.AF_UNSPEC: "unspecified",
+ socket.AF_INET: "IPv4",
+ socket.AF_INET6: "IPv6",
+}
+
+
class ConnectionStates(object):
DISCONNECTING = '<disconnecting>'
DISCONNECTED = '<disconnected>'
@@ -204,13 +212,12 @@ class BrokerConnection(object):
SASL_MECHANISMS = ('PLAIN', 'GSSAPI')
def __init__(self, host, port, afi, **configs):
- self.hostname = host
self.host = host
self.port = port
self.afi = afi
- self._init_host = host
- self._init_port = port
- self._init_afi = afi
+ self._sock_ip = host
+ self._sock_port = port
+ self._sock_afi = afi
self.in_flight_requests = collections.deque()
self._api_versions = None
@@ -266,10 +273,10 @@ class BrokerConnection(object):
def _next_afi_host_port(self):
if not self._gai:
- self._gai = dns_lookup(self._init_host, self._init_port, self._init_afi)
+ self._gai = dns_lookup(self.host, self.port, self.afi)
if not self._gai:
log.error('DNS lookup failed for %s:%i (%s)',
- self._init_host, self._init_port, self._init_afi)
+ self.host, self.port, self.afi)
return
afi, _, __, ___, sockaddr = self._gai.pop(0)
@@ -286,8 +293,8 @@ class BrokerConnection(object):
return
else:
log.debug('%s: creating new socket', self)
- self.afi, self.host, self.port = next_lookup
- self._sock = socket.socket(self.afi, socket.SOCK_STREAM)
+ self._sock_afi, self._sock_ip, self._sock_port = next_lookup
+ self._sock = socket.socket(self._sock_afi, socket.SOCK_STREAM)
for option in self.config['socket_options']:
log.debug('%s: setting socket option %s', self, option)
@@ -301,7 +308,9 @@ class BrokerConnection(object):
# so we need to double check that we are still connecting before
if self.connecting():
self.config['state_change_callback'](self)
- log.info('%s: connecting to %s:%d', self, self.host, self.port)
+ log.info('%s: connecting to %s:%d [%s:%d %s]', self, self.host,
+ self.port, self._sock_ip, self._sock_port,
+ AFI_NAMES[self._sock_afi])
if self.state is ConnectionStates.CONNECTING:
# in non-blocking mode, use repeated calls to socket.connect_ex
@@ -309,7 +318,7 @@ class BrokerConnection(object):
request_timeout = self.config['request_timeout_ms'] / 1000.0
ret = None
try:
- ret = self._sock.connect_ex((self.host, self.port))
+ ret = self._sock.connect_ex((self._sock_ip, self._sock_port))
except socket.error as err:
ret = err.errno
@@ -400,7 +409,7 @@ class BrokerConnection(object):
try:
self._sock = self._ssl_context.wrap_socket(
self._sock,
- server_hostname=self.hostname,
+ server_hostname=self.host,
do_handshake_on_connect=False)
except ssl.SSLError as e:
log.exception('%s: Failed to wrap socket in SSLContext!', self)
@@ -524,7 +533,7 @@ class BrokerConnection(object):
return future.success(True)
def _try_authenticate_gssapi(self, future):
- auth_id = self.config['sasl_kerberos_service_name'] + '@' + self.hostname
+ auth_id = self.config['sasl_kerberos_service_name'] + '@' + self.host
gssapi_name = gssapi.Name(
auth_id,
name_type=gssapi.NameType.hostbased_service
@@ -962,9 +971,10 @@ class BrokerConnection(object):
self.config[key] = stashed[key]
return version
- def __repr__(self):
- return "<BrokerConnection node_id=%s host=%s/%s port=%d>" % (
- self.node_id, self.hostname, self.host, self.port)
+ def __str__(self):
+ return "<BrokerConnection node_id=%s host=%s:%d %s [%s:%d %s]>" % (
+ self.node_id, self.host, self.port, self.state,
+ self._sock_ip, self._sock_port, AFI_NAMES[self._sock_afi])
class BrokerConnectionMetrics(object):