diff options
author | Dana Powers <dana.powers@gmail.com> | 2019-05-28 21:00:05 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2019-05-28 21:00:05 -0700 |
commit | 2e5038fbdd682398d284fa7b803ad114b639eb01 (patch) | |
tree | a1823f8b49a89ed72e3185658f4128419a9c7901 | |
parent | f6a8a38937688ea2cc5dc13d3d1039493be5c9b5 (diff) | |
download | kafka-python-except_timeout.tar.gz |
Catch TimeoutError in BrokerConnection send/recvexcept_timeout
-rw-r--r-- | kafka/conn.py | 13 |
1 files changed, 7 insertions, 6 deletions
diff --git a/kafka/conn.py b/kafka/conn.py index 044d2d5..825406c 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -36,6 +36,7 @@ from kafka.version import __version__ if six.PY2: ConnectionError = socket.error + TimeoutError = socket.error BlockingIOError = Exception log = logging.getLogger(__name__) @@ -498,7 +499,7 @@ class BrokerConnection(object): # old ssl in python2.6 will swallow all SSLErrors here... except (SSLWantReadError, SSLWantWriteError): pass - except (SSLZeroReturnError, ConnectionError, SSLEOFError): + except (SSLZeroReturnError, ConnectionError, TimeoutError, SSLEOFError): log.warning('SSL connection closed by server during handshake.') self.close(Errors.KafkaConnectionError('SSL connection closed by server during handshake')) # Other SSLErrors will be raised to user @@ -599,7 +600,7 @@ class BrokerConnection(object): # The connection is closed on failure data = self._recv_bytes_blocking(4) - except ConnectionError as e: + except (ConnectionError, TimeoutError) as e: log.exception("%s: Error receiving reply from server", self) error = Errors.KafkaConnectionError("%s: %s" % (self, e)) self.close(error=error) @@ -665,7 +666,7 @@ class BrokerConnection(object): size = Int32.encode(len(msg)) self._send_bytes_blocking(size + msg) - except ConnectionError as e: + except (ConnectionError, TimeoutError) as e: self._lock.release() log.exception("%s: Error receiving reply from server", self) error = Errors.KafkaConnectionError("%s: %s" % (self, e)) @@ -695,7 +696,7 @@ class BrokerConnection(object): # The connection is closed on failure data = self._recv_bytes_blocking(4) - except ConnectionError as e: + except (ConnectionError, TimeoutError) as e: self._lock.release() log.exception("%s: Error receiving reply from server", self) error = Errors.KafkaConnectionError("%s: %s" % (self, e)) @@ -886,7 +887,7 @@ class BrokerConnection(object): if self._sensors: self._sensors.bytes_sent.record(total_bytes) return total_bytes - except ConnectionError as e: + except (ConnectionError, TimeoutError) as e: log.exception("Error sending request data to %s", self) error = Errors.KafkaConnectionError("%s: %s" % (self, e)) self.close(error=error) @@ -954,7 +955,7 @@ class BrokerConnection(object): except SSLWantReadError: break - except ConnectionError as e: + except (ConnectionError, TimeoutError) as e: if six.PY2 and e.errno == errno.EWOULDBLOCK: break log.exception('%s: Error receiving network data' |