summaryrefslogtreecommitdiff
path: root/kafka/client_async.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/client_async.py')
-rw-r--r--kafka/client_async.py17
1 files changed, 14 insertions, 3 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py
index 6fa9434..e064d51 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -178,7 +178,11 @@ class KafkaClient(object):
time.sleep(next_at - now)
self._last_bootstrap = time.time()
- metadata_request = MetadataRequest[0]([])
+ if self.config['api_version'] is None or self.config['api_version'] < (0, 10):
+ metadata_request = MetadataRequest[0]([])
+ else:
+ metadata_request = MetadataRequest[1](None)
+
for host, port, afi in hosts:
log.debug("Attempting to bootstrap via node at %s:%s", host, port)
cb = functools.partial(self._conn_state_change, 'bootstrap')
@@ -643,10 +647,17 @@ class KafkaClient(object):
topics = list(self._topics)
if self.cluster.need_all_topic_metadata:
- topics = []
+ if self.config['api_version'] < (0, 10):
+ topics = []
+ else:
+ topics = None
if self._can_send_request(node_id):
- request = MetadataRequest[0](topics)
+ if self.config['api_version'] < (0, 10):
+ api_version = 0
+ else:
+ api_version = 1
+ request = MetadataRequest[api_version](topics)
log.debug("Sending metadata request %s to node %s", request, node_id)
future = self.send(node_id, request)
future.add_callback(self.cluster.update_metadata)