summaryrefslogtreecommitdiff
path: root/kafka/conn.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/conn.py')
-rw-r--r--kafka/conn.py104
1 files changed, 104 insertions, 0 deletions
diff --git a/kafka/conn.py b/kafka/conn.py
index 92b2fd3..28c09d9 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -31,6 +31,7 @@ DEFAULT_KAFKA_PORT = 9092
class ConnectionStates(object):
+ DISCONNECTING = '<disconnecting>'
DISCONNECTED = '<disconnected>'
CONNECTING = '<connecting>'
CONNECTED = '<connected>'
@@ -49,6 +50,7 @@ class BrokerConnection(object):
'receive_buffer_bytes': None,
'send_buffer_bytes': None,
'api_version': (0, 8, 2), # default to most restrictive
+ 'state_change_callback': lambda conn: True,
}
def __init__(self, host, port, afi, **configs):
@@ -87,6 +89,7 @@ class BrokerConnection(object):
self._sock.setblocking(False)
self.state = ConnectionStates.CONNECTING
self.last_attempt = time.time()
+ self.config['state_change_callback'](self)
if self.state is ConnectionStates.CONNECTING:
# in non-blocking mode, use repeated calls to socket.connect_ex
@@ -101,6 +104,7 @@ class BrokerConnection(object):
if not ret or ret == errno.EISCONN:
log.debug('%s: established TCP connection', str(self))
self.state = ConnectionStates.CONNECTED
+ self.config['state_change_callback'](self)
# Connection failed
# WSAEINVAL == 10022, but errno.WSAEINVAL is not available on non-win systems
@@ -151,6 +155,9 @@ class BrokerConnection(object):
will be failed with this exception.
Default: kafka.errors.ConnectionError.
"""
+ if self.state is not ConnectionStates.DISCONNECTED:
+ self.state = ConnectionStates.DISCONNECTING
+ self.config['state_change_callback'](self)
if self._sock:
self._sock.close()
self._sock = None
@@ -165,6 +172,7 @@ class BrokerConnection(object):
while self.in_flight_requests:
ifr = self.in_flight_requests.popleft()
ifr.future.failure(error)
+ self.config['state_change_callback'](self)
def send(self, request, expect_response=True):
"""send request, return Future()
@@ -352,6 +360,102 @@ class BrokerConnection(object):
self._correlation_id = (self._correlation_id + 1) % 2**31
return self._correlation_id
+ def check_version(self, timeout=2, strict=False):
+ """Attempt to guess the broker version. This is a blocking call."""
+
+ # Monkeypatch the connection request timeout
+ # Generally this timeout should not get triggered
+ # but in case it does, we want it to be reasonably short
+ stashed_request_timeout_ms = self.config['request_timeout_ms']
+ self.config['request_timeout_ms'] = timeout * 1000
+
+ # kafka kills the connection when it doesnt recognize an API request
+ # so we can send a test request and then follow immediately with a
+ # vanilla MetadataRequest. If the server did not recognize the first
+ # request, both will be failed with a ConnectionError that wraps
+ # socket.error (32, 54, or 104)
+ from .protocol.admin import ListGroupsRequest
+ from .protocol.commit import OffsetFetchRequest, GroupCoordinatorRequest
+ from .protocol.metadata import MetadataRequest
+
+ # Socket errors are logged as exceptions and can alarm users. Mute them
+ from logging import Filter
+ class ConnFilter(Filter):
+ def filter(self, record):
+ if record.funcName in ('recv', 'send'):
+ return False
+ return True
+ log_filter = ConnFilter()
+ log.addFilter(log_filter)
+
+ test_cases = [
+ ('0.9', ListGroupsRequest[0]()),
+ ('0.8.2', GroupCoordinatorRequest[0]('kafka-python-default-group')),
+ ('0.8.1', OffsetFetchRequest[0]('kafka-python-default-group', [])),
+ ('0.8.0', MetadataRequest[0]([])),
+ ]
+
+ def connect():
+ self.connect()
+ if self.connected():
+ return
+ timeout_at = time.time() + timeout
+ while time.time() < timeout_at and self.connecting():
+ if self.connect() is ConnectionStates.CONNECTED:
+ return
+ time.sleep(0.05)
+ raise Errors.NodeNotReadyError()
+
+ for version, request in test_cases:
+ connect()
+ f = self.send(request)
+ # HACK: sleeping to wait for socket to send bytes
+ time.sleep(0.1)
+ # when broker receives an unrecognized request API
+ # it abruptly closes our socket.
+ # so we attempt to send a second request immediately
+ # that we believe it will definitely recognize (metadata)
+ # the attempt to write to a disconnected socket should
+ # immediately fail and allow us to infer that the prior
+ # request was unrecognized
+ metadata = self.send(MetadataRequest[0]([]))
+
+ if self._sock:
+ self._sock.setblocking(True)
+ resp_1 = self.recv()
+ resp_2 = self.recv()
+ if self._sock:
+ self._sock.setblocking(False)
+
+ assert f.is_done, 'Future is not done? Please file bug report'
+
+ if f.succeeded():
+ log.info('Broker version identifed as %s', version)
+ break
+
+ # Only enable strict checking to verify that we understand failure
+ # modes. For most users, the fact that the request failed should be
+ # enough to rule out a particular broker version.
+ if strict:
+ # If the socket flush hack did not work (which should force the
+ # connection to close and fail all pending requests), then we
+ # get a basic Request Timeout. This is not ideal, but we'll deal
+ if isinstance(f.exception, Errors.RequestTimedOutError):
+ pass
+ elif six.PY2:
+ assert isinstance(f.exception.args[0], socket.error)
+ assert f.exception.args[0].errno in (32, 54, 104)
+ else:
+ assert isinstance(f.exception.args[0], ConnectionError)
+ log.info("Broker is not v%s -- it did not recognize %s",
+ version, request.__class__.__name__)
+ else:
+ raise Errors.UnrecognizedBrokerVersion()
+
+ log.removeFilter(log_filter)
+ self.config['request_timeout_ms'] = stashed_request_timeout_ms
+ return version
+
def __repr__(self):
return "<BrokerConnection host=%s port=%d>" % (self.host, self.port)