summaryrefslogtreecommitdiff
path: root/kafka/conn.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/conn.py')
-rw-r--r--kafka/conn.py25
1 files changed, 18 insertions, 7 deletions
diff --git a/kafka/conn.py b/kafka/conn.py
index d713b56..9e8a16f 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -13,7 +13,6 @@ import time
import six
import kafka.common as Errors
-from kafka.common import ConnectionError
from kafka.future import Future
from kafka.protocol.api import RequestHeader
from kafka.protocol.commit import GroupCoordinatorResponse
@@ -21,6 +20,10 @@ from kafka.protocol.types import Int32
from kafka.version import __version__
+if six.PY2:
+ ConnectionError = socket.error
+ BlockingIOError = Exception
+
log = logging.getLogger(__name__)
DEFAULT_SOCKET_TIMEOUT_SECONDS = 120
@@ -166,7 +169,7 @@ class BrokerConnection(object):
sent_bytes = self._sock.send(message)
assert sent_bytes == len(message)
self._sock.setblocking(False)
- except (AssertionError, socket.error) as e:
+ except (AssertionError, ConnectionError) as e:
log.exception("Error sending %s to %s", request, self)
error = Errors.ConnectionError(e)
self.close(error=error)
@@ -225,8 +228,8 @@ class BrokerConnection(object):
# An extremely small, but non-zero, probability that there are
# more than 0 but not yet 4 bytes available to read
self._rbuffer.write(self._sock.recv(4 - self._rbuffer.tell()))
- except socket.error as e:
- if e.errno == errno.EWOULDBLOCK:
+ except ConnectionError as e:
+ if six.PY2 and e.errno == errno.EWOULDBLOCK:
# This shouldn't happen after selecting above
# but just in case
return None
@@ -234,6 +237,10 @@ class BrokerConnection(object):
' closing socket', self)
self.close(error=Errors.ConnectionError(e))
return None
+ except BlockingIOError:
+ if six.PY3:
+ return None
+ raise
if self._rbuffer.tell() == 4:
self._rbuffer.seek(0)
@@ -249,14 +256,18 @@ class BrokerConnection(object):
staged_bytes = self._rbuffer.tell()
try:
self._rbuffer.write(self._sock.recv(self._next_payload_bytes - staged_bytes))
- except socket.error as e:
+ except ConnectionError as e:
# Extremely small chance that we have exactly 4 bytes for a
# header, but nothing to read in the body yet
- if e.errno == errno.EWOULDBLOCK:
+ if six.PY2 and e.errno == errno.EWOULDBLOCK:
return None
log.exception('%s: Error in recv', self)
self.close(error=Errors.ConnectionError(e))
return None
+ except BlockingIOError:
+ if six.PY3:
+ return None
+ raise
staged_bytes = self._rbuffer.tell()
if staged_bytes > self._next_payload_bytes:
@@ -379,7 +390,7 @@ class KafkaConnection(local):
self.close()
# And then raise
- raise ConnectionError("Kafka @ {0}:{1} went away".format(self.host, self.port))
+ raise Errors.ConnectionError("Kafka @ {0}:{1} went away".format(self.host, self.port))
def _read_bytes(self, num_bytes):
bytes_left = num_bytes