summaryrefslogtreecommitdiff
path: root/kafka/conn.py
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2016-04-07 21:54:49 -0700
committerDana Powers <dana.powers@gmail.com>2016-04-07 23:54:52 -0700
commiteff668e9adf9d7949f5a2b7ea7115a29a0d6095d (patch)
treeec73c8d596c24b9cd97adc6841fac4a70227a5f5 /kafka/conn.py
parent1f5ed3816ca63d7112a942c8dd44d27950730d4e (diff)
downloadkafka-python-refactor.tar.gz
Add state_change_callback to BrokerConnectionrefactor
Diffstat (limited to 'kafka/conn.py')
-rw-r--r--kafka/conn.py8
1 files changed, 8 insertions, 0 deletions
diff --git a/kafka/conn.py b/kafka/conn.py
index 92b2fd3..45de239 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -31,6 +31,7 @@ DEFAULT_KAFKA_PORT = 9092
class ConnectionStates(object):
+ DISCONNECTING = '<disconnecting>'
DISCONNECTED = '<disconnected>'
CONNECTING = '<connecting>'
CONNECTED = '<connected>'
@@ -49,6 +50,7 @@ class BrokerConnection(object):
'receive_buffer_bytes': None,
'send_buffer_bytes': None,
'api_version': (0, 8, 2), # default to most restrictive
+ 'state_change_callback': lambda conn: True,
}
def __init__(self, host, port, afi, **configs):
@@ -87,6 +89,7 @@ class BrokerConnection(object):
self._sock.setblocking(False)
self.state = ConnectionStates.CONNECTING
self.last_attempt = time.time()
+ self.config['state_change_callback'](self)
if self.state is ConnectionStates.CONNECTING:
# in non-blocking mode, use repeated calls to socket.connect_ex
@@ -101,6 +104,7 @@ class BrokerConnection(object):
if not ret or ret == errno.EISCONN:
log.debug('%s: established TCP connection', str(self))
self.state = ConnectionStates.CONNECTED
+ self.config['state_change_callback'](self)
# Connection failed
# WSAEINVAL == 10022, but errno.WSAEINVAL is not available on non-win systems
@@ -151,6 +155,9 @@ class BrokerConnection(object):
will be failed with this exception.
Default: kafka.errors.ConnectionError.
"""
+ if self.state is not ConnectionStates.DISCONNECTED:
+ self.state = ConnectionStates.DISCONNECTING
+ self.config['state_change_callback'](self)
if self._sock:
self._sock.close()
self._sock = None
@@ -165,6 +172,7 @@ class BrokerConnection(object):
while self.in_flight_requests:
ifr = self.in_flight_requests.popleft()
ifr.future.failure(error)
+ self.config['state_change_callback'](self)
def send(self, request, expect_response=True):
"""send request, return Future()