diff options
author | Dana Powers <dana.powers@gmail.com> | 2016-07-16 21:20:42 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2016-07-16 21:20:42 -0700 |
commit | 5ab4d5c274112a4e2024dea415a0ec4b79009a28 (patch) | |
tree | 2f75731a028194d92d8df916a2a6c553385aae80 /kafka/client_async.py | |
parent | 2a7f4dbb8159464941afa25d49428976cc05f902 (diff) | |
parent | 277f0ddd61c230181f5f21d427070ec44b36a257 (diff) | |
download | kafka-python-5ab4d5c274112a4e2024dea415a0ec4b79009a28.tar.gz |
Merge pull request #762 from dpkp/metadata_v1
Use Metadata Request/Response v1 with 0.10+ brokers
Diffstat (limited to 'kafka/client_async.py')
-rw-r--r-- | kafka/client_async.py | 17 |
1 files changed, 14 insertions, 3 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py index 6fa9434..e064d51 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -178,7 +178,11 @@ class KafkaClient(object): time.sleep(next_at - now) self._last_bootstrap = time.time() - metadata_request = MetadataRequest[0]([]) + if self.config['api_version'] is None or self.config['api_version'] < (0, 10): + metadata_request = MetadataRequest[0]([]) + else: + metadata_request = MetadataRequest[1](None) + for host, port, afi in hosts: log.debug("Attempting to bootstrap via node at %s:%s", host, port) cb = functools.partial(self._conn_state_change, 'bootstrap') @@ -643,10 +647,17 @@ class KafkaClient(object): topics = list(self._topics) if self.cluster.need_all_topic_metadata: - topics = [] + if self.config['api_version'] < (0, 10): + topics = [] + else: + topics = None if self._can_send_request(node_id): - request = MetadataRequest[0](topics) + if self.config['api_version'] < (0, 10): + api_version = 0 + else: + api_version = 1 + request = MetadataRequest[api_version](topics) log.debug("Sending metadata request %s to node %s", request, node_id) future = self.send(node_id, request) future.add_callback(self.cluster.update_metadata) |