summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2017-10-08 12:09:13 -0700
committerDana Powers <dana.powers@gmail.com>2017-10-10 11:04:00 -0700
commit6bd9dc8f183b7fac952ff446dd7332230ffa7215 (patch)
treea749f66694b63ed561f7b441149e9ef9dac333a9
parent5c17cf035019dca4b451b0db8f5e65c8e489a0f4 (diff)
downloadkafka-python-6bd9dc8f183b7fac952ff446dd7332230ffa7215.tar.gz
Check for disconnects during ssl handshake and sasl authentication
-rw-r--r--kafka/conn.py34
1 files changed, 19 insertions, 15 deletions
diff --git a/kafka/conn.py b/kafka/conn.py
index 467519e..8f6e4a8 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -299,12 +299,15 @@ class BrokerConnection(object):
self._sock.setsockopt(*option)
self._sock.setblocking(False)
+ self.last_attempt = time.time()
+ self.state = ConnectionStates.CONNECTING
if self.config['security_protocol'] in ('SSL', 'SASL_SSL'):
self._wrap_ssl()
- log.info('%s: connecting to %s:%d', self, self.host, self.port)
- self.state = ConnectionStates.CONNECTING
- self.last_attempt = time.time()
- self.config['state_change_callback'](self)
+ # _wrap_ssl can alter the connection state -- disconnects on failure
+ # so we need to double check that we are still connecting before
+ if self.connecting():
+ self.config['state_change_callback'](self)
+ log.info('%s: connecting to %s:%d', self, self.host, self.port)
if self.state is ConnectionStates.CONNECTING:
# in non-blocking mode, use repeated calls to socket.connect_ex
@@ -367,10 +370,12 @@ class BrokerConnection(object):
if self.state is ConnectionStates.AUTHENTICATING:
assert self.config['security_protocol'] in ('SASL_PLAINTEXT', 'SASL_SSL')
if self._try_authenticate():
- log.debug('%s: Connection complete.', self)
- self.state = ConnectionStates.CONNECTED
- self._reset_reconnect_backoff()
- self.config['state_change_callback'](self)
+ # _try_authenticate has side-effects: possibly disconnected on socket errors
+ if self.state is ConnectionStates.AUTHENTICATING:
+ log.debug('%s: Connection complete.', self)
+ self.state = ConnectionStates.CONNECTED
+ self._reset_reconnect_backoff()
+ self.config['state_change_callback'](self)
return self.state
@@ -397,10 +402,7 @@ class BrokerConnection(object):
password=self.config['ssl_password'])
if self.config['ssl_crlfile']:
if not hasattr(ssl, 'VERIFY_CRL_CHECK_LEAF'):
- error = 'No CRL support with this version of Python.'
- log.error('%s: %s Disconnecting.', self, error)
- self.close(Errors.ConnectionError(error))
- return
+ raise RuntimeError('This version of Python does not support ssl_crlfile!')
log.info('%s: Loading SSL CRL from %s', self, self.config['ssl_crlfile'])
self._ssl_context.load_verify_locations(self.config['ssl_crlfile'])
# pylint: disable=no-member
@@ -493,13 +495,15 @@ class BrokerConnection(object):
except (AssertionError, ConnectionError) as e:
log.exception("%s: Error receiving reply from server", self)
error = Errors.ConnectionError("%s: %s" % (self, e))
- future.failure(error)
self.close(error=error)
+ return future.failure(error)
if data != b'\x00\x00\x00\x00':
- return future.failure(Errors.AuthenticationFailedError())
+ error = Errors.AuthenticationFailedError('Unrecognized response during authentication')
+ future.failure(error)
+ raise error
- log.info('%s: Authenticated as %s', self, self.config['sasl_plain_username'])
+ log.info('%s: Authenticated as %s via PLAIN', self, self.config['sasl_plain_username'])
return future.success(True)
def _try_authenticate_gssapi(self, future):