summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@rd.io>2018-03-08 13:13:00 -0500
committerDana Powers <dana.powers@rd.io>2018-03-08 19:56:47 -0500
commit98119e9b9f58e3d5de68dca18e16e9ad8bf77375 (patch)
tree7bfa698694b557d57ed2a2fd2fb16ddf58ae64b6
parent4abdb1baea2468408c36cc983dfef1e8b8f54654 (diff)
downloadkafka-python-defer_version_check.tar.gz
Defer version check until after bootstrap succeedsdefer_version_check
-rw-r--r--kafka/client_async.py27
1 files changed, 15 insertions, 12 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py
index 58155b8..b4366c8 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -196,6 +196,9 @@ class KafkaClient(object):
assert self.config['api_version'] in self.API_VERSIONS, (
'api_version [{0}] must be one of: {1}'.format(
self.config['api_version'], str(self.API_VERSIONS)))
+ else:
+ # This should get updated after a successful bootstrap
+ self.config['api_version'] = (0, 0)
self.cluster = ClusterMetadata(**self.config)
self._topics = set() # empty set will fetch all topic metadata
@@ -228,11 +231,6 @@ class KafkaClient(object):
self._bootstrap(collect_hosts(self.config['bootstrap_servers']))
- # Check Broker Version if not set explicitly
- if self.config['api_version'] is None:
- check_timeout = self.config['api_version_auto_timeout_ms'] / 1000
- self.config['api_version'] = self.check_version(timeout=check_timeout)
-
def _bootstrap(self, hosts):
log.info('Bootstrapping cluster metadata from %s', hosts)
# Exponential backoff if bootstrap fails
@@ -245,7 +243,7 @@ class KafkaClient(object):
time.sleep(next_at - now)
self._last_bootstrap = time.time()
- if self.config['api_version'] is None or self.config['api_version'] < (0, 10):
+ if self.config['api_version'] < (0, 10):
metadata_request = MetadataRequest[0]([])
else:
metadata_request = MetadataRequest[1](None)
@@ -283,7 +281,13 @@ class KafkaClient(object):
else:
bootstrap.close()
self._bootstrap_fails = 0
+
+ # Check Broker Version if not set explicitly
+ if self.config['api_version'] == (0, 0):
+ check_timeout = self.config['api_version_auto_timeout_ms'] / 1000
+ self.config['api_version'] = self.check_version(timeout=check_timeout)
break
+
# No bootstrap found...
else:
log.error('Unable to bootstrap from %s', hosts)
@@ -821,10 +825,10 @@ class KafkaClient(object):
This is only possible if node_id is None.
Returns: version tuple, i.e. (0, 10), (0, 9), (0, 8, 2), ...
+ (0, 0) returned if the version cannot be determined,
+ typically due to networking.
Raises:
- NodeNotReadyError (if node_id is provided)
- NoBrokersAvailable (if node_id is None)
UnrecognizedBrokerVersion: please file bug if seen!
AssertionError (if strict=True): please file bug if seen!
"""
@@ -835,7 +839,7 @@ class KafkaClient(object):
# which can block for an increasing backoff period
try_node = node_id or self.least_loaded_node()
if try_node is None:
- raise Errors.NoBrokersAvailable()
+ return (0, 0)
self._maybe_connect(try_node)
conn = self._conns[try_node]
@@ -847,15 +851,14 @@ class KafkaClient(object):
version = conn.check_version(timeout=remaining, strict=strict)
return version
except Errors.NodeNotReadyError:
- # Only raise to user if this is a node-specific request
if node_id is not None:
- raise
+ return (0, 0)
finally:
self._refresh_on_disconnects = True
# Timeout
else:
- raise Errors.NoBrokersAvailable()
+ return (0, 0)
def wakeup(self):
with self._wake_lock: