summaryrefslogtreecommitdiff
path: root/kafka/client_async.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/client_async.py')
-rw-r--r--kafka/client_async.py40
1 files changed, 27 insertions, 13 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py
index 029a419..844d3b3 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -565,20 +565,25 @@ class KafkaClient(object):
"""
self._delayed_tasks.remove(task)
- def check_version(self, node_id=None):
+ def check_version(self, node_id=None, timeout=2, strict=False):
"""Attempt to guess the broker version"""
if node_id is None:
node_id = self.least_loaded_node()
- def connect():
- timeout = time.time() + 10
+ def connect(node_id):
+ timeout_at = time.time() + timeout
# brokers < 0.9 do not return any broker metadata if there are no topics
# so we're left with a single bootstrap connection
while not self.ready(node_id):
- if time.time() >= timeout:
+ if time.time() >= timeout_at:
raise Errors.NodeNotReadyError(node_id)
time.sleep(0.025)
+ # Monkeypatch the connection request timeout
+ # Generally this timeout should not get triggered
+ # but in case it does, we want it to be reasonably short
+ self._conns[node_id].config['request_timeout_ms'] = timeout * 1000
+
# kafka kills the connection when it doesnt recognize an API request
# so we can send a test request and then follow immediately with a
# vanilla MetadataRequest. If the server did not recognize the first
@@ -608,32 +613,41 @@ class KafkaClient(object):
logging.getLogger('kafka.conn').addFilter(log_filter)
for version, request in test_cases:
- connect()
+ connect(node_id)
f = self.send(node_id, request)
time.sleep(0.1) # HACK: sleeping to wait for socket to send bytes
metadata = self.send(node_id, MetadataRequest([]))
self.poll(future=f)
self.poll(future=metadata)
- assert f.is_done
+ assert f.is_done, 'Future is not done? Please file bug report'
if f.succeeded():
log.info('Broker version identifed as %s', version)
break
- if six.PY2:
- assert isinstance(f.exception.args[0], socket.error)
- assert f.exception.args[0].errno in (32, 54, 104)
- else:
- assert isinstance(f.exception.args[0], ConnectionError)
+ # Only enable strict checking to verify that we understand failure
+ # modes. For most users, the fact that the request failed should be
+ # enough to rule out a particular broker version.
+ if strict:
+ # If the socket flush hack did not work (which should force the
+ # connection to close and fail all pending requests), then we
+ # get a basic Request Timeout. Thisisn
+ if isinstance(f.exception, Errors.RequestTimeoutError):
+ pass
+ elif six.PY2:
+ assert isinstance(f.exception.args[0], socket.error)
+ assert f.exception.args[0].errno in (32, 54, 104)
+ else:
+ assert isinstance(f.exception.args[0], ConnectionError)
log.info("Broker is not v%s -- it did not recognize %s",
version, request.__class__.__name__)
- continue
-
else:
+
raise Errors.UnrecognizedBrokerVersion()
logging.getLogger('kafka.conn').removeFilter(log_filter)
+ self._conns[node_id].config['request_timeout_ms'] = self.config['request_timeout_ms']
return version
def wakeup(self):