diff options
Diffstat (limited to 'kafka/client_async.py')
-rw-r--r-- | kafka/client_async.py | 16 |
1 files changed, 16 insertions, 0 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py index 5a161bb..ccf1e4b 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -196,6 +196,7 @@ class KafkaClient(object): self._metadata_refresh_in_progress = False self._selector = self.config['selector']() self._conns = Dict() # object to support weakrefs + self._api_versions = None self._connecting = set() self._refresh_on_disconnects = True self._last_bootstrap = 0 @@ -808,6 +809,17 @@ class KafkaClient(object): # to let us know the selected connection might be usable again. return float('inf') + def get_api_versions(self): + """Return the ApiVersions map, if available. + + Note: A call to check_version must previously have succeeded and returned + version 0.10.0 or later + + Returns: a map of dict mapping {api_key : (min_version, max_version)}, + or None if ApiVersion is not supported by the kafka cluster. + """ + return self._api_versions + def check_version(self, node_id=None, timeout=2, strict=False): """Attempt to guess the version of a Kafka broker. @@ -841,6 +853,10 @@ class KafkaClient(object): try: remaining = end - time.time() version = conn.check_version(timeout=remaining, strict=strict, topics=list(self.config['bootstrap_topics_filter'])) + if version >= (0, 10, 0): + # cache the api versions map if it's available (starting + # in 0.10 cluster version) + self._api_versions = conn.get_api_versions() return version except Errors.NodeNotReadyError: # Only raise to user if this is a node-specific request |