diff options
Diffstat (limited to 'kafka/conn.py')
-rw-r--r-- | kafka/conn.py | 25 |
1 files changed, 18 insertions, 7 deletions
diff --git a/kafka/conn.py b/kafka/conn.py index d713b56..9e8a16f 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -13,7 +13,6 @@ import time import six import kafka.common as Errors -from kafka.common import ConnectionError from kafka.future import Future from kafka.protocol.api import RequestHeader from kafka.protocol.commit import GroupCoordinatorResponse @@ -21,6 +20,10 @@ from kafka.protocol.types import Int32 from kafka.version import __version__ +if six.PY2: + ConnectionError = socket.error + BlockingIOError = Exception + log = logging.getLogger(__name__) DEFAULT_SOCKET_TIMEOUT_SECONDS = 120 @@ -166,7 +169,7 @@ class BrokerConnection(object): sent_bytes = self._sock.send(message) assert sent_bytes == len(message) self._sock.setblocking(False) - except (AssertionError, socket.error) as e: + except (AssertionError, ConnectionError) as e: log.exception("Error sending %s to %s", request, self) error = Errors.ConnectionError(e) self.close(error=error) @@ -225,8 +228,8 @@ class BrokerConnection(object): # An extremely small, but non-zero, probability that there are # more than 0 but not yet 4 bytes available to read self._rbuffer.write(self._sock.recv(4 - self._rbuffer.tell())) - except socket.error as e: - if e.errno == errno.EWOULDBLOCK: + except ConnectionError as e: + if six.PY2 and e.errno == errno.EWOULDBLOCK: # This shouldn't happen after selecting above # but just in case return None @@ -234,6 +237,10 @@ class BrokerConnection(object): ' closing socket', self) self.close(error=Errors.ConnectionError(e)) return None + except BlockingIOError: + if six.PY3: + return None + raise if self._rbuffer.tell() == 4: self._rbuffer.seek(0) @@ -249,14 +256,18 @@ class BrokerConnection(object): staged_bytes = self._rbuffer.tell() try: self._rbuffer.write(self._sock.recv(self._next_payload_bytes - staged_bytes)) - except socket.error as e: + except ConnectionError as e: # Extremely small chance that we have exactly 4 bytes for a # header, but nothing to read in the body yet - if e.errno == errno.EWOULDBLOCK: + if six.PY2 and e.errno == errno.EWOULDBLOCK: return None log.exception('%s: Error in recv', self) self.close(error=Errors.ConnectionError(e)) return None + except BlockingIOError: + if six.PY3: + return None + raise staged_bytes = self._rbuffer.tell() if staged_bytes > self._next_payload_bytes: @@ -379,7 +390,7 @@ class KafkaConnection(local): self.close() # And then raise - raise ConnectionError("Kafka @ {0}:{1} went away".format(self.host, self.port)) + raise Errors.ConnectionError("Kafka @ {0}:{1} went away".format(self.host, self.port)) def _read_bytes(self, num_bytes): bytes_left = num_bytes |