diff options
author | Dana Powers <dana.powers@rd.io> | 2016-01-03 18:45:32 -0800 |
---|---|---|
committer | Dana Powers <dana.powers@rd.io> | 2016-01-03 19:53:52 -0800 |
commit | 42ea4f49132ded944e10cbafbd90a754def41836 (patch) | |
tree | be78a14731a358ee544793a3d2c2612420b7532f | |
parent | 1bcb9f029d7179a23d2e008891cfb9e7f0534d64 (diff) | |
download | kafka-python-42ea4f49132ded944e10cbafbd90a754def41836.tar.gz |
Catch py3 ConnectionErrors
-rw-r--r-- | kafka/client_async.py | 12 | ||||
-rw-r--r-- | kafka/conn.py | 25 |
2 files changed, 27 insertions, 10 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py index 8a92159..914afec 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -17,6 +17,10 @@ from .protocol.metadata import MetadataRequest from .protocol.produce import ProduceRequest from .version import __version__ +if six.PY2: + ConnectionError = None + + log = logging.getLogger(__name__) @@ -503,7 +507,6 @@ class KafkaClient(object): ('0.8.0', MetadataRequest([])), ] - for version, request in test_cases: connect() f = self.send(node_id, request) @@ -517,8 +520,11 @@ class KafkaClient(object): log.info('Broker version identifed as %s', version) return version - assert isinstance(f.exception.message, socket.error) - assert f.exception.message.errno in (32, 54) + if six.PY2: + assert isinstance(f.exception.args[0], socket.error) + assert f.exception.args[0].errno in (32, 54) + else: + assert isinstance(f.exception.args[0], ConnectionError) log.info("Broker is not v%s -- it did not recognize %s", version, request.__class__.__name__) continue diff --git a/kafka/conn.py b/kafka/conn.py index d713b56..9e8a16f 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -13,7 +13,6 @@ import time import six import kafka.common as Errors -from kafka.common import ConnectionError from kafka.future import Future from kafka.protocol.api import RequestHeader from kafka.protocol.commit import GroupCoordinatorResponse @@ -21,6 +20,10 @@ from kafka.protocol.types import Int32 from kafka.version import __version__ +if six.PY2: + ConnectionError = socket.error + BlockingIOError = Exception + log = logging.getLogger(__name__) DEFAULT_SOCKET_TIMEOUT_SECONDS = 120 @@ -166,7 +169,7 @@ class BrokerConnection(object): sent_bytes = self._sock.send(message) assert sent_bytes == len(message) self._sock.setblocking(False) - except (AssertionError, socket.error) as e: + except (AssertionError, ConnectionError) as e: log.exception("Error sending %s to %s", request, self) error = Errors.ConnectionError(e) self.close(error=error) @@ -225,8 +228,8 @@ class BrokerConnection(object): # An extremely small, but non-zero, probability that there are # more than 0 but not yet 4 bytes available to read self._rbuffer.write(self._sock.recv(4 - self._rbuffer.tell())) - except socket.error as e: - if e.errno == errno.EWOULDBLOCK: + except ConnectionError as e: + if six.PY2 and e.errno == errno.EWOULDBLOCK: # This shouldn't happen after selecting above # but just in case return None @@ -234,6 +237,10 @@ class BrokerConnection(object): ' closing socket', self) self.close(error=Errors.ConnectionError(e)) return None + except BlockingIOError: + if six.PY3: + return None + raise if self._rbuffer.tell() == 4: self._rbuffer.seek(0) @@ -249,14 +256,18 @@ class BrokerConnection(object): staged_bytes = self._rbuffer.tell() try: self._rbuffer.write(self._sock.recv(self._next_payload_bytes - staged_bytes)) - except socket.error as e: + except ConnectionError as e: # Extremely small chance that we have exactly 4 bytes for a # header, but nothing to read in the body yet - if e.errno == errno.EWOULDBLOCK: + if six.PY2 and e.errno == errno.EWOULDBLOCK: return None log.exception('%s: Error in recv', self) self.close(error=Errors.ConnectionError(e)) return None + except BlockingIOError: + if six.PY3: + return None + raise staged_bytes = self._rbuffer.tell() if staged_bytes > self._next_payload_bytes: @@ -379,7 +390,7 @@ class KafkaConnection(local): self.close() # And then raise - raise ConnectionError("Kafka @ {0}:{1} went away".format(self.host, self.port)) + raise Errors.ConnectionError("Kafka @ {0}:{1} went away".format(self.host, self.port)) def _read_bytes(self, num_bytes): bytes_left = num_bytes |