summaryrefslogtreecommitdiff
path: root/kafka/conn.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/conn.py')
-rw-r--r--kafka/conn.py28
1 files changed, 14 insertions, 14 deletions
diff --git a/kafka/conn.py b/kafka/conn.py
index daaa234..f67edfb 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -327,7 +327,7 @@ class BrokerConnection(object):
self.last_attempt = time.time()
next_lookup = self._next_afi_sockaddr()
if not next_lookup:
- self.close(Errors.ConnectionError('DNS failure'))
+ self.close(Errors.KafkaConnectionError('DNS failure'))
return
else:
log.debug('%s: creating new socket', self)
@@ -381,12 +381,12 @@ class BrokerConnection(object):
log.error('Connect attempt to %s returned error %s.'
' Disconnecting.', self, ret)
errstr = errno.errorcode.get(ret, 'UNKNOWN')
- self.close(Errors.ConnectionError('{} {}'.format(ret, errstr)))
+ self.close(Errors.KafkaConnectionError('{} {}'.format(ret, errstr)))
# Connection timed out
elif time.time() > request_timeout + self.last_attempt:
log.error('Connection attempt to %s timed out', self)
- self.close(Errors.ConnectionError('timeout'))
+ self.close(Errors.KafkaConnectionError('timeout'))
# Needs retry
else:
@@ -463,7 +463,7 @@ class BrokerConnection(object):
pass
except (SSLZeroReturnError, ConnectionError, SSLEOFError):
log.warning('SSL connection closed by server during handshake.')
- self.close(Errors.ConnectionError('SSL connection closed by server during handshake'))
+ self.close(Errors.KafkaConnectionError('SSL connection closed by server during handshake'))
# Other SSLErrors will be raised to user
return False
@@ -488,7 +488,7 @@ class BrokerConnection(object):
return False
elif self._sasl_auth_future.failed():
ex = self._sasl_auth_future.exception
- if not isinstance(ex, Errors.ConnectionError):
+ if not isinstance(ex, Errors.KafkaConnectionError):
raise ex # pylint: disable-msg=raising-bad-type
return self._sasl_auth_future.succeeded()
@@ -558,8 +558,8 @@ class BrokerConnection(object):
data = self._recv_bytes_blocking(4)
except ConnectionError as e:
- log.exception("%s: Error receiving reply from server", self)
- error = Errors.ConnectionError("%s: %s" % (self, e))
+ log.exception("%s: Error receiving reply from server", self)
+ error = Errors.KafkaConnectionError("%s: %s" % (self, e))
self.close(error=error)
return future.failure(error)
@@ -621,7 +621,7 @@ class BrokerConnection(object):
except ConnectionError as e:
log.exception("%s: Error receiving reply from server", self)
- error = Errors.ConnectionError("%s: %s" % (self, e))
+ error = Errors.KafkaConnectionError("%s: %s" % (self, e))
self.close(error=error)
return future.failure(error)
except Exception as e:
@@ -701,7 +701,7 @@ class BrokerConnection(object):
Arguments:
error (Exception, optional): pending in-flight-requests
will be failed with this exception.
- Default: kafka.errors.ConnectionError.
+ Default: kafka.errors.KafkaConnectionError.
"""
if self.state is ConnectionStates.DISCONNECTED:
if error is not None:
@@ -733,7 +733,7 @@ class BrokerConnection(object):
if self.connecting():
return future.failure(Errors.NodeNotReadyError(str(self)))
elif not self.connected():
- return future.failure(Errors.ConnectionError(str(self)))
+ return future.failure(Errors.KafkaConnectionError(str(self)))
elif not self.can_send_more():
return future.failure(Errors.TooManyInFlightRequests(str(self)))
return self._send(request)
@@ -753,7 +753,7 @@ class BrokerConnection(object):
self._sensors.bytes_sent.record(total_bytes)
except ConnectionError as e:
log.exception("Error sending %s to %s", request, self)
- error = Errors.ConnectionError("%s: %s" % (self, e))
+ error = Errors.KafkaConnectionError("%s: %s" % (self, e))
self.close(error=error)
return future.failure(error)
log.debug('%s Request %d: %s', self, correlation_id, request)
@@ -781,7 +781,7 @@ class BrokerConnection(object):
# If requests are pending, we should close the socket and
# fail all the pending request futures
if self.in_flight_requests:
- self.close(Errors.ConnectionError('Socket not connected during recv with in-flight-requests'))
+ self.close(Errors.KafkaConnectionError('Socket not connected during recv with in-flight-requests'))
return ()
elif not self.in_flight_requests:
@@ -821,7 +821,7 @@ class BrokerConnection(object):
# without an exception raised
if not data:
log.error('%s: socket disconnected', self)
- self.close(error=Errors.ConnectionError('socket disconnected'))
+ self.close(error=Errors.KafkaConnectionError('socket disconnected'))
return []
else:
recvd.append(data)
@@ -833,7 +833,7 @@ class BrokerConnection(object):
break
log.exception('%s: Error receiving network data'
' closing socket', self)
- self.close(error=Errors.ConnectionError(e))
+ self.close(error=Errors.KafkaConnectionError(e))
return []
except BlockingIOError:
if six.PY3: