summaryrefslogtreecommitdiff
path: root/kafka
diff options
context:
space:
mode:
authorJeff Widman <jeff@jeffwidman.com>2018-05-10 16:12:19 -0700
committerJeff Widman <jeff@jeffwidman.com>2018-05-10 16:14:59 -0700
commitc9fba2041e138b49c85a566c2ff80cf5af91a4e8 (patch)
treefd03145aa7897175627b351715833a17093d1903 /kafka
parent27f939ad528a5f7f71346c3d9b18e1a9aa9404e5 (diff)
downloadkafka-python-Stop-shadowing-native-python-ConnectionError-exception.tar.gz
In Python3, `ConnectionError` is a native exception. So rename our custom one to `KafkaConnectionError` to prevent accidentally shadowing the native one. Note that there are still valid uses of `ConnectionError` in this code. They already expect a native Python3 `ConnectionError`, and also already handle the Python2 compatibility issues.
Diffstat (limited to 'kafka')
-rw-r--r--kafka/client.py12
-rw-r--r--kafka/client_async.py2
-rw-r--r--kafka/conn.py28
-rw-r--r--kafka/errors.py6
-rw-r--r--kafka/producer/base.py1
5 files changed, 24 insertions, 25 deletions
diff --git a/kafka/client.py b/kafka/client.py
index 10b1724..789d4da 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -11,7 +11,7 @@ import select
from kafka.vendor import six
import kafka.errors
-from kafka.errors import (UnknownError, ConnectionError, FailedPayloadsError,
+from kafka.errors import (UnknownError, KafkaConnectionError, FailedPayloadsError,
KafkaTimeoutError, KafkaUnavailableError,
LeaderNotAvailableError, UnknownTopicOrPartitionError,
NotLeaderForPartitionError, ReplicaNotAvailableError)
@@ -73,7 +73,7 @@ class SimpleClient(object):
conn = self._conns[host_key]
if not conn.connect_blocking(self.timeout):
conn.close()
- raise ConnectionError("%s:%s (%s)" % (host, port, afi))
+ raise KafkaConnectionError("%s:%s (%s)" % (host, port, afi))
return conn
def _get_leader_for_partition(self, topic, partition):
@@ -156,7 +156,7 @@ class SimpleClient(object):
for (host, port, afi) in hosts:
try:
conn = self._get_conn(host, port, afi)
- except ConnectionError:
+ except KafkaConnectionError:
log.warning("Skipping unconnected connection: %s:%s (AFI %s)",
host, port, afi)
continue
@@ -242,7 +242,7 @@ class SimpleClient(object):
host, port, afi = get_ip_port_afi(broker.host)
try:
conn = self._get_conn(host, broker.port, afi)
- except ConnectionError:
+ except KafkaConnectionError:
refresh_metadata = True
failed_payloads(broker_payloads)
continue
@@ -344,8 +344,8 @@ class SimpleClient(object):
try:
host, port, afi = get_ip_port_afi(broker.host)
conn = self._get_conn(host, broker.port, afi)
- except ConnectionError as e:
- log.warning('ConnectionError attempting to send request %s '
+ except KafkaConnectionError as e:
+ log.warning('KafkaConnectionError attempting to send request %s '
'to server %s: %s', request_id, broker, e)
for payload in payloads:
diff --git a/kafka/client_async.py b/kafka/client_async.py
index 9556eca..a9704fa 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -602,7 +602,7 @@ class KafkaClient(object):
log.warning('Protocol out of sync on %r, closing', conn)
except socket.error:
pass
- conn.close(Errors.ConnectionError('Socket EVENT_READ without in-flight-requests'))
+ conn.close(Errors.KafkaConnectionError('Socket EVENT_READ without in-flight-requests'))
continue
self._idle_expiry_manager.update(conn.node_id)
diff --git a/kafka/conn.py b/kafka/conn.py
index daaa234..f67edfb 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -327,7 +327,7 @@ class BrokerConnection(object):
self.last_attempt = time.time()
next_lookup = self._next_afi_sockaddr()
if not next_lookup:
- self.close(Errors.ConnectionError('DNS failure'))
+ self.close(Errors.KafkaConnectionError('DNS failure'))
return
else:
log.debug('%s: creating new socket', self)
@@ -381,12 +381,12 @@ class BrokerConnection(object):
log.error('Connect attempt to %s returned error %s.'
' Disconnecting.', self, ret)
errstr = errno.errorcode.get(ret, 'UNKNOWN')
- self.close(Errors.ConnectionError('{} {}'.format(ret, errstr)))
+ self.close(Errors.KafkaConnectionError('{} {}'.format(ret, errstr)))
# Connection timed out
elif time.time() > request_timeout + self.last_attempt:
log.error('Connection attempt to %s timed out', self)
- self.close(Errors.ConnectionError('timeout'))
+ self.close(Errors.KafkaConnectionError('timeout'))
# Needs retry
else:
@@ -463,7 +463,7 @@ class BrokerConnection(object):
pass
except (SSLZeroReturnError, ConnectionError, SSLEOFError):
log.warning('SSL connection closed by server during handshake.')
- self.close(Errors.ConnectionError('SSL connection closed by server during handshake'))
+ self.close(Errors.KafkaConnectionError('SSL connection closed by server during handshake'))
# Other SSLErrors will be raised to user
return False
@@ -488,7 +488,7 @@ class BrokerConnection(object):
return False
elif self._sasl_auth_future.failed():
ex = self._sasl_auth_future.exception
- if not isinstance(ex, Errors.ConnectionError):
+ if not isinstance(ex, Errors.KafkaConnectionError):
raise ex # pylint: disable-msg=raising-bad-type
return self._sasl_auth_future.succeeded()
@@ -558,8 +558,8 @@ class BrokerConnection(object):
data = self._recv_bytes_blocking(4)
except ConnectionError as e:
- log.exception("%s: Error receiving reply from server", self)
- error = Errors.ConnectionError("%s: %s" % (self, e))
+ log.exception("%s: Error receiving reply from server", self)
+ error = Errors.KafkaConnectionError("%s: %s" % (self, e))
self.close(error=error)
return future.failure(error)
@@ -621,7 +621,7 @@ class BrokerConnection(object):
except ConnectionError as e:
log.exception("%s: Error receiving reply from server", self)
- error = Errors.ConnectionError("%s: %s" % (self, e))
+ error = Errors.KafkaConnectionError("%s: %s" % (self, e))
self.close(error=error)
return future.failure(error)
except Exception as e:
@@ -701,7 +701,7 @@ class BrokerConnection(object):
Arguments:
error (Exception, optional): pending in-flight-requests
will be failed with this exception.
- Default: kafka.errors.ConnectionError.
+ Default: kafka.errors.KafkaConnectionError.
"""
if self.state is ConnectionStates.DISCONNECTED:
if error is not None:
@@ -733,7 +733,7 @@ class BrokerConnection(object):
if self.connecting():
return future.failure(Errors.NodeNotReadyError(str(self)))
elif not self.connected():
- return future.failure(Errors.ConnectionError(str(self)))
+ return future.failure(Errors.KafkaConnectionError(str(self)))
elif not self.can_send_more():
return future.failure(Errors.TooManyInFlightRequests(str(self)))
return self._send(request)
@@ -753,7 +753,7 @@ class BrokerConnection(object):
self._sensors.bytes_sent.record(total_bytes)
except ConnectionError as e:
log.exception("Error sending %s to %s", request, self)
- error = Errors.ConnectionError("%s: %s" % (self, e))
+ error = Errors.KafkaConnectionError("%s: %s" % (self, e))
self.close(error=error)
return future.failure(error)
log.debug('%s Request %d: %s', self, correlation_id, request)
@@ -781,7 +781,7 @@ class BrokerConnection(object):
# If requests are pending, we should close the socket and
# fail all the pending request futures
if self.in_flight_requests:
- self.close(Errors.ConnectionError('Socket not connected during recv with in-flight-requests'))
+ self.close(Errors.KafkaConnectionError('Socket not connected during recv with in-flight-requests'))
return ()
elif not self.in_flight_requests:
@@ -821,7 +821,7 @@ class BrokerConnection(object):
# without an exception raised
if not data:
log.error('%s: socket disconnected', self)
- self.close(error=Errors.ConnectionError('socket disconnected'))
+ self.close(error=Errors.KafkaConnectionError('socket disconnected'))
return []
else:
recvd.append(data)
@@ -833,7 +833,7 @@ class BrokerConnection(object):
break
log.exception('%s: Error receiving network data'
' closing socket', self)
- self.close(error=Errors.ConnectionError(e))
+ self.close(error=Errors.KafkaConnectionError(e))
return []
except BlockingIOError:
if six.PY3:
diff --git a/kafka/errors.py b/kafka/errors.py
index c70853c..f4c8740 100644
--- a/kafka/errors.py
+++ b/kafka/errors.py
@@ -447,7 +447,7 @@ class FailedPayloadsError(KafkaError):
self.payload = payload
-class ConnectionError(KafkaError):
+class KafkaConnectionError(KafkaError):
retriable = True
invalid_metadata = True
@@ -517,13 +517,13 @@ def check_error(response):
RETRY_BACKOFF_ERROR_TYPES = (
KafkaUnavailableError, LeaderNotAvailableError,
- ConnectionError, FailedPayloadsError
+ KafkaConnectionError, FailedPayloadsError
)
RETRY_REFRESH_ERROR_TYPES = (
NotLeaderForPartitionError, UnknownTopicOrPartitionError,
- LeaderNotAvailableError, ConnectionError
+ LeaderNotAvailableError, KafkaConnectionError
)
diff --git a/kafka/producer/base.py b/kafka/producer/base.py
index e8d6c3d..c9dd6c3 100644
--- a/kafka/producer/base.py
+++ b/kafka/producer/base.py
@@ -372,7 +372,6 @@ class Producer(object):
Raises:
FailedPayloadsError: low-level connection error, can be caused by
networking failures, or a malformed request.
- ConnectionError:
KafkaUnavailableError: all known brokers are down when attempting
to refresh metadata.
LeaderNotAvailableError: topic or partition is initializing or