diff options
Diffstat (limited to 'kafka/conn.py')
-rw-r--r-- | kafka/conn.py | 96 |
1 files changed, 96 insertions, 0 deletions
diff --git a/kafka/conn.py b/kafka/conn.py index 92b2fd3..030a3f1 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -352,6 +352,102 @@ class BrokerConnection(object): self._correlation_id = (self._correlation_id + 1) % 2**31 return self._correlation_id + def check_version(self, timeout=2, strict=False): + """Attempt to guess the broker version. This is a blocking call.""" + + # Monkeypatch the connection request timeout + # Generally this timeout should not get triggered + # but in case it does, we want it to be reasonably short + stashed_request_timeout_ms = self.config['request_timeout_ms'] + self.config['request_timeout_ms'] = timeout * 1000 + + # kafka kills the connection when it doesnt recognize an API request + # so we can send a test request and then follow immediately with a + # vanilla MetadataRequest. If the server did not recognize the first + # request, both will be failed with a ConnectionError that wraps + # socket.error (32, 54, or 104) + from .protocol.admin import ListGroupsRequest + from .protocol.commit import OffsetFetchRequest, GroupCoordinatorRequest + from .protocol.metadata import MetadataRequest + + # Socket errors are logged as exceptions and can alarm users. Mute them + from logging import Filter + class ConnFilter(Filter): + def filter(self, record): + if record.funcName in ('recv', 'send'): + return False + return True + log_filter = ConnFilter() + log.addFilter(log_filter) + + test_cases = [ + ('0.9', ListGroupsRequest[0]()), + ('0.8.2', GroupCoordinatorRequest[0]('kafka-python-default-group')), + ('0.8.1', OffsetFetchRequest[0]('kafka-python-default-group', [])), + ('0.8.0', MetadataRequest[0]([])), + ] + + def connect(): + self.connect() + if self.connected(): + return + timeout_at = time.time() + timeout + while time.time() < timeout_at and self.connecting(): + if self.connect() is ConnectionStates.CONNECTED: + return + time.sleep(0.05) + raise Errors.NodeNotReadyError() + + for version, request in test_cases: + connect() + f = self.send(request) + # HACK: sleeping to wait for socket to send bytes + time.sleep(0.1) + # when broker receives an unrecognized request API + # it abruptly closes our socket. + # so we attempt to send a second request immediately + # that we believe it will definitely recognize (metadata) + # the attempt to write to a disconnected socket should + # immediately fail and allow us to infer that the prior + # request was unrecognized + metadata = self.send(MetadataRequest[0]([])) + + if self._sock: + self._sock.setblocking(True) + resp_1 = self.recv() + resp_2 = self.recv() + if self._sock: + self._sock.setblocking(False) + + assert f.is_done, 'Future is not done? Please file bug report' + + if f.succeeded(): + log.info('Broker version identifed as %s', version) + break + + # Only enable strict checking to verify that we understand failure + # modes. For most users, the fact that the request failed should be + # enough to rule out a particular broker version. + if strict: + # If the socket flush hack did not work (which should force the + # connection to close and fail all pending requests), then we + # get a basic Request Timeout. This is not ideal, but we'll deal + if isinstance(f.exception, Errors.RequestTimedOutError): + pass + elif six.PY2: + assert isinstance(f.exception.args[0], socket.error) + assert f.exception.args[0].errno in (32, 54, 104) + else: + assert isinstance(f.exception.args[0], ConnectionError) + log.info("Broker is not v%s -- it did not recognize %s", + version, request.__class__.__name__) + else: + raise Errors.UnrecognizedBrokerVersion() + + log.removeFilter(log_filter) + self.config['request_timeout_ms'] = stashed_request_timeout_ms + return version + def __repr__(self): return "<BrokerConnection host=%s port=%d>" % (self.host, self.port) |