summaryrefslogtreecommitdiff
path: root/kafka/client_async.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/client_async.py')
-rw-r--r--kafka/client_async.py16
1 files changed, 16 insertions, 0 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py
index 5a161bb..ccf1e4b 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -196,6 +196,7 @@ class KafkaClient(object):
self._metadata_refresh_in_progress = False
self._selector = self.config['selector']()
self._conns = Dict() # object to support weakrefs
+ self._api_versions = None
self._connecting = set()
self._refresh_on_disconnects = True
self._last_bootstrap = 0
@@ -808,6 +809,17 @@ class KafkaClient(object):
# to let us know the selected connection might be usable again.
return float('inf')
+ def get_api_versions(self):
+ """Return the ApiVersions map, if available.
+
+ Note: A call to check_version must previously have succeeded and returned
+ version 0.10.0 or later
+
+ Returns: a map of dict mapping {api_key : (min_version, max_version)},
+ or None if ApiVersion is not supported by the kafka cluster.
+ """
+ return self._api_versions
+
def check_version(self, node_id=None, timeout=2, strict=False):
"""Attempt to guess the version of a Kafka broker.
@@ -841,6 +853,10 @@ class KafkaClient(object):
try:
remaining = end - time.time()
version = conn.check_version(timeout=remaining, strict=strict, topics=list(self.config['bootstrap_topics_filter']))
+ if version >= (0, 10, 0):
+ # cache the api versions map if it's available (starting
+ # in 0.10 cluster version)
+ self._api_versions = conn.get_api_versions()
return version
except Errors.NodeNotReadyError:
# Only raise to user if this is a node-specific request