summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2017-10-08 12:26:07 -0700
committerDana Powers <dana.powers@gmail.com>2017-10-10 11:04:00 -0700
commit45063c3c4680226cdec73db9df999ba9f00d44fc (patch)
tree061e98d25998fabd33d2bbd5cadb8b8274203e35
parent6bd9dc8f183b7fac952ff446dd7332230ffa7215 (diff)
downloadkafka-python-45063c3c4680226cdec73db9df999ba9f00d44fc.tar.gz
Dont raise ConnectionError
-rw-r--r--kafka/conn.py12
1 files changed, 6 insertions, 6 deletions
diff --git a/kafka/conn.py b/kafka/conn.py
index 8f6e4a8..e2d7707 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -445,7 +445,9 @@ class BrokerConnection(object):
self._sasl_auth_future = future
self._recv()
if self._sasl_auth_future.failed():
- raise self._sasl_auth_future.exception # pylint: disable-msg=raising-bad-type
+ ex = self._sasl_auth_future.exception # pylint: disable-msg=raising-bad-type
+ if not isinstance(ex, Errors.ConnectionError):
+ raise ex
return self._sasl_auth_future.succeeded()
def _handle_sasl_handshake_response(self, future, response):
@@ -488,11 +490,10 @@ class BrokerConnection(object):
error = Errors.AuthenticationFailedError(
'Authentication failed for user {0}'.format(
self.config['sasl_plain_username']))
- future.failure(error)
- raise error
+ return future.failure(error)
data += fragment
self._sock.setblocking(False)
- except (AssertionError, ConnectionError) as e:
+ except ConnectionError as e:
log.exception("%s: Error receiving reply from server", self)
error = Errors.ConnectionError("%s: %s" % (self, e))
self.close(error=error)
@@ -500,8 +501,7 @@ class BrokerConnection(object):
if data != b'\x00\x00\x00\x00':
error = Errors.AuthenticationFailedError('Unrecognized response during authentication')
- future.failure(error)
- raise error
+ return future.failure(error)
log.info('%s: Authenticated as %s via PLAIN', self, self.config['sasl_plain_username'])
return future.success(True)