diff options
author | Dana Powers <dana.powers@gmail.com> | 2016-04-07 21:54:49 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2016-04-07 23:54:52 -0700 |
commit | eff668e9adf9d7949f5a2b7ea7115a29a0d6095d (patch) | |
tree | ec73c8d596c24b9cd97adc6841fac4a70227a5f5 | |
parent | 1f5ed3816ca63d7112a942c8dd44d27950730d4e (diff) | |
download | kafka-python-refactor.tar.gz |
Add state_change_callback to BrokerConnectionrefactor
-rw-r--r-- | kafka/conn.py | 8 |
1 files changed, 8 insertions, 0 deletions
diff --git a/kafka/conn.py b/kafka/conn.py index 92b2fd3..45de239 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -31,6 +31,7 @@ DEFAULT_KAFKA_PORT = 9092 class ConnectionStates(object): + DISCONNECTING = '<disconnecting>' DISCONNECTED = '<disconnected>' CONNECTING = '<connecting>' CONNECTED = '<connected>' @@ -49,6 +50,7 @@ class BrokerConnection(object): 'receive_buffer_bytes': None, 'send_buffer_bytes': None, 'api_version': (0, 8, 2), # default to most restrictive + 'state_change_callback': lambda conn: True, } def __init__(self, host, port, afi, **configs): @@ -87,6 +89,7 @@ class BrokerConnection(object): self._sock.setblocking(False) self.state = ConnectionStates.CONNECTING self.last_attempt = time.time() + self.config['state_change_callback'](self) if self.state is ConnectionStates.CONNECTING: # in non-blocking mode, use repeated calls to socket.connect_ex @@ -101,6 +104,7 @@ class BrokerConnection(object): if not ret or ret == errno.EISCONN: log.debug('%s: established TCP connection', str(self)) self.state = ConnectionStates.CONNECTED + self.config['state_change_callback'](self) # Connection failed # WSAEINVAL == 10022, but errno.WSAEINVAL is not available on non-win systems @@ -151,6 +155,9 @@ class BrokerConnection(object): will be failed with this exception. Default: kafka.errors.ConnectionError. """ + if self.state is not ConnectionStates.DISCONNECTED: + self.state = ConnectionStates.DISCONNECTING + self.config['state_change_callback'](self) if self._sock: self._sock.close() self._sock = None @@ -165,6 +172,7 @@ class BrokerConnection(object): while self.in_flight_requests: ifr = self.in_flight_requests.popleft() ifr.future.failure(error) + self.config['state_change_callback'](self) def send(self, request, expect_response=True): """send request, return Future() |