diff options
author | Jeppe Andersen <2197398+jlandersen@users.noreply.github.com> | 2019-10-11 20:46:52 +0200 |
---|---|---|
committer | Jeff Widman <jeff@jeffwidman.com> | 2019-10-11 11:46:52 -0700 |
commit | 6d3800ca9f45fd953689a1787fc90a5e566e34ea (patch) | |
tree | f47705bfa7ba965a1e505cb3714116eb36771e20 /kafka/admin/client.py | |
parent | 84e37e0f14b53fbf6fdc2ad97ea1625e50a149d1 (diff) | |
download | kafka-python-6d3800ca9f45fd953689a1787fc90a5e566e34ea.tar.gz |
Fix describe config for multi-broker clusters (#1869)
* Fix describe config for multi-broker clusters
Currently all describe config requests are sent to "least loaded node". Requests for broker configs must, however, be sent to the specific broker, otherwise an error is returned. Only topic requests can be handled by any node.
This changes the logic to send all describe config requests to the specific broker.
Diffstat (limited to 'kafka/admin/client.py')
-rw-r--r-- | kafka/admin/client.py | 70 |
1 files changed, 56 insertions, 14 deletions
diff --git a/kafka/admin/client.py b/kafka/admin/client.py index df85f44..bb1e2b5 100644 --- a/kafka/admin/client.py +++ b/kafka/admin/client.py @@ -5,6 +5,7 @@ import copy import logging import socket +from . import ConfigResourceType from kafka.vendor import six from kafka.client_async import KafkaClient, selectors @@ -763,29 +764,70 @@ class KafkaAdminClient(object): supported by all versions. Default: False. :return: Appropriate version of DescribeConfigsResponse class. """ + + # Break up requests by type - a broker config request must be sent to the specific broker. + # All other (currently just topic resources) can be sent to any broker. + broker_resources = [] + topic_resources = [] + + for config_resource in config_resources: + if config_resource.resource_type == ConfigResourceType.BROKER: + broker_resources.append(self._convert_describe_config_resource_request(config_resource)) + else: + topic_resources.append(self._convert_describe_config_resource_request(config_resource)) + + futures = [] version = self._matching_api_version(DescribeConfigsRequest) if version == 0: if include_synonyms: raise IncompatibleBrokerVersion( "include_synonyms requires DescribeConfigsRequest >= v1, which is not supported by Kafka {}." - .format(self.config['api_version'])) - request = DescribeConfigsRequest[version]( - resources=[self._convert_describe_config_resource_request(config_resource) for config_resource in config_resources] - ) + .format(self.config['api_version'])) + + if len(broker_resources) > 0: + for broker_resource in broker_resources: + try: + broker_id = int(broker_resource[1]) + except ValueError: + raise ValueError("Broker resource names must be an integer or a string represented integer") + + futures.append(self._send_request_to_node( + broker_id, + DescribeConfigsRequest[version](resources=[broker_resource]) + )) + + if len(topic_resources) > 0: + futures.append(self._send_request_to_node( + self._client.least_loaded_node(), + DescribeConfigsRequest[version](resources=topic_resources) + )) + elif version == 1: - request = DescribeConfigsRequest[version]( - resources=[self._convert_describe_config_resource_request(config_resource) for config_resource in config_resources], - include_synonyms=include_synonyms - ) + if len(broker_resources) > 0: + for broker_resource in broker_resources: + try: + broker_id = int(broker_resource[1]) + except ValueError: + raise ValueError("Broker resource names must be an integer or a string represented integer") + + futures.append(self._send_request_to_node( + broker_id, + DescribeConfigsRequest[version]( + resources=[broker_resource], + include_synonyms=include_synonyms) + )) + + if len(topic_resources) > 0: + futures.append(self._send_request_to_node( + self._client.least_loaded_node(), + DescribeConfigsRequest[version](resources=topic_resources, include_synonyms=include_synonyms) + )) else: raise NotImplementedError( - "Support for DescribeConfigs v{} has not yet been added to KafkaAdminClient." - .format(version)) - future = self._send_request_to_node(self._client.least_loaded_node(), request) + "Support for DescribeConfigs v{} has not yet been added to KafkaAdminClient.".format(version)) - self._wait_for_futures([future]) - response = future.value - return response + self._wait_for_futures(futures) + return [f.value for f in futures] @staticmethod def _convert_alter_config_resource_request(config_resource): |