diff options
author | Dana Powers <dana.powers@gmail.com> | 2016-04-07 23:53:22 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2016-04-08 15:56:24 -0700 |
commit | 45b2c99690fad07930d11dffb8d93dca19104c50 (patch) | |
tree | ea90e75f0d5048a39f52259456d00e646066dfe5 /kafka/client_async.py | |
parent | 4323e5c21cb151728b7985e24a1ad44fd36fd9fb (diff) | |
download | kafka-python-45b2c99690fad07930d11dffb8d93dca19104c50.tar.gz |
Move check_version() logic to BrokerConnection
Diffstat (limited to 'kafka/client_async.py')
-rw-r--r-- | kafka/client_async.py | 81 |
1 files changed, 3 insertions, 78 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py index cfc89fc..64233f8 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -597,84 +597,9 @@ class KafkaClient(object): if node_id is None: raise Errors.NoBrokersAvailable() - def connect(node_id): - timeout_at = time.time() + timeout - # brokers < 0.9 do not return any broker metadata if there are no topics - # so we're left with a single bootstrap connection - while not self.ready(node_id): - if time.time() >= timeout_at: - raise Errors.NodeNotReadyError(node_id) - time.sleep(0.025) - - # Monkeypatch the connection request timeout - # Generally this timeout should not get triggered - # but in case it does, we want it to be reasonably short - self._conns[node_id].config['request_timeout_ms'] = timeout * 1000 - - # kafka kills the connection when it doesnt recognize an API request - # so we can send a test request and then follow immediately with a - # vanilla MetadataRequest. If the server did not recognize the first - # request, both will be failed with a ConnectionError that wraps - # socket.error (32, 54, or 104) - import socket - from .protocol.admin import ListGroupsRequest - from .protocol.commit import ( - OffsetFetchRequest, GroupCoordinatorRequest) - from .protocol.metadata import MetadataRequest - - # Socket errors are logged as exceptions and can alarm users. Mute them - from logging import Filter - class ConnFilter(Filter): - def filter(self, record): - if record.funcName in ('recv', 'send'): - return False - return True - log_filter = ConnFilter() - - test_cases = [ - ('0.9', ListGroupsRequest[0]()), - ('0.8.2', GroupCoordinatorRequest[0]('kafka-python-default-group')), - ('0.8.1', OffsetFetchRequest[0]('kafka-python-default-group', [])), - ('0.8.0', MetadataRequest[0]([])), - ] - - logging.getLogger('kafka.conn').addFilter(log_filter) - for version, request in test_cases: - connect(node_id) - f = self.send(node_id, request) - time.sleep(0.1) # HACK: sleeping to wait for socket to send bytes - metadata = self.send(node_id, MetadataRequest[0]([])) - self.poll(future=f) - self.poll(future=metadata) - - assert f.is_done, 'Future is not done? Please file bug report' - - if f.succeeded(): - log.info('Broker version identifed as %s', version) - break - - # Only enable strict checking to verify that we understand failure - # modes. For most users, the fact that the request failed should be - # enough to rule out a particular broker version. - if strict: - # If the socket flush hack did not work (which should force the - # connection to close and fail all pending requests), then we - # get a basic Request Timeout. Thisisn - if isinstance(f.exception, Errors.RequestTimedOutError): - pass - elif six.PY2: - assert isinstance(f.exception.args[0], socket.error) - assert f.exception.args[0].errno in (32, 54, 104) - else: - assert isinstance(f.exception.args[0], ConnectionError) - log.info("Broker is not v%s -- it did not recognize %s", - version, request.__class__.__name__) - else: - - raise Errors.UnrecognizedBrokerVersion() - - logging.getLogger('kafka.conn').removeFilter(log_filter) - self._conns[node_id].config['request_timeout_ms'] = self.config['request_timeout_ms'] + self._maybe_connect(node_id) + conn = self._conns[node_id] + version = conn.check_version() return version def wakeup(self): |