diff options
Diffstat (limited to 'kafka/conn.py')
-rw-r--r-- | kafka/conn.py | 28 |
1 files changed, 14 insertions, 14 deletions
diff --git a/kafka/conn.py b/kafka/conn.py index daaa234..f67edfb 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -327,7 +327,7 @@ class BrokerConnection(object): self.last_attempt = time.time() next_lookup = self._next_afi_sockaddr() if not next_lookup: - self.close(Errors.ConnectionError('DNS failure')) + self.close(Errors.KafkaConnectionError('DNS failure')) return else: log.debug('%s: creating new socket', self) @@ -381,12 +381,12 @@ class BrokerConnection(object): log.error('Connect attempt to %s returned error %s.' ' Disconnecting.', self, ret) errstr = errno.errorcode.get(ret, 'UNKNOWN') - self.close(Errors.ConnectionError('{} {}'.format(ret, errstr))) + self.close(Errors.KafkaConnectionError('{} {}'.format(ret, errstr))) # Connection timed out elif time.time() > request_timeout + self.last_attempt: log.error('Connection attempt to %s timed out', self) - self.close(Errors.ConnectionError('timeout')) + self.close(Errors.KafkaConnectionError('timeout')) # Needs retry else: @@ -463,7 +463,7 @@ class BrokerConnection(object): pass except (SSLZeroReturnError, ConnectionError, SSLEOFError): log.warning('SSL connection closed by server during handshake.') - self.close(Errors.ConnectionError('SSL connection closed by server during handshake')) + self.close(Errors.KafkaConnectionError('SSL connection closed by server during handshake')) # Other SSLErrors will be raised to user return False @@ -488,7 +488,7 @@ class BrokerConnection(object): return False elif self._sasl_auth_future.failed(): ex = self._sasl_auth_future.exception - if not isinstance(ex, Errors.ConnectionError): + if not isinstance(ex, Errors.KafkaConnectionError): raise ex # pylint: disable-msg=raising-bad-type return self._sasl_auth_future.succeeded() @@ -558,8 +558,8 @@ class BrokerConnection(object): data = self._recv_bytes_blocking(4) except ConnectionError as e: - log.exception("%s: Error receiving reply from server", self) - error = Errors.ConnectionError("%s: %s" % (self, e)) + log.exception("%s: Error receiving reply from server", self) + error = Errors.KafkaConnectionError("%s: %s" % (self, e)) self.close(error=error) return future.failure(error) @@ -621,7 +621,7 @@ class BrokerConnection(object): except ConnectionError as e: log.exception("%s: Error receiving reply from server", self) - error = Errors.ConnectionError("%s: %s" % (self, e)) + error = Errors.KafkaConnectionError("%s: %s" % (self, e)) self.close(error=error) return future.failure(error) except Exception as e: @@ -701,7 +701,7 @@ class BrokerConnection(object): Arguments: error (Exception, optional): pending in-flight-requests will be failed with this exception. - Default: kafka.errors.ConnectionError. + Default: kafka.errors.KafkaConnectionError. """ if self.state is ConnectionStates.DISCONNECTED: if error is not None: @@ -733,7 +733,7 @@ class BrokerConnection(object): if self.connecting(): return future.failure(Errors.NodeNotReadyError(str(self))) elif not self.connected(): - return future.failure(Errors.ConnectionError(str(self))) + return future.failure(Errors.KafkaConnectionError(str(self))) elif not self.can_send_more(): return future.failure(Errors.TooManyInFlightRequests(str(self))) return self._send(request) @@ -753,7 +753,7 @@ class BrokerConnection(object): self._sensors.bytes_sent.record(total_bytes) except ConnectionError as e: log.exception("Error sending %s to %s", request, self) - error = Errors.ConnectionError("%s: %s" % (self, e)) + error = Errors.KafkaConnectionError("%s: %s" % (self, e)) self.close(error=error) return future.failure(error) log.debug('%s Request %d: %s', self, correlation_id, request) @@ -781,7 +781,7 @@ class BrokerConnection(object): # If requests are pending, we should close the socket and # fail all the pending request futures if self.in_flight_requests: - self.close(Errors.ConnectionError('Socket not connected during recv with in-flight-requests')) + self.close(Errors.KafkaConnectionError('Socket not connected during recv with in-flight-requests')) return () elif not self.in_flight_requests: @@ -821,7 +821,7 @@ class BrokerConnection(object): # without an exception raised if not data: log.error('%s: socket disconnected', self) - self.close(error=Errors.ConnectionError('socket disconnected')) + self.close(error=Errors.KafkaConnectionError('socket disconnected')) return [] else: recvd.append(data) @@ -833,7 +833,7 @@ class BrokerConnection(object): break log.exception('%s: Error receiving network data' ' closing socket', self) - self.close(error=Errors.ConnectionError(e)) + self.close(error=Errors.KafkaConnectionError(e)) return [] except BlockingIOError: if six.PY3: |