summaryrefslogtreecommitdiff
path: root/test/test_admin_integration.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/test_admin_integration.py')
-rw-r--r--test/test_admin_integration.py164
1 files changed, 62 insertions, 102 deletions
diff --git a/test/test_admin_integration.py b/test/test_admin_integration.py
index 2672faa..3efa021 100644
--- a/test/test_admin_integration.py
+++ b/test/test_admin_integration.py
@@ -1,122 +1,82 @@
import pytest
-import os
-from test.fixtures import ZookeeperFixture, KafkaFixture
-from test.testutil import KafkaIntegrationTestCase, env_kafka_version, current_offset
+from test.testutil import env_kafka_version
from kafka.errors import NoError
-from kafka.admin import KafkaAdminClient, ACLFilter, ACLOperation, ACLPermissionType, ResourcePattern, ResourceType, ACL
+from kafka.admin import ACLFilter, ACLOperation, ACLPermissionType, ResourcePattern, ResourceType, ACL
-# This test suite passes for me locally, but fails on travis
-# Needs investigation
-DISABLED = True
-# TODO: Convert to pytest / fixtures
-# Note that ACL features require broker 0.11, but other admin apis may work on
-# earlier broker versions
-class TestAdminClientIntegration(KafkaIntegrationTestCase):
- @classmethod
- def setUpClass(cls): # noqa
- if env_kafka_version() < (0, 11) or DISABLED:
- return
+@pytest.mark.skipif(env_kafka_version() < (0, 11), reason="ACL features require broker >=0.11")
+def test_create_describe_delete_acls(kafka_admin_client):
+ """Tests that we can add, list and remove ACLs
+ """
- cls.zk = ZookeeperFixture.instance()
- cls.server = KafkaFixture.instance(0, cls.zk)
-
- @classmethod
- def tearDownClass(cls): # noqa
- if env_kafka_version() < (0, 11) or DISABLED:
- return
-
- cls.server.close()
- cls.zk.close()
-
- def setUp(self):
- if env_kafka_version() < (0, 11) or DISABLED:
- self.skipTest('Admin ACL Integration test requires KAFKA_VERSION >= 0.11')
- super(TestAdminClientIntegration, self).setUp()
-
- def tearDown(self):
- if env_kafka_version() < (0, 11) or DISABLED:
- return
- super(TestAdminClientIntegration, self).tearDown()
-
- def test_create_describe_delete_acls(self):
- """Tests that we can add, list and remove ACLs
- """
-
- # Setup
- brokers = '%s:%d' % (self.server.host, self.server.port)
- admin_client = KafkaAdminClient(
- bootstrap_servers=brokers
+ # Check that we don't have any ACLs in the cluster
+ acls, error = kafka_admin_client.describe_acls(
+ ACLFilter(
+ principal=None,
+ host="*",
+ operation=ACLOperation.ANY,
+ permission_type=ACLPermissionType.ANY,
+ resource_pattern=ResourcePattern(ResourceType.TOPIC, "topic")
)
-
- # Check that we don't have any ACLs in the cluster
- acls, error = admin_client.describe_acls(
+ )
+
+ assert error is NoError
+ assert len(acls) == 0
+
+ # Try to add an ACL
+ acl = ACL(
+ principal="User:test",
+ host="*",
+ operation=ACLOperation.READ,
+ permission_type=ACLPermissionType.ALLOW,
+ resource_pattern=ResourcePattern(ResourceType.TOPIC, "topic")
+ )
+ result = kafka_admin_client.create_acls([acl])
+
+ assert len(result["failed"]) == 0
+ assert len(result["succeeded"]) == 1
+
+ # Check that we can list the ACL we created
+ acl_filter = ACLFilter(
+ principal=None,
+ host="*",
+ operation=ACLOperation.ANY,
+ permission_type=ACLPermissionType.ANY,
+ resource_pattern=ResourcePattern(ResourceType.TOPIC, "topic")
+ )
+ acls, error = kafka_admin_client.describe_acls(acl_filter)
+
+ assert error is NoError
+ assert len(acls) == 1
+
+ # Remove the ACL
+ delete_results = kafka_admin_client.delete_acls(
+ [
ACLFilter(
- principal=None,
+ principal="User:test",
host="*",
- operation=ACLOperation.ANY,
- permission_type=ACLPermissionType.ANY,
+ operation=ACLOperation.READ,
+ permission_type=ACLPermissionType.ALLOW,
resource_pattern=ResourcePattern(ResourceType.TOPIC, "topic")
)
- )
+ ]
+ )
- self.assertIs(error, NoError)
- self.assertEqual(0, len(acls))
+ assert len(delete_results) == 1
+ assert len(delete_results[0][1]) == 1 # Check number of affected ACLs
- # Try to add an ACL
- acl = ACL(
- principal="User:test",
- host="*",
- operation=ACLOperation.READ,
- permission_type=ACLPermissionType.ALLOW,
- resource_pattern=ResourcePattern(ResourceType.TOPIC, "topic")
- )
- result = admin_client.create_acls([acl])
-
- self.assertFalse(len(result["failed"]))
- self.assertEqual(len(result["succeeded"]), 1)
-
- # Check that we can list the ACL we created
- acl_filter = ACLFilter(
- principal=None,
+ # Make sure the ACL does not exist in the cluster anymore
+ acls, error = kafka_admin_client.describe_acls(
+ ACLFilter(
+ principal="*",
host="*",
operation=ACLOperation.ANY,
permission_type=ACLPermissionType.ANY,
resource_pattern=ResourcePattern(ResourceType.TOPIC, "topic")
)
- acls, error = admin_client.describe_acls(acl_filter)
-
- self.assertIs(error, NoError)
- self.assertEqual(1, len(acls))
-
- # Remove the ACL
- delete_results = admin_client.delete_acls(
- [
- ACLFilter(
- principal="User:test",
- host="*",
- operation=ACLOperation.READ,
- permission_type=ACLPermissionType.ALLOW,
- resource_pattern=ResourcePattern(ResourceType.TOPIC, "topic")
- )
- ]
- )
+ )
- self.assertEqual(1, len(delete_results))
- self.assertEqual(1, len(delete_results[0][1])) # Check number of affected ACLs
-
-
- # Make sure the ACL does not exist in the cluster anymore
- acls, error = admin_client.describe_acls(
- ACLFilter(
- principal="*",
- host="*",
- operation=ACLOperation.ANY,
- permission_type=ACLPermissionType.ANY,
- resource_pattern=ResourcePattern(ResourceType.TOPIC, "topic")
- )
- )
- self.assertIs(error, NoError)
- self.assertEqual(0, len(acls))
+ assert error is NoError
+ assert len(acls) == 0