summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--kafka/admin/client.py70
-rw-r--r--test/test_admin_integration.py57
2 files changed, 112 insertions, 15 deletions
diff --git a/kafka/admin/client.py b/kafka/admin/client.py
index df85f44..bb1e2b5 100644
--- a/kafka/admin/client.py
+++ b/kafka/admin/client.py
@@ -5,6 +5,7 @@ import copy
import logging
import socket
+from . import ConfigResourceType
from kafka.vendor import six
from kafka.client_async import KafkaClient, selectors
@@ -763,29 +764,70 @@ class KafkaAdminClient(object):
supported by all versions. Default: False.
:return: Appropriate version of DescribeConfigsResponse class.
"""
+
+ # Break up requests by type - a broker config request must be sent to the specific broker.
+ # All other (currently just topic resources) can be sent to any broker.
+ broker_resources = []
+ topic_resources = []
+
+ for config_resource in config_resources:
+ if config_resource.resource_type == ConfigResourceType.BROKER:
+ broker_resources.append(self._convert_describe_config_resource_request(config_resource))
+ else:
+ topic_resources.append(self._convert_describe_config_resource_request(config_resource))
+
+ futures = []
version = self._matching_api_version(DescribeConfigsRequest)
if version == 0:
if include_synonyms:
raise IncompatibleBrokerVersion(
"include_synonyms requires DescribeConfigsRequest >= v1, which is not supported by Kafka {}."
- .format(self.config['api_version']))
- request = DescribeConfigsRequest[version](
- resources=[self._convert_describe_config_resource_request(config_resource) for config_resource in config_resources]
- )
+ .format(self.config['api_version']))
+
+ if len(broker_resources) > 0:
+ for broker_resource in broker_resources:
+ try:
+ broker_id = int(broker_resource[1])
+ except ValueError:
+ raise ValueError("Broker resource names must be an integer or a string represented integer")
+
+ futures.append(self._send_request_to_node(
+ broker_id,
+ DescribeConfigsRequest[version](resources=[broker_resource])
+ ))
+
+ if len(topic_resources) > 0:
+ futures.append(self._send_request_to_node(
+ self._client.least_loaded_node(),
+ DescribeConfigsRequest[version](resources=topic_resources)
+ ))
+
elif version == 1:
- request = DescribeConfigsRequest[version](
- resources=[self._convert_describe_config_resource_request(config_resource) for config_resource in config_resources],
- include_synonyms=include_synonyms
- )
+ if len(broker_resources) > 0:
+ for broker_resource in broker_resources:
+ try:
+ broker_id = int(broker_resource[1])
+ except ValueError:
+ raise ValueError("Broker resource names must be an integer or a string represented integer")
+
+ futures.append(self._send_request_to_node(
+ broker_id,
+ DescribeConfigsRequest[version](
+ resources=[broker_resource],
+ include_synonyms=include_synonyms)
+ ))
+
+ if len(topic_resources) > 0:
+ futures.append(self._send_request_to_node(
+ self._client.least_loaded_node(),
+ DescribeConfigsRequest[version](resources=topic_resources, include_synonyms=include_synonyms)
+ ))
else:
raise NotImplementedError(
- "Support for DescribeConfigs v{} has not yet been added to KafkaAdminClient."
- .format(version))
- future = self._send_request_to_node(self._client.least_loaded_node(), request)
+ "Support for DescribeConfigs v{} has not yet been added to KafkaAdminClient.".format(version))
- self._wait_for_futures([future])
- response = future.value
- return response
+ self._wait_for_futures(futures)
+ return [f.value for f in futures]
@staticmethod
def _convert_alter_config_resource_request(config_resource):
diff --git a/test/test_admin_integration.py b/test/test_admin_integration.py
index 3efa021..0b041b2 100644
--- a/test/test_admin_integration.py
+++ b/test/test_admin_integration.py
@@ -3,7 +3,8 @@ import pytest
from test.testutil import env_kafka_version
from kafka.errors import NoError
-from kafka.admin import ACLFilter, ACLOperation, ACLPermissionType, ResourcePattern, ResourceType, ACL
+from kafka.admin import (
+ ACLFilter, ACLOperation, ACLPermissionType, ResourcePattern, ResourceType, ACL, ConfigResource, ConfigResourceType)
@pytest.mark.skipif(env_kafka_version() < (0, 11), reason="ACL features require broker >=0.11")
@@ -80,3 +81,57 @@ def test_create_describe_delete_acls(kafka_admin_client):
assert error is NoError
assert len(acls) == 0
+
+
+@pytest.mark.skipif(env_kafka_version() < (0, 11), reason="Describe config features require broker >=0.11")
+def test_describe_configs_broker_resource_returns_configs(kafka_admin_client):
+ """Tests that describe config returns configs for broker
+ """
+ broker_id = kafka_admin_client._client.cluster._brokers[0].nodeId
+ configs = kafka_admin_client.describe_configs([ConfigResource(ConfigResourceType.BROKER, broker_id)])
+
+ assert len(configs) == 1
+ assert configs[0].resources[0][2] == ConfigResourceType.BROKER
+ assert configs[0].resources[0][3] == str(broker_id)
+ assert len(configs[0].resources[0][4]) > 1
+
+
+@pytest.mark.skipif(env_kafka_version() < (0, 11), reason="Describe config features require broker >=0.11")
+def test_describe_configs_topic_resource_returns_configs(topic, kafka_admin_client):
+ """Tests that describe config returns configs for topic
+ """
+ configs = kafka_admin_client.describe_configs([ConfigResource(ConfigResourceType.TOPIC, topic)])
+
+ assert len(configs) == 1
+ assert configs[0].resources[0][2] == ConfigResourceType.TOPIC
+ assert configs[0].resources[0][3] == topic
+ assert len(configs[0].resources[0][4]) > 1
+
+
+@pytest.mark.skipif(env_kafka_version() < (0, 11), reason="Describe config features require broker >=0.11")
+def test_describe_configs_mixed_resources_returns_configs(topic, kafka_admin_client):
+ """Tests that describe config returns configs for mixed resource types (topic + broker)
+ """
+ broker_id = kafka_admin_client._client.cluster._brokers[0].nodeId
+ configs = kafka_admin_client.describe_configs([
+ ConfigResource(ConfigResourceType.TOPIC, topic),
+ ConfigResource(ConfigResourceType.BROKER, broker_id)])
+
+ assert len(configs) == 2
+
+ for config in configs:
+ assert (config.resources[0][2] == ConfigResourceType.TOPIC
+ and config.resources[0][3] == topic) or \
+ (config.resources[0][2] == ConfigResourceType.BROKER
+ and config.resources[0][3] == str(broker_id))
+ assert len(config.resources[0][4]) > 1
+
+
+@pytest.mark.skipif(env_kafka_version() < (0, 11), reason="Describe config features require broker >=0.11")
+def test_describe_configs_invalid_broker_id_raises(kafka_admin_client):
+ """Tests that describe config raises exception on non-integer broker id
+ """
+ broker_id = "str"
+
+ with pytest.raises(ValueError):
+ configs = kafka_admin_client.describe_configs([ConfigResource(ConfigResourceType.BROKER, broker_id)])