diff options
author | Dana Powers <dana.powers@rd.io> | 2016-01-03 18:45:32 -0800 |
---|---|---|
committer | Dana Powers <dana.powers@rd.io> | 2016-01-03 19:53:52 -0800 |
commit | 42ea4f49132ded944e10cbafbd90a754def41836 (patch) | |
tree | be78a14731a358ee544793a3d2c2612420b7532f /kafka/conn.py | |
parent | 1bcb9f029d7179a23d2e008891cfb9e7f0534d64 (diff) | |
download | kafka-python-42ea4f49132ded944e10cbafbd90a754def41836.tar.gz |
Catch py3 ConnectionErrors
Diffstat (limited to 'kafka/conn.py')
-rw-r--r-- | kafka/conn.py | 25 |
1 files changed, 18 insertions, 7 deletions
diff --git a/kafka/conn.py b/kafka/conn.py index d713b56..9e8a16f 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -13,7 +13,6 @@ import time import six import kafka.common as Errors -from kafka.common import ConnectionError from kafka.future import Future from kafka.protocol.api import RequestHeader from kafka.protocol.commit import GroupCoordinatorResponse @@ -21,6 +20,10 @@ from kafka.protocol.types import Int32 from kafka.version import __version__ +if six.PY2: + ConnectionError = socket.error + BlockingIOError = Exception + log = logging.getLogger(__name__) DEFAULT_SOCKET_TIMEOUT_SECONDS = 120 @@ -166,7 +169,7 @@ class BrokerConnection(object): sent_bytes = self._sock.send(message) assert sent_bytes == len(message) self._sock.setblocking(False) - except (AssertionError, socket.error) as e: + except (AssertionError, ConnectionError) as e: log.exception("Error sending %s to %s", request, self) error = Errors.ConnectionError(e) self.close(error=error) @@ -225,8 +228,8 @@ class BrokerConnection(object): # An extremely small, but non-zero, probability that there are # more than 0 but not yet 4 bytes available to read self._rbuffer.write(self._sock.recv(4 - self._rbuffer.tell())) - except socket.error as e: - if e.errno == errno.EWOULDBLOCK: + except ConnectionError as e: + if six.PY2 and e.errno == errno.EWOULDBLOCK: # This shouldn't happen after selecting above # but just in case return None @@ -234,6 +237,10 @@ class BrokerConnection(object): ' closing socket', self) self.close(error=Errors.ConnectionError(e)) return None + except BlockingIOError: + if six.PY3: + return None + raise if self._rbuffer.tell() == 4: self._rbuffer.seek(0) @@ -249,14 +256,18 @@ class BrokerConnection(object): staged_bytes = self._rbuffer.tell() try: self._rbuffer.write(self._sock.recv(self._next_payload_bytes - staged_bytes)) - except socket.error as e: + except ConnectionError as e: # Extremely small chance that we have exactly 4 bytes for a # header, but nothing to read in the body yet - if e.errno == errno.EWOULDBLOCK: + if six.PY2 and e.errno == errno.EWOULDBLOCK: return None log.exception('%s: Error in recv', self) self.close(error=Errors.ConnectionError(e)) return None + except BlockingIOError: + if six.PY3: + return None + raise staged_bytes = self._rbuffer.tell() if staged_bytes > self._next_payload_bytes: @@ -379,7 +390,7 @@ class KafkaConnection(local): self.close() # And then raise - raise ConnectionError("Kafka @ {0}:{1} went away".format(self.host, self.port)) + raise Errors.ConnectionError("Kafka @ {0}:{1} went away".format(self.host, self.port)) def _read_bytes(self, num_bytes): bytes_left = num_bytes |