diff options
Diffstat (limited to 'kafka/client_async.py')
| -rw-r--r-- | kafka/client_async.py | 81 | 
1 files changed, 3 insertions, 78 deletions
| diff --git a/kafka/client_async.py b/kafka/client_async.py index cfc89fc..64233f8 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -597,84 +597,9 @@ class KafkaClient(object):              if node_id is None:                  raise Errors.NoBrokersAvailable() -        def connect(node_id): -            timeout_at = time.time() + timeout -            # brokers < 0.9 do not return any broker metadata if there are no topics -            # so we're left with a single bootstrap connection -            while not self.ready(node_id): -                if time.time() >= timeout_at: -                    raise Errors.NodeNotReadyError(node_id) -                time.sleep(0.025) - -            # Monkeypatch the connection request timeout -            # Generally this timeout should not get triggered -            # but in case it does, we want it to be reasonably short -            self._conns[node_id].config['request_timeout_ms'] = timeout * 1000 - -        # kafka kills the connection when it doesnt recognize an API request -        # so we can send a test request and then follow immediately with a -        # vanilla MetadataRequest. If the server did not recognize the first -        # request, both will be failed with a ConnectionError that wraps -        # socket.error (32, 54, or 104) -        import socket -        from .protocol.admin import ListGroupsRequest -        from .protocol.commit import ( -            OffsetFetchRequest, GroupCoordinatorRequest) -        from .protocol.metadata import MetadataRequest - -        # Socket errors are logged as exceptions and can alarm users. Mute them -        from logging import Filter -        class ConnFilter(Filter): -            def filter(self, record): -                if record.funcName in ('recv', 'send'): -                    return False -                return True -        log_filter = ConnFilter() - -        test_cases = [ -            ('0.9', ListGroupsRequest[0]()), -            ('0.8.2', GroupCoordinatorRequest[0]('kafka-python-default-group')), -            ('0.8.1', OffsetFetchRequest[0]('kafka-python-default-group', [])), -            ('0.8.0', MetadataRequest[0]([])), -        ] - -        logging.getLogger('kafka.conn').addFilter(log_filter) -        for version, request in test_cases: -            connect(node_id) -            f = self.send(node_id, request) -            time.sleep(0.1) # HACK: sleeping to wait for socket to send bytes -            metadata = self.send(node_id, MetadataRequest[0]([])) -            self.poll(future=f) -            self.poll(future=metadata) - -            assert f.is_done, 'Future is not done? Please file bug report' - -            if f.succeeded(): -                log.info('Broker version identifed as %s', version) -                break - -            # Only enable strict checking to verify that we understand failure -            # modes. For most users, the fact that the request failed should be -            # enough to rule out a particular broker version. -            if strict: -                # If the socket flush hack did not work (which should force the -                # connection to close and fail all pending requests), then we -                # get a basic Request Timeout. Thisisn -                if isinstance(f.exception, Errors.RequestTimedOutError): -                    pass -                elif six.PY2: -                    assert isinstance(f.exception.args[0], socket.error) -                    assert f.exception.args[0].errno in (32, 54, 104) -                else: -                    assert isinstance(f.exception.args[0], ConnectionError) -            log.info("Broker is not v%s -- it did not recognize %s", -                     version, request.__class__.__name__) -        else: - -            raise Errors.UnrecognizedBrokerVersion() - -        logging.getLogger('kafka.conn').removeFilter(log_filter) -        self._conns[node_id].config['request_timeout_ms'] = self.config['request_timeout_ms'] +        self._maybe_connect(node_id) +        conn = self._conns[node_id] +        version = conn.check_version()          return version      def wakeup(self): | 
