diff options
-rw-r--r-- | kafka/conn.py | 34 |
1 files changed, 19 insertions, 15 deletions
diff --git a/kafka/conn.py b/kafka/conn.py index 467519e..8f6e4a8 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -299,12 +299,15 @@ class BrokerConnection(object): self._sock.setsockopt(*option) self._sock.setblocking(False) + self.last_attempt = time.time() + self.state = ConnectionStates.CONNECTING if self.config['security_protocol'] in ('SSL', 'SASL_SSL'): self._wrap_ssl() - log.info('%s: connecting to %s:%d', self, self.host, self.port) - self.state = ConnectionStates.CONNECTING - self.last_attempt = time.time() - self.config['state_change_callback'](self) + # _wrap_ssl can alter the connection state -- disconnects on failure + # so we need to double check that we are still connecting before + if self.connecting(): + self.config['state_change_callback'](self) + log.info('%s: connecting to %s:%d', self, self.host, self.port) if self.state is ConnectionStates.CONNECTING: # in non-blocking mode, use repeated calls to socket.connect_ex @@ -367,10 +370,12 @@ class BrokerConnection(object): if self.state is ConnectionStates.AUTHENTICATING: assert self.config['security_protocol'] in ('SASL_PLAINTEXT', 'SASL_SSL') if self._try_authenticate(): - log.debug('%s: Connection complete.', self) - self.state = ConnectionStates.CONNECTED - self._reset_reconnect_backoff() - self.config['state_change_callback'](self) + # _try_authenticate has side-effects: possibly disconnected on socket errors + if self.state is ConnectionStates.AUTHENTICATING: + log.debug('%s: Connection complete.', self) + self.state = ConnectionStates.CONNECTED + self._reset_reconnect_backoff() + self.config['state_change_callback'](self) return self.state @@ -397,10 +402,7 @@ class BrokerConnection(object): password=self.config['ssl_password']) if self.config['ssl_crlfile']: if not hasattr(ssl, 'VERIFY_CRL_CHECK_LEAF'): - error = 'No CRL support with this version of Python.' - log.error('%s: %s Disconnecting.', self, error) - self.close(Errors.ConnectionError(error)) - return + raise RuntimeError('This version of Python does not support ssl_crlfile!') log.info('%s: Loading SSL CRL from %s', self, self.config['ssl_crlfile']) self._ssl_context.load_verify_locations(self.config['ssl_crlfile']) # pylint: disable=no-member @@ -493,13 +495,15 @@ class BrokerConnection(object): except (AssertionError, ConnectionError) as e: log.exception("%s: Error receiving reply from server", self) error = Errors.ConnectionError("%s: %s" % (self, e)) - future.failure(error) self.close(error=error) + return future.failure(error) if data != b'\x00\x00\x00\x00': - return future.failure(Errors.AuthenticationFailedError()) + error = Errors.AuthenticationFailedError('Unrecognized response during authentication') + future.failure(error) + raise error - log.info('%s: Authenticated as %s', self, self.config['sasl_plain_username']) + log.info('%s: Authenticated as %s via PLAIN', self, self.config['sasl_plain_username']) return future.success(True) def _try_authenticate_gssapi(self, future): |