diff options
Diffstat (limited to 'kafka/conn.py')
-rw-r--r-- | kafka/conn.py | 34 |
1 files changed, 21 insertions, 13 deletions
diff --git a/kafka/conn.py b/kafka/conn.py index a00206f..044d2d5 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -212,7 +212,7 @@ class BrokerConnection(object): 'ssl_ciphers': None, 'api_version': (0, 8, 2), # default to most restrictive 'selector': selectors.DefaultSelector, - 'state_change_callback': lambda conn: True, + 'state_change_callback': lambda node_id, sock, conn: True, 'metrics': None, 'metric_group_prefix': '', 'sasl_mechanism': None, @@ -357,6 +357,7 @@ class BrokerConnection(object): return self.state else: log.debug('%s: creating new socket', self) + assert self._sock is None self._sock_afi, self._sock_addr = next_lookup self._sock = socket.socket(self._sock_afi, socket.SOCK_STREAM) @@ -366,7 +367,7 @@ class BrokerConnection(object): self._sock.setblocking(False) self.state = ConnectionStates.CONNECTING - self.config['state_change_callback'](self) + self.config['state_change_callback'](self.node_id, self._sock, self) log.info('%s: connecting to %s:%d [%s %s]', self, self.host, self.port, self._sock_addr, AFI_NAMES[self._sock_afi]) @@ -386,21 +387,21 @@ class BrokerConnection(object): if self.config['security_protocol'] in ('SSL', 'SASL_SSL'): log.debug('%s: initiating SSL handshake', self) self.state = ConnectionStates.HANDSHAKE - self.config['state_change_callback'](self) + self.config['state_change_callback'](self.node_id, self._sock, self) # _wrap_ssl can alter the connection state -- disconnects on failure self._wrap_ssl() elif self.config['security_protocol'] == 'SASL_PLAINTEXT': log.debug('%s: initiating SASL authentication', self) self.state = ConnectionStates.AUTHENTICATING - self.config['state_change_callback'](self) + self.config['state_change_callback'](self.node_id, self._sock, self) else: # security_protocol PLAINTEXT log.info('%s: Connection complete.', self) self.state = ConnectionStates.CONNECTED self._reset_reconnect_backoff() - self.config['state_change_callback'](self) + self.config['state_change_callback'](self.node_id, self._sock, self) # Connection failed # WSAEINVAL == 10022, but errno.WSAEINVAL is not available on non-win systems @@ -425,7 +426,7 @@ class BrokerConnection(object): log.info('%s: Connection complete.', self) self.state = ConnectionStates.CONNECTED self._reset_reconnect_backoff() - self.config['state_change_callback'](self) + self.config['state_change_callback'](self.node_id, self._sock, self) if self.state is ConnectionStates.AUTHENTICATING: assert self.config['security_protocol'] in ('SASL_PLAINTEXT', 'SASL_SSL') @@ -435,7 +436,7 @@ class BrokerConnection(object): log.info('%s: Connection complete.', self) self.state = ConnectionStates.CONNECTED self._reset_reconnect_backoff() - self.config['state_change_callback'](self) + self.config['state_change_callback'](self.node_id, self._sock, self) if self.state not in (ConnectionStates.CONNECTED, ConnectionStates.DISCONNECTED): @@ -802,15 +803,13 @@ class BrokerConnection(object): will be failed with this exception. Default: kafka.errors.KafkaConnectionError. """ + if self.state is ConnectionStates.DISCONNECTED: + return with self._lock: if self.state is ConnectionStates.DISCONNECTED: return log.info('%s: Closing connection. %s', self, error or '') - self.state = ConnectionStates.DISCONNECTING - self.config['state_change_callback'](self) self._update_reconnect_backoff() - self._close_socket() - self.state = ConnectionStates.DISCONNECTED self._sasl_auth_future = None self._protocol = KafkaProtocol( client_id=self.config['client_id'], @@ -819,9 +818,18 @@ class BrokerConnection(object): error = Errors.Cancelled(str(self)) ifrs = list(self.in_flight_requests.items()) self.in_flight_requests.clear() - self.config['state_change_callback'](self) + self.state = ConnectionStates.DISCONNECTED + # To avoid race conditions and/or deadlocks + # keep a reference to the socket but leave it + # open until after the state_change_callback + # This should give clients a change to deregister + # the socket fd from selectors cleanly. + sock = self._sock + self._sock = None - # drop lock before processing futures + # drop lock before state change callback and processing futures + self.config['state_change_callback'](self.node_id, sock, self) + sock.close() for (_correlation_id, (future, _timestamp)) in ifrs: future.failure(error) |