diff options
author | Dana Powers <dana.powers@gmail.com> | 2019-01-12 22:00:38 -0800 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2019-01-12 22:00:38 -0800 |
commit | cbcbfe48be6f9f95981cfeb50dbac6dd4e2ef8d7 (patch) | |
tree | d6c25d15b7caef2a3489600c4d1b405d93959d19 | |
parent | 1a31be52ec012dfa0ef5079ff9982e01408a8fe1 (diff) | |
download | kafka-python-timeout_ssl_conn.tar.gz |
Timeout all unconnected conns (incl SSL) after request_timeout_mstimeout_ssl_conn
-rw-r--r-- | kafka/conn.py | 14 |
1 files changed, 8 insertions, 6 deletions
diff --git a/kafka/conn.py b/kafka/conn.py index 4d56964..7dfc8bd 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -351,7 +351,6 @@ class BrokerConnection(object): if self.state is ConnectionStates.CONNECTING: # in non-blocking mode, use repeated calls to socket.connect_ex # to check connection status - request_timeout = self.config['request_timeout_ms'] / 1000.0 ret = None try: ret = self._sock.connect_ex(self._sock_addr) @@ -389,11 +388,6 @@ class BrokerConnection(object): errstr = errno.errorcode.get(ret, 'UNKNOWN') self.close(Errors.KafkaConnectionError('{} {}'.format(ret, errstr))) - # Connection timed out - elif time.time() > request_timeout + self.last_attempt: - log.error('Connection attempt to %s timed out', self) - self.close(Errors.KafkaConnectionError('timeout')) - # Needs retry else: pass @@ -419,6 +413,14 @@ class BrokerConnection(object): self._reset_reconnect_backoff() self.config['state_change_callback'](self) + if self.state not in (ConnectionStates.CONNECTED, + ConnectionStates.DISCONNECTED): + # Connection timed out + request_timeout = self.config['request_timeout_ms'] / 1000.0 + if time.time() > request_timeout + self.last_attempt: + log.error('Connection attempt to %s timed out', self) + self.close(Errors.KafkaConnectionError('timeout')) + return self.state def _wrap_ssl(self): |