diff options
-rw-r--r-- | kafka/client_async.py | 2 | ||||
-rw-r--r-- | kafka/conn.py | 46 |
2 files changed, 28 insertions, 20 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py index bb96578..6179eba 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -557,7 +557,7 @@ class KafkaClient(object): log.warning('Protocol out of sync on %r, closing', conn) except socket.error: pass - conn.close() + conn.close(Errors.ConnectionError('Socket EVENT_READ without in-flight-requests')) continue # Accumulate as many responses as the connection has pending diff --git a/kafka/conn.py b/kafka/conn.py index cbecfa7..50dc4d9 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -9,6 +9,7 @@ from random import shuffle import socket import ssl import time +import traceback from kafka.vendor import six @@ -236,10 +237,10 @@ class BrokerConnection(object): self._gai_index += 1 while True: if self._gai_index >= len(self._gai): - log.error('Unable to connect to any of the names for {0}:{1}'.format( - self._init_host, self._init_port - )) - self.close() + error = 'Unable to connect to any of the names for {0}:{1}'.format( + self._init_host, self._init_port) + log.error(error) + self.close(Errors.ConnectionError(error)) return afi, _, __, ___, sockaddr = self._gai[self._gai_index] if afi not in (socket.AF_INET, socket.AF_INET6): @@ -293,12 +294,12 @@ class BrokerConnection(object): elif ret not in (errno.EINPROGRESS, errno.EALREADY, errno.EWOULDBLOCK, 10022): log.error('Connect attempt to %s returned error %s.' ' Disconnecting.', self, ret) - self.close() + self.close(Errors.ConnectionError(ret)) # Connection timed out elif time.time() > request_timeout + self.last_attempt: log.error('Connection attempt to %s timed out', self) - self.close() # error=TimeoutError ? + self.close(Errors.ConnectionError('timeout')) # Needs retry else: @@ -345,9 +346,9 @@ class BrokerConnection(object): password=self.config['ssl_password']) if self.config['ssl_crlfile']: if not hasattr(ssl, 'VERIFY_CRL_CHECK_LEAF'): - log.error('%s: No CRL support with this version of Python.' - ' Disconnecting.', self) - self.close() + error = 'No CRL support with this version of Python.' + log.error('%s: %s Disconnecting.', self, error) + self.close(Errors.ConnectionError(error)) return log.info('%s: Loading SSL CRL from %s', str(self), self.config['ssl_crlfile']) self._ssl_context.load_verify_locations(self.config['ssl_crlfile']) @@ -359,9 +360,9 @@ class BrokerConnection(object): self._sock, server_hostname=self.hostname, do_handshake_on_connect=False) - except ssl.SSLError: + except ssl.SSLError as e: log.exception('%s: Failed to wrap socket in SSLContext!', str(self)) - self.close() + self.close(e) self.last_failure = time.time() def _try_handshake(self): @@ -374,7 +375,7 @@ class BrokerConnection(object): pass except ssl.SSLZeroReturnError: log.warning('SSL connection closed by server during handshake.') - self.close() + self.close(Errors.ConnectionError('SSL connection closed by server during handshake')) # Other SSLErrors will be raised to user return False @@ -482,9 +483,15 @@ class BrokerConnection(object): will be failed with this exception. Default: kafka.errors.ConnectionError. """ - if self.state is not ConnectionStates.DISCONNECTED: - self.state = ConnectionStates.DISCONNECTING - self.config['state_change_callback'](self) + if self.state is ConnectionStates.DISCONNECTED: + if error is not None: + log.warning('%s: close() called on disconnected connection with error: %s', self, error) + traceback.print_stack() + return + + log.info('%s: Closing connection. %s', self, error or '') + self.state = ConnectionStates.DISCONNECTING + self.config['state_change_callback'](self) if self._sock: self._sock.close() self._sock = None @@ -572,7 +579,7 @@ class BrokerConnection(object): # If requests are pending, we should close the socket and # fail all the pending request futures if self.in_flight_requests: - self.close() + self.close(Errors.ConnectionError('Socket not connected during recv with in-flight-requests')) return None elif not self.in_flight_requests: @@ -699,7 +706,7 @@ class BrokerConnection(object): '%s: Correlation IDs do not match: sent %d, recv %d' % (str(self), ifr.correlation_id, recv_correlation_id)) ifr.future.failure(error) - self.close() + self.close(error) self._processing = False return None @@ -713,8 +720,9 @@ class BrokerConnection(object): ' Unable to decode %d-byte buffer: %r', self, ifr.correlation_id, ifr.response_type, ifr.request, len(buf), buf) - ifr.future.failure(Errors.UnknownError('Unable to decode response')) - self.close() + error = Errors.UnknownError('Unable to decode response') + ifr.future.failure(error) + self.close(error) self._processing = False return None |