summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@rd.io>2016-01-03 18:45:32 -0800
committerDana Powers <dana.powers@rd.io>2016-01-03 19:53:52 -0800
commit42ea4f49132ded944e10cbafbd90a754def41836 (patch)
treebe78a14731a358ee544793a3d2c2612420b7532f
parent1bcb9f029d7179a23d2e008891cfb9e7f0534d64 (diff)
downloadkafka-python-42ea4f49132ded944e10cbafbd90a754def41836.tar.gz
Catch py3 ConnectionErrors
-rw-r--r--kafka/client_async.py12
-rw-r--r--kafka/conn.py25
2 files changed, 27 insertions, 10 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py
index 8a92159..914afec 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -17,6 +17,10 @@ from .protocol.metadata import MetadataRequest
from .protocol.produce import ProduceRequest
from .version import __version__
+if six.PY2:
+ ConnectionError = None
+
+
log = logging.getLogger(__name__)
@@ -503,7 +507,6 @@ class KafkaClient(object):
('0.8.0', MetadataRequest([])),
]
-
for version, request in test_cases:
connect()
f = self.send(node_id, request)
@@ -517,8 +520,11 @@ class KafkaClient(object):
log.info('Broker version identifed as %s', version)
return version
- assert isinstance(f.exception.message, socket.error)
- assert f.exception.message.errno in (32, 54)
+ if six.PY2:
+ assert isinstance(f.exception.args[0], socket.error)
+ assert f.exception.args[0].errno in (32, 54)
+ else:
+ assert isinstance(f.exception.args[0], ConnectionError)
log.info("Broker is not v%s -- it did not recognize %s",
version, request.__class__.__name__)
continue
diff --git a/kafka/conn.py b/kafka/conn.py
index d713b56..9e8a16f 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -13,7 +13,6 @@ import time
import six
import kafka.common as Errors
-from kafka.common import ConnectionError
from kafka.future import Future
from kafka.protocol.api import RequestHeader
from kafka.protocol.commit import GroupCoordinatorResponse
@@ -21,6 +20,10 @@ from kafka.protocol.types import Int32
from kafka.version import __version__
+if six.PY2:
+ ConnectionError = socket.error
+ BlockingIOError = Exception
+
log = logging.getLogger(__name__)
DEFAULT_SOCKET_TIMEOUT_SECONDS = 120
@@ -166,7 +169,7 @@ class BrokerConnection(object):
sent_bytes = self._sock.send(message)
assert sent_bytes == len(message)
self._sock.setblocking(False)
- except (AssertionError, socket.error) as e:
+ except (AssertionError, ConnectionError) as e:
log.exception("Error sending %s to %s", request, self)
error = Errors.ConnectionError(e)
self.close(error=error)
@@ -225,8 +228,8 @@ class BrokerConnection(object):
# An extremely small, but non-zero, probability that there are
# more than 0 but not yet 4 bytes available to read
self._rbuffer.write(self._sock.recv(4 - self._rbuffer.tell()))
- except socket.error as e:
- if e.errno == errno.EWOULDBLOCK:
+ except ConnectionError as e:
+ if six.PY2 and e.errno == errno.EWOULDBLOCK:
# This shouldn't happen after selecting above
# but just in case
return None
@@ -234,6 +237,10 @@ class BrokerConnection(object):
' closing socket', self)
self.close(error=Errors.ConnectionError(e))
return None
+ except BlockingIOError:
+ if six.PY3:
+ return None
+ raise
if self._rbuffer.tell() == 4:
self._rbuffer.seek(0)
@@ -249,14 +256,18 @@ class BrokerConnection(object):
staged_bytes = self._rbuffer.tell()
try:
self._rbuffer.write(self._sock.recv(self._next_payload_bytes - staged_bytes))
- except socket.error as e:
+ except ConnectionError as e:
# Extremely small chance that we have exactly 4 bytes for a
# header, but nothing to read in the body yet
- if e.errno == errno.EWOULDBLOCK:
+ if six.PY2 and e.errno == errno.EWOULDBLOCK:
return None
log.exception('%s: Error in recv', self)
self.close(error=Errors.ConnectionError(e))
return None
+ except BlockingIOError:
+ if six.PY3:
+ return None
+ raise
staged_bytes = self._rbuffer.tell()
if staged_bytes > self._next_payload_bytes:
@@ -379,7 +390,7 @@ class KafkaConnection(local):
self.close()
# And then raise
- raise ConnectionError("Kafka @ {0}:{1} went away".format(self.host, self.port))
+ raise Errors.ConnectionError("Kafka @ {0}:{1} went away".format(self.host, self.port))
def _read_bytes(self, num_bytes):
bytes_left = num_bytes