diff options
Diffstat (limited to 'kafka/admin/client.py')
-rw-r--r-- | kafka/admin/client.py | 14 |
1 files changed, 10 insertions, 4 deletions
diff --git a/kafka/admin/client.py b/kafka/admin/client.py index 0ade3e9..df85f44 100644 --- a/kafka/admin/client.py +++ b/kafka/admin/client.py @@ -232,14 +232,20 @@ class KafkaAdminClient(object): :param operation: A list of protocol operation versions from kafka.protocol. :return: The max matching version number between client and broker. """ - version = min(len(operation) - 1, - self._client.get_api_versions()[operation[0].API_KEY][1]) - if version < self._client.get_api_versions()[operation[0].API_KEY][0]: + broker_api_versions = self._client.get_api_versions() + api_key = operation[0].API_KEY + if broker_api_versions is None or api_key not in broker_api_versions: + raise IncompatibleBrokerVersion( + "Kafka broker does not support the '{}' Kafka protocol." + .format(operation[0].__name__)) + min_version, max_version = broker_api_versions[api_key] + version = min(len(operation) - 1, max_version) + if version < min_version: # max library version is less than min broker version. Currently, # no Kafka versions specify a min msg version. Maybe in the future? raise IncompatibleBrokerVersion( "No version of the '{}' Kafka protocol is supported by both the client and broker." - .format(operation.__name__)) + .format(operation[0].__name__)) return version def _validate_timeout(self, timeout_ms): |