diff options
author | Jeff Widman <jeff@jeffwidman.com> | 2018-11-20 10:56:32 -0800 |
---|---|---|
committer | Jeff Widman <jeff@jeffwidman.com> | 2018-11-20 11:03:46 -0800 |
commit | 574c56ac7c25e481d775e54f621f73de95741dd6 (patch) | |
tree | 103dab3a120d64a0003a6b2433c9c9b4679bf386 | |
parent | fcc800f96f14192c44b09d1d37108377dcaed245 (diff) | |
download | kafka-python-differentiate-between-client-and-broker-version-errors.tar.gz |
Differentiate between client and broker version errorsdifferentiate-between-client-and-broker-version-errors
While both indicate a mismatch between the broker and client, we want to
be able to differentiate between whether the error is thrown by the
client vs the broker.
-rw-r--r-- | kafka/admin/client.py | 37 | ||||
-rw-r--r-- | kafka/client_async.py | 2 | ||||
-rw-r--r-- | kafka/conn.py | 6 | ||||
-rw-r--r-- | kafka/consumer/group.py | 16 | ||||
-rw-r--r-- | kafka/errors.py | 17 |
5 files changed, 61 insertions, 17 deletions
diff --git a/kafka/admin/client.py b/kafka/admin/client.py index e25afe7..5b1e03b 100644 --- a/kafka/admin/client.py +++ b/kafka/admin/client.py @@ -35,12 +35,23 @@ class KafkaAdminClient(object): nicer, more pythonic objects. Unfortunately, this will likely break those interfaces. - The KafkaAdminClient class will negotiate for the latest version of each message - protocol format supported by both the kafka-python client library and the - Kafka broker. Usage of optional fields from protocol versions that are not - supported by the broker will result in IncompatibleBrokerVersion exceptions. - - Use of this class requires a minimum broker version >= 0.10.0.0. + The KafkaAdminClient class will negotiate for the latest version of each + Kafka protocol message format supported by both the kafka-python client + library and the Kafka broker. Two different exceptions can be raised when a + method needs to use a Kafka protocol message format that is not supported + by the broker. An IncompatibleBrokerVersion exception means kafka-python + does not think the broker supports that message format and does not even + try to send it. An UnsupportedVersionError exception means that + kafka-python tried to send the message and the broker rejected it. If you + encounter UnsupportedVersionError, please file a bug, as we would prefer + to identify the error client-side and raise a IncompatibleBrokerVersion. + + Note: + If the KafkaAdminClient's `api_version` is unset, then the broker + version is ascertained by querying a random broker in the cluster. So + if the cluster is running a mixture of old and new brokers, there is a + chance the version-checking will be incorrect. In that case, please pin + the `api_version` of the KafkaAdminClient. Keyword Arguments: bootstrap_servers: 'host[:port]' string (or list of 'host[:port]' @@ -252,7 +263,7 @@ class KafkaAdminClient(object): controller_version = self._client.check_version(controller_id) if controller_version < (0, 10, 0): raise IncompatibleBrokerVersion( - "The controller appears to be running Kafka {}. KafkaAdminClient requires brokers >= 0.10.0.0." + "The controller appears to be running Kafka {}. KafkaAdminClient requires controllers >= 0.10.0.0." .format(controller_version)) self._controller_id = controller_id else: @@ -591,6 +602,10 @@ class KafkaAdminClient(object): partition assignments. """ group_descriptions = [] + # TODO this can be used against brokers older than 0.10.0.0, so either + # need to fix BrokerConnection.get_api_versions() (and the docstring of + # KafkaClient.get_api_versions()), or add a bypass here when api_version is + # old. version = self._matching_api_version(DescribeGroupsRequest) for group_id in group_ids: if group_coordinator_id is not None: @@ -661,6 +676,10 @@ class KafkaAdminClient(object): consumer_groups = set() if broker_ids is None: broker_ids = [broker.nodeId for broker in self._client.cluster.brokers()] + # TODO this can be used against brokers older than 0.10.0.0, so either + # need to fix BrokerConnection.get_api_versions() (and the docstring of + # KafkaClient.get_api_versions()), or add a bypass here when api_version is + # old. version = self._matching_api_version(ListGroupsRequest) if version <= 2: request = ListGroupsRequest[version]() @@ -707,6 +726,10 @@ class KafkaAdminClient(object): group_offsets_listing = {} if group_coordinator_id is None: group_coordinator_id = self._find_group_coordinator_id(group_id) + # TODO this can be used against brokers older than 0.10.0.0, so either + # need to fix BrokerConnection.get_api_versions() (and the docstring of + # KafkaClient.get_api_versions()), or add a bypass here when api_version is + # old. version = self._matching_api_version(OffsetFetchRequest) if version <= 3: if partitions is None: diff --git a/kafka/client_async.py b/kafka/client_async.py index b0d1f5e..566634b 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -814,6 +814,8 @@ class KafkaClient(object): def get_api_versions(self): """Return the ApiVersions map, if available. + # TODO if I update the BrokerConnection.get_api_versions(), also update + # this docstring Note: A call to check_version must previously have succeeded and returned version 0.10.0 or later diff --git a/kafka/conn.py b/kafka/conn.py index 471bae7..bc87a22 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -876,7 +876,11 @@ class BrokerConnection(object): def get_api_versions(self): version = self.check_version() if version < (0, 10, 0): - raise Errors.UnsupportedVersionError( + # TODO this currently blocks using various KafkaAdmin methods + # against older brokers that are supported, such as + # list_consumer_groups(), list_consumer_group_offsets(), + # describe_consumer_groups(), and _find_group_coordinator() + raise Errors.IncompatibleBrokerVersion( "ApiVersion not supported by cluster version {} < 0.10.0" .format(version)) # _api_versions is set as a side effect of check_versions() on a cluster diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index 8727de7..5f75c48 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -5,7 +5,7 @@ import logging import socket import time -from kafka.errors import KafkaConfigurationError, UnsupportedVersionError +from kafka.errors import KafkaConfigurationError, IncompatibleBrokerVersion from kafka.vendor import six @@ -943,12 +943,12 @@ class KafkaConsumer(six.Iterator): Raises: ValueError: If the target timestamp is negative - UnsupportedVersionError: If the broker does not support looking - up the offsets by timestamp. + IncompatibleBrokerVersion: Raised by kafka-python when it does not + think the broker supports looking up the offsets by timestamp. KafkaTimeoutError: If fetch failed in request_timeout_ms """ if self.config['api_version'] <= (0, 10, 0): - raise UnsupportedVersionError( + raise IncompatibleBrokerVersion( "offsets_for_times API not supported for cluster version {}" .format(self.config['api_version'])) for tp, ts in six.iteritems(timestamps): @@ -978,8 +978,8 @@ class KafkaConsumer(six.Iterator): given partitions. Raises: - UnsupportedVersionError: If the broker does not support looking - up the offsets by timestamp. + IncompatibleBrokerVersion: Raised by kafka-python when it does not + think the broker supports looking up the offsets by timestamp. KafkaTimeoutError: If fetch failed in request_timeout_ms. """ offsets = self._fetcher.beginning_offsets( @@ -1005,8 +1005,8 @@ class KafkaConsumer(six.Iterator): ``{TopicPartition: int}``: The end offsets for the given partitions. Raises: - UnsupportedVersionError: If the broker does not support looking - up the offsets by timestamp. + IncompatibleBrokerVersion: Raised by kafka-python when it does not + think the broker supports looking up the offsets by timestamp. KafkaTimeoutError: If fetch failed in request_timeout_ms """ offsets = self._fetcher.end_offsets( diff --git a/kafka/errors.py b/kafka/errors.py index 118e430..aac167c 100644 --- a/kafka/errors.py +++ b/kafka/errors.py @@ -59,10 +59,18 @@ class MetadataEmptyBrokerList(KafkaError): class UnrecognizedBrokerVersion(KafkaError): + # Cannot determine the broker version pass class IncompatibleBrokerVersion(KafkaError): + # TODO convert this first comment to the default exception message... need to check message vs description is used when assembling the exception string + # kafka-python thinks the broker version is incompatible with this Kafka + # protocol message format and does not even try to send the message. + + # See also UnsupportedBrokerVersion for the broker-side equivalent. + # Although the error is the same, we want to be able to differentiate + # between client-side and broker-side errors for easier debugging. pass @@ -377,9 +385,16 @@ class IllegalSaslStateError(BrokerResponseError): class UnsupportedVersionError(BrokerResponseError): + # See also IncompatibleBrokerVersion for the client-side equivalent. + # Although the error is the same, we want to be able to differentiate + # between client-side and broker-side errors for easier debugging. errno = 35 message = 'UNSUPPORTED_VERSION' - description = 'The version of API is not supported.' + description = 'The version of API is not supported. If you encounter this ' + 'error while using normal kafka-python methods and not + 'directly using the low-level protocol structs, please file ' + 'an issue as we would prefer to identify the error ' + 'client-side and raise an IncompatibleBrokerVersion.' class TopicAlreadyExistsError(BrokerResponseError): |