summaryrefslogtreecommitdiff
path: root/test/test_admin_integration.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/test_admin_integration.py')
-rw-r--r--test/test_admin_integration.py57
1 files changed, 56 insertions, 1 deletions
diff --git a/test/test_admin_integration.py b/test/test_admin_integration.py
index 3efa021..0b041b2 100644
--- a/test/test_admin_integration.py
+++ b/test/test_admin_integration.py
@@ -3,7 +3,8 @@ import pytest
from test.testutil import env_kafka_version
from kafka.errors import NoError
-from kafka.admin import ACLFilter, ACLOperation, ACLPermissionType, ResourcePattern, ResourceType, ACL
+from kafka.admin import (
+ ACLFilter, ACLOperation, ACLPermissionType, ResourcePattern, ResourceType, ACL, ConfigResource, ConfigResourceType)
@pytest.mark.skipif(env_kafka_version() < (0, 11), reason="ACL features require broker >=0.11")
@@ -80,3 +81,57 @@ def test_create_describe_delete_acls(kafka_admin_client):
assert error is NoError
assert len(acls) == 0
+
+
+@pytest.mark.skipif(env_kafka_version() < (0, 11), reason="Describe config features require broker >=0.11")
+def test_describe_configs_broker_resource_returns_configs(kafka_admin_client):
+ """Tests that describe config returns configs for broker
+ """
+ broker_id = kafka_admin_client._client.cluster._brokers[0].nodeId
+ configs = kafka_admin_client.describe_configs([ConfigResource(ConfigResourceType.BROKER, broker_id)])
+
+ assert len(configs) == 1
+ assert configs[0].resources[0][2] == ConfigResourceType.BROKER
+ assert configs[0].resources[0][3] == str(broker_id)
+ assert len(configs[0].resources[0][4]) > 1
+
+
+@pytest.mark.skipif(env_kafka_version() < (0, 11), reason="Describe config features require broker >=0.11")
+def test_describe_configs_topic_resource_returns_configs(topic, kafka_admin_client):
+ """Tests that describe config returns configs for topic
+ """
+ configs = kafka_admin_client.describe_configs([ConfigResource(ConfigResourceType.TOPIC, topic)])
+
+ assert len(configs) == 1
+ assert configs[0].resources[0][2] == ConfigResourceType.TOPIC
+ assert configs[0].resources[0][3] == topic
+ assert len(configs[0].resources[0][4]) > 1
+
+
+@pytest.mark.skipif(env_kafka_version() < (0, 11), reason="Describe config features require broker >=0.11")
+def test_describe_configs_mixed_resources_returns_configs(topic, kafka_admin_client):
+ """Tests that describe config returns configs for mixed resource types (topic + broker)
+ """
+ broker_id = kafka_admin_client._client.cluster._brokers[0].nodeId
+ configs = kafka_admin_client.describe_configs([
+ ConfigResource(ConfigResourceType.TOPIC, topic),
+ ConfigResource(ConfigResourceType.BROKER, broker_id)])
+
+ assert len(configs) == 2
+
+ for config in configs:
+ assert (config.resources[0][2] == ConfigResourceType.TOPIC
+ and config.resources[0][3] == topic) or \
+ (config.resources[0][2] == ConfigResourceType.BROKER
+ and config.resources[0][3] == str(broker_id))
+ assert len(config.resources[0][4]) > 1
+
+
+@pytest.mark.skipif(env_kafka_version() < (0, 11), reason="Describe config features require broker >=0.11")
+def test_describe_configs_invalid_broker_id_raises(kafka_admin_client):
+ """Tests that describe config raises exception on non-integer broker id
+ """
+ broker_id = "str"
+
+ with pytest.raises(ValueError):
+ configs = kafka_admin_client.describe_configs([ConfigResource(ConfigResourceType.BROKER, broker_id)])