summaryrefslogtreecommitdiff
path: root/kafka/client_async.py
diff options
context:
space:
mode:
authorNing Xie <andy.xning@gmail.com>2018-08-31 21:01:46 +0800
committerDana Powers <dana.powers@gmail.com>2018-08-31 06:01:46 -0700
commita7d3063d5fa1c3cb2a76c16231bb3028a6f8cde9 (patch)
tree41acc8d3722d9e15c74b1ae8a7264250394bf9bc /kafka/client_async.py
parent9ac3cb1ec220ff9968a8b003b02e98dd11cc486b (diff)
downloadkafka-python-a7d3063d5fa1c3cb2a76c16231bb3028a6f8cde9.tar.gz
add support for smaller topic metadata fetch during bootstrap (#1541)
Diffstat (limited to 'kafka/client_async.py')
-rw-r--r--kafka/client_async.py13
1 files changed, 10 insertions, 3 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py
index 5a16f6b..c0072ae 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -149,6 +149,7 @@ class KafkaClient(object):
DEFAULT_CONFIG = {
'bootstrap_servers': 'localhost',
+ 'bootstrap_topics_filter': set(),
'client_id': 'kafka-python-' + __version__,
'request_timeout_ms': 30000,
'connections_max_idle_ms': 9 * 60 * 1000,
@@ -236,9 +237,15 @@ class KafkaClient(object):
self._last_bootstrap = time.time()
if self.config['api_version'] is None or self.config['api_version'] < (0, 10):
- metadata_request = MetadataRequest[0]([])
+ if self.config['bootstrap_topics_filter']:
+ metadata_request = MetadataRequest[0](list(self.config['bootstrap_topics_filter']))
+ else:
+ metadata_request = MetadataRequest[0]([])
else:
- metadata_request = MetadataRequest[1](None)
+ if self.config['bootstrap_topics_filter']:
+ metadata_request = MetadataRequest[1](list(self.config['bootstrap_topics_filter']))
+ else:
+ metadata_request = MetadataRequest[1](None)
for host, port, afi in hosts:
log.debug("Attempting to bootstrap via node at %s:%s", host, port)
@@ -830,7 +837,7 @@ class KafkaClient(object):
self._refresh_on_disconnects = False
try:
remaining = end - time.time()
- version = conn.check_version(timeout=remaining, strict=strict)
+ version = conn.check_version(timeout=remaining, strict=strict, topics=list(self.config['bootstrap_topics_filter']))
return version
except Errors.NodeNotReadyError:
# Only raise to user if this is a node-specific request