diff options
Diffstat (limited to 'kafka/conn.py')
-rw-r--r-- | kafka/conn.py | 104 |
1 files changed, 104 insertions, 0 deletions
diff --git a/kafka/conn.py b/kafka/conn.py index 92b2fd3..28c09d9 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -31,6 +31,7 @@ DEFAULT_KAFKA_PORT = 9092 class ConnectionStates(object): + DISCONNECTING = '<disconnecting>' DISCONNECTED = '<disconnected>' CONNECTING = '<connecting>' CONNECTED = '<connected>' @@ -49,6 +50,7 @@ class BrokerConnection(object): 'receive_buffer_bytes': None, 'send_buffer_bytes': None, 'api_version': (0, 8, 2), # default to most restrictive + 'state_change_callback': lambda conn: True, } def __init__(self, host, port, afi, **configs): @@ -87,6 +89,7 @@ class BrokerConnection(object): self._sock.setblocking(False) self.state = ConnectionStates.CONNECTING self.last_attempt = time.time() + self.config['state_change_callback'](self) if self.state is ConnectionStates.CONNECTING: # in non-blocking mode, use repeated calls to socket.connect_ex @@ -101,6 +104,7 @@ class BrokerConnection(object): if not ret or ret == errno.EISCONN: log.debug('%s: established TCP connection', str(self)) self.state = ConnectionStates.CONNECTED + self.config['state_change_callback'](self) # Connection failed # WSAEINVAL == 10022, but errno.WSAEINVAL is not available on non-win systems @@ -151,6 +155,9 @@ class BrokerConnection(object): will be failed with this exception. Default: kafka.errors.ConnectionError. """ + if self.state is not ConnectionStates.DISCONNECTED: + self.state = ConnectionStates.DISCONNECTING + self.config['state_change_callback'](self) if self._sock: self._sock.close() self._sock = None @@ -165,6 +172,7 @@ class BrokerConnection(object): while self.in_flight_requests: ifr = self.in_flight_requests.popleft() ifr.future.failure(error) + self.config['state_change_callback'](self) def send(self, request, expect_response=True): """send request, return Future() @@ -352,6 +360,102 @@ class BrokerConnection(object): self._correlation_id = (self._correlation_id + 1) % 2**31 return self._correlation_id + def check_version(self, timeout=2, strict=False): + """Attempt to guess the broker version. This is a blocking call.""" + + # Monkeypatch the connection request timeout + # Generally this timeout should not get triggered + # but in case it does, we want it to be reasonably short + stashed_request_timeout_ms = self.config['request_timeout_ms'] + self.config['request_timeout_ms'] = timeout * 1000 + + # kafka kills the connection when it doesnt recognize an API request + # so we can send a test request and then follow immediately with a + # vanilla MetadataRequest. If the server did not recognize the first + # request, both will be failed with a ConnectionError that wraps + # socket.error (32, 54, or 104) + from .protocol.admin import ListGroupsRequest + from .protocol.commit import OffsetFetchRequest, GroupCoordinatorRequest + from .protocol.metadata import MetadataRequest + + # Socket errors are logged as exceptions and can alarm users. Mute them + from logging import Filter + class ConnFilter(Filter): + def filter(self, record): + if record.funcName in ('recv', 'send'): + return False + return True + log_filter = ConnFilter() + log.addFilter(log_filter) + + test_cases = [ + ('0.9', ListGroupsRequest[0]()), + ('0.8.2', GroupCoordinatorRequest[0]('kafka-python-default-group')), + ('0.8.1', OffsetFetchRequest[0]('kafka-python-default-group', [])), + ('0.8.0', MetadataRequest[0]([])), + ] + + def connect(): + self.connect() + if self.connected(): + return + timeout_at = time.time() + timeout + while time.time() < timeout_at and self.connecting(): + if self.connect() is ConnectionStates.CONNECTED: + return + time.sleep(0.05) + raise Errors.NodeNotReadyError() + + for version, request in test_cases: + connect() + f = self.send(request) + # HACK: sleeping to wait for socket to send bytes + time.sleep(0.1) + # when broker receives an unrecognized request API + # it abruptly closes our socket. + # so we attempt to send a second request immediately + # that we believe it will definitely recognize (metadata) + # the attempt to write to a disconnected socket should + # immediately fail and allow us to infer that the prior + # request was unrecognized + metadata = self.send(MetadataRequest[0]([])) + + if self._sock: + self._sock.setblocking(True) + resp_1 = self.recv() + resp_2 = self.recv() + if self._sock: + self._sock.setblocking(False) + + assert f.is_done, 'Future is not done? Please file bug report' + + if f.succeeded(): + log.info('Broker version identifed as %s', version) + break + + # Only enable strict checking to verify that we understand failure + # modes. For most users, the fact that the request failed should be + # enough to rule out a particular broker version. + if strict: + # If the socket flush hack did not work (which should force the + # connection to close and fail all pending requests), then we + # get a basic Request Timeout. This is not ideal, but we'll deal + if isinstance(f.exception, Errors.RequestTimedOutError): + pass + elif six.PY2: + assert isinstance(f.exception.args[0], socket.error) + assert f.exception.args[0].errno in (32, 54, 104) + else: + assert isinstance(f.exception.args[0], ConnectionError) + log.info("Broker is not v%s -- it did not recognize %s", + version, request.__class__.__name__) + else: + raise Errors.UnrecognizedBrokerVersion() + + log.removeFilter(log_filter) + self.config['request_timeout_ms'] = stashed_request_timeout_ms + return version + def __repr__(self): return "<BrokerConnection host=%s port=%d>" % (self.host, self.port) |