summaryrefslogtreecommitdiff
path: root/kafka/conn.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/conn.py')
-rw-r--r--kafka/conn.py34
1 files changed, 21 insertions, 13 deletions
diff --git a/kafka/conn.py b/kafka/conn.py
index a00206f..044d2d5 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -212,7 +212,7 @@ class BrokerConnection(object):
'ssl_ciphers': None,
'api_version': (0, 8, 2), # default to most restrictive
'selector': selectors.DefaultSelector,
- 'state_change_callback': lambda conn: True,
+ 'state_change_callback': lambda node_id, sock, conn: True,
'metrics': None,
'metric_group_prefix': '',
'sasl_mechanism': None,
@@ -357,6 +357,7 @@ class BrokerConnection(object):
return self.state
else:
log.debug('%s: creating new socket', self)
+ assert self._sock is None
self._sock_afi, self._sock_addr = next_lookup
self._sock = socket.socket(self._sock_afi, socket.SOCK_STREAM)
@@ -366,7 +367,7 @@ class BrokerConnection(object):
self._sock.setblocking(False)
self.state = ConnectionStates.CONNECTING
- self.config['state_change_callback'](self)
+ self.config['state_change_callback'](self.node_id, self._sock, self)
log.info('%s: connecting to %s:%d [%s %s]', self, self.host,
self.port, self._sock_addr, AFI_NAMES[self._sock_afi])
@@ -386,21 +387,21 @@ class BrokerConnection(object):
if self.config['security_protocol'] in ('SSL', 'SASL_SSL'):
log.debug('%s: initiating SSL handshake', self)
self.state = ConnectionStates.HANDSHAKE
- self.config['state_change_callback'](self)
+ self.config['state_change_callback'](self.node_id, self._sock, self)
# _wrap_ssl can alter the connection state -- disconnects on failure
self._wrap_ssl()
elif self.config['security_protocol'] == 'SASL_PLAINTEXT':
log.debug('%s: initiating SASL authentication', self)
self.state = ConnectionStates.AUTHENTICATING
- self.config['state_change_callback'](self)
+ self.config['state_change_callback'](self.node_id, self._sock, self)
else:
# security_protocol PLAINTEXT
log.info('%s: Connection complete.', self)
self.state = ConnectionStates.CONNECTED
self._reset_reconnect_backoff()
- self.config['state_change_callback'](self)
+ self.config['state_change_callback'](self.node_id, self._sock, self)
# Connection failed
# WSAEINVAL == 10022, but errno.WSAEINVAL is not available on non-win systems
@@ -425,7 +426,7 @@ class BrokerConnection(object):
log.info('%s: Connection complete.', self)
self.state = ConnectionStates.CONNECTED
self._reset_reconnect_backoff()
- self.config['state_change_callback'](self)
+ self.config['state_change_callback'](self.node_id, self._sock, self)
if self.state is ConnectionStates.AUTHENTICATING:
assert self.config['security_protocol'] in ('SASL_PLAINTEXT', 'SASL_SSL')
@@ -435,7 +436,7 @@ class BrokerConnection(object):
log.info('%s: Connection complete.', self)
self.state = ConnectionStates.CONNECTED
self._reset_reconnect_backoff()
- self.config['state_change_callback'](self)
+ self.config['state_change_callback'](self.node_id, self._sock, self)
if self.state not in (ConnectionStates.CONNECTED,
ConnectionStates.DISCONNECTED):
@@ -802,15 +803,13 @@ class BrokerConnection(object):
will be failed with this exception.
Default: kafka.errors.KafkaConnectionError.
"""
+ if self.state is ConnectionStates.DISCONNECTED:
+ return
with self._lock:
if self.state is ConnectionStates.DISCONNECTED:
return
log.info('%s: Closing connection. %s', self, error or '')
- self.state = ConnectionStates.DISCONNECTING
- self.config['state_change_callback'](self)
self._update_reconnect_backoff()
- self._close_socket()
- self.state = ConnectionStates.DISCONNECTED
self._sasl_auth_future = None
self._protocol = KafkaProtocol(
client_id=self.config['client_id'],
@@ -819,9 +818,18 @@ class BrokerConnection(object):
error = Errors.Cancelled(str(self))
ifrs = list(self.in_flight_requests.items())
self.in_flight_requests.clear()
- self.config['state_change_callback'](self)
+ self.state = ConnectionStates.DISCONNECTED
+ # To avoid race conditions and/or deadlocks
+ # keep a reference to the socket but leave it
+ # open until after the state_change_callback
+ # This should give clients a change to deregister
+ # the socket fd from selectors cleanly.
+ sock = self._sock
+ self._sock = None
- # drop lock before processing futures
+ # drop lock before state change callback and processing futures
+ self.config['state_change_callback'](self.node_id, sock, self)
+ sock.close()
for (_correlation_id, (future, _timestamp)) in ifrs:
future.failure(error)