summaryrefslogtreecommitdiff
path: root/kafka/conn.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/conn.py')
-rw-r--r--kafka/conn.py12
1 files changed, 6 insertions, 6 deletions
diff --git a/kafka/conn.py b/kafka/conn.py
index 8f6e4a8..e2d7707 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -445,7 +445,9 @@ class BrokerConnection(object):
self._sasl_auth_future = future
self._recv()
if self._sasl_auth_future.failed():
- raise self._sasl_auth_future.exception # pylint: disable-msg=raising-bad-type
+ ex = self._sasl_auth_future.exception # pylint: disable-msg=raising-bad-type
+ if not isinstance(ex, Errors.ConnectionError):
+ raise ex
return self._sasl_auth_future.succeeded()
def _handle_sasl_handshake_response(self, future, response):
@@ -488,11 +490,10 @@ class BrokerConnection(object):
error = Errors.AuthenticationFailedError(
'Authentication failed for user {0}'.format(
self.config['sasl_plain_username']))
- future.failure(error)
- raise error
+ return future.failure(error)
data += fragment
self._sock.setblocking(False)
- except (AssertionError, ConnectionError) as e:
+ except ConnectionError as e:
log.exception("%s: Error receiving reply from server", self)
error = Errors.ConnectionError("%s: %s" % (self, e))
self.close(error=error)
@@ -500,8 +501,7 @@ class BrokerConnection(object):
if data != b'\x00\x00\x00\x00':
error = Errors.AuthenticationFailedError('Unrecognized response during authentication')
- future.failure(error)
- raise error
+ return future.failure(error)
log.info('%s: Authenticated as %s via PLAIN', self, self.config['sasl_plain_username'])
return future.success(True)