diff options
-rw-r--r-- | kafka/conn.py | 8 |
1 files changed, 8 insertions, 0 deletions
diff --git a/kafka/conn.py b/kafka/conn.py index 92b2fd3..45de239 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -31,6 +31,7 @@ DEFAULT_KAFKA_PORT = 9092 class ConnectionStates(object): + DISCONNECTING = '<disconnecting>' DISCONNECTED = '<disconnected>' CONNECTING = '<connecting>' CONNECTED = '<connected>' @@ -49,6 +50,7 @@ class BrokerConnection(object): 'receive_buffer_bytes': None, 'send_buffer_bytes': None, 'api_version': (0, 8, 2), # default to most restrictive + 'state_change_callback': lambda conn: True, } def __init__(self, host, port, afi, **configs): @@ -87,6 +89,7 @@ class BrokerConnection(object): self._sock.setblocking(False) self.state = ConnectionStates.CONNECTING self.last_attempt = time.time() + self.config['state_change_callback'](self) if self.state is ConnectionStates.CONNECTING: # in non-blocking mode, use repeated calls to socket.connect_ex @@ -101,6 +104,7 @@ class BrokerConnection(object): if not ret or ret == errno.EISCONN: log.debug('%s: established TCP connection', str(self)) self.state = ConnectionStates.CONNECTED + self.config['state_change_callback'](self) # Connection failed # WSAEINVAL == 10022, but errno.WSAEINVAL is not available on non-win systems @@ -151,6 +155,9 @@ class BrokerConnection(object): will be failed with this exception. Default: kafka.errors.ConnectionError. """ + if self.state is not ConnectionStates.DISCONNECTED: + self.state = ConnectionStates.DISCONNECTING + self.config['state_change_callback'](self) if self._sock: self._sock.close() self._sock = None @@ -165,6 +172,7 @@ class BrokerConnection(object): while self.in_flight_requests: ifr = self.in_flight_requests.popleft() ifr.future.failure(error) + self.config['state_change_callback'](self) def send(self, request, expect_response=True): """send request, return Future() |