summaryrefslogtreecommitdiff
path: root/kafka/client_async.py
diff options
context:
space:
mode:
authorRichard Lee <github@richardlee.name>2018-07-12 11:39:29 -0700
committerJeff Widman <jeff@jeffwidman.com>2018-10-24 22:42:12 -0700
commit481f88068bdf0a18f12fd7a811b795f889d35fc7 (patch)
tree818f3b1ff92c847f90da3e9f2603d8100e899a50 /kafka/client_async.py
parentac9d5623116a5754c57a8ecd95b2954ba0f30c14 (diff)
downloadkafka-python-481f88068bdf0a18f12fd7a811b795f889d35fc7.tar.gz
Add KafkaAdmin class
Requires cluster version > 0.10.0.0, and uses new wire protocol classes to do many things via broker connection that previously needed to be done directly in zookeeper.
Diffstat (limited to 'kafka/client_async.py')
-rw-r--r--kafka/client_async.py16
1 files changed, 16 insertions, 0 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py
index 5a161bb..ccf1e4b 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -196,6 +196,7 @@ class KafkaClient(object):
self._metadata_refresh_in_progress = False
self._selector = self.config['selector']()
self._conns = Dict() # object to support weakrefs
+ self._api_versions = None
self._connecting = set()
self._refresh_on_disconnects = True
self._last_bootstrap = 0
@@ -808,6 +809,17 @@ class KafkaClient(object):
# to let us know the selected connection might be usable again.
return float('inf')
+ def get_api_versions(self):
+ """Return the ApiVersions map, if available.
+
+ Note: A call to check_version must previously have succeeded and returned
+ version 0.10.0 or later
+
+ Returns: a map of dict mapping {api_key : (min_version, max_version)},
+ or None if ApiVersion is not supported by the kafka cluster.
+ """
+ return self._api_versions
+
def check_version(self, node_id=None, timeout=2, strict=False):
"""Attempt to guess the version of a Kafka broker.
@@ -841,6 +853,10 @@ class KafkaClient(object):
try:
remaining = end - time.time()
version = conn.check_version(timeout=remaining, strict=strict, topics=list(self.config['bootstrap_topics_filter']))
+ if version >= (0, 10, 0):
+ # cache the api versions map if it's available (starting
+ # in 0.10 cluster version)
+ self._api_versions = conn.get_api_versions()
return version
except Errors.NodeNotReadyError:
# Only raise to user if this is a node-specific request