summaryrefslogtreecommitdiff
path: root/kafka/client_async.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/client_async.py')
-rw-r--r--kafka/client_async.py13
1 files changed, 10 insertions, 3 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py
index 5a16f6b..c0072ae 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -149,6 +149,7 @@ class KafkaClient(object):
DEFAULT_CONFIG = {
'bootstrap_servers': 'localhost',
+ 'bootstrap_topics_filter': set(),
'client_id': 'kafka-python-' + __version__,
'request_timeout_ms': 30000,
'connections_max_idle_ms': 9 * 60 * 1000,
@@ -236,9 +237,15 @@ class KafkaClient(object):
self._last_bootstrap = time.time()
if self.config['api_version'] is None or self.config['api_version'] < (0, 10):
- metadata_request = MetadataRequest[0]([])
+ if self.config['bootstrap_topics_filter']:
+ metadata_request = MetadataRequest[0](list(self.config['bootstrap_topics_filter']))
+ else:
+ metadata_request = MetadataRequest[0]([])
else:
- metadata_request = MetadataRequest[1](None)
+ if self.config['bootstrap_topics_filter']:
+ metadata_request = MetadataRequest[1](list(self.config['bootstrap_topics_filter']))
+ else:
+ metadata_request = MetadataRequest[1](None)
for host, port, afi in hosts:
log.debug("Attempting to bootstrap via node at %s:%s", host, port)
@@ -830,7 +837,7 @@ class KafkaClient(object):
self._refresh_on_disconnects = False
try:
remaining = end - time.time()
- version = conn.check_version(timeout=remaining, strict=strict)
+ version = conn.check_version(timeout=remaining, strict=strict, topics=list(self.config['bootstrap_topics_filter']))
return version
except Errors.NodeNotReadyError:
# Only raise to user if this is a node-specific request