summaryrefslogtreecommitdiff
path: root/kafka/admin/client.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/admin/client.py')
-rw-r--r--kafka/admin/client.py14
1 files changed, 10 insertions, 4 deletions
diff --git a/kafka/admin/client.py b/kafka/admin/client.py
index 0ade3e9..df85f44 100644
--- a/kafka/admin/client.py
+++ b/kafka/admin/client.py
@@ -232,14 +232,20 @@ class KafkaAdminClient(object):
:param operation: A list of protocol operation versions from kafka.protocol.
:return: The max matching version number between client and broker.
"""
- version = min(len(operation) - 1,
- self._client.get_api_versions()[operation[0].API_KEY][1])
- if version < self._client.get_api_versions()[operation[0].API_KEY][0]:
+ broker_api_versions = self._client.get_api_versions()
+ api_key = operation[0].API_KEY
+ if broker_api_versions is None or api_key not in broker_api_versions:
+ raise IncompatibleBrokerVersion(
+ "Kafka broker does not support the '{}' Kafka protocol."
+ .format(operation[0].__name__))
+ min_version, max_version = broker_api_versions[api_key]
+ version = min(len(operation) - 1, max_version)
+ if version < min_version:
# max library version is less than min broker version. Currently,
# no Kafka versions specify a min msg version. Maybe in the future?
raise IncompatibleBrokerVersion(
"No version of the '{}' Kafka protocol is supported by both the client and broker."
- .format(operation.__name__))
+ .format(operation[0].__name__))
return version
def _validate_timeout(self, timeout_ms):