diff options
author | Dana Powers <dana.powers@gmail.com> | 2016-02-15 09:37:43 -0800 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2016-02-15 09:52:58 -0800 |
commit | 4d17ec257f7934097e048b190df0075068787b6b (patch) | |
tree | ab9fbe1493e5f5c39ad7a10b9f32c55184f7bdab /kafka/client_async.py | |
parent | 48a71beb2f8c565debd9ea07fc8ada7a2210cbdf (diff) | |
download | kafka-python-4d17ec257f7934097e048b190df0075068787b6b.tar.gz |
Improve KafkaClient.check_version()
- only raise AssertionErrors if strict=True (default False)
- connect timeout is configurable (default 2secs)
- monkeypatch request_timeout_ms config and check for RequestTimeoutErrors
- add assertion error message
Diffstat (limited to 'kafka/client_async.py')
-rw-r--r-- | kafka/client_async.py | 40 |
1 files changed, 27 insertions, 13 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py index 029a419..844d3b3 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -565,20 +565,25 @@ class KafkaClient(object): """ self._delayed_tasks.remove(task) - def check_version(self, node_id=None): + def check_version(self, node_id=None, timeout=2, strict=False): """Attempt to guess the broker version""" if node_id is None: node_id = self.least_loaded_node() - def connect(): - timeout = time.time() + 10 + def connect(node_id): + timeout_at = time.time() + timeout # brokers < 0.9 do not return any broker metadata if there are no topics # so we're left with a single bootstrap connection while not self.ready(node_id): - if time.time() >= timeout: + if time.time() >= timeout_at: raise Errors.NodeNotReadyError(node_id) time.sleep(0.025) + # Monkeypatch the connection request timeout + # Generally this timeout should not get triggered + # but in case it does, we want it to be reasonably short + self._conns[node_id].config['request_timeout_ms'] = timeout * 1000 + # kafka kills the connection when it doesnt recognize an API request # so we can send a test request and then follow immediately with a # vanilla MetadataRequest. If the server did not recognize the first @@ -608,32 +613,41 @@ class KafkaClient(object): logging.getLogger('kafka.conn').addFilter(log_filter) for version, request in test_cases: - connect() + connect(node_id) f = self.send(node_id, request) time.sleep(0.1) # HACK: sleeping to wait for socket to send bytes metadata = self.send(node_id, MetadataRequest([])) self.poll(future=f) self.poll(future=metadata) - assert f.is_done + assert f.is_done, 'Future is not done? Please file bug report' if f.succeeded(): log.info('Broker version identifed as %s', version) break - if six.PY2: - assert isinstance(f.exception.args[0], socket.error) - assert f.exception.args[0].errno in (32, 54, 104) - else: - assert isinstance(f.exception.args[0], ConnectionError) + # Only enable strict checking to verify that we understand failure + # modes. For most users, the fact that the request failed should be + # enough to rule out a particular broker version. + if strict: + # If the socket flush hack did not work (which should force the + # connection to close and fail all pending requests), then we + # get a basic Request Timeout. Thisisn + if isinstance(f.exception, Errors.RequestTimeoutError): + pass + elif six.PY2: + assert isinstance(f.exception.args[0], socket.error) + assert f.exception.args[0].errno in (32, 54, 104) + else: + assert isinstance(f.exception.args[0], ConnectionError) log.info("Broker is not v%s -- it did not recognize %s", version, request.__class__.__name__) - continue - else: + raise Errors.UnrecognizedBrokerVersion() logging.getLogger('kafka.conn').removeFilter(log_filter) + self._conns[node_id].config['request_timeout_ms'] = self.config['request_timeout_ms'] return version def wakeup(self): |