diff options
Diffstat (limited to 'test/test_admin_integration.py')
-rw-r--r-- | test/test_admin_integration.py | 164 |
1 files changed, 62 insertions, 102 deletions
diff --git a/test/test_admin_integration.py b/test/test_admin_integration.py index 2672faa..3efa021 100644 --- a/test/test_admin_integration.py +++ b/test/test_admin_integration.py @@ -1,122 +1,82 @@ import pytest -import os -from test.fixtures import ZookeeperFixture, KafkaFixture -from test.testutil import KafkaIntegrationTestCase, env_kafka_version, current_offset +from test.testutil import env_kafka_version from kafka.errors import NoError -from kafka.admin import KafkaAdminClient, ACLFilter, ACLOperation, ACLPermissionType, ResourcePattern, ResourceType, ACL +from kafka.admin import ACLFilter, ACLOperation, ACLPermissionType, ResourcePattern, ResourceType, ACL -# This test suite passes for me locally, but fails on travis -# Needs investigation -DISABLED = True -# TODO: Convert to pytest / fixtures -# Note that ACL features require broker 0.11, but other admin apis may work on -# earlier broker versions -class TestAdminClientIntegration(KafkaIntegrationTestCase): - @classmethod - def setUpClass(cls): # noqa - if env_kafka_version() < (0, 11) or DISABLED: - return +@pytest.mark.skipif(env_kafka_version() < (0, 11), reason="ACL features require broker >=0.11") +def test_create_describe_delete_acls(kafka_admin_client): + """Tests that we can add, list and remove ACLs + """ - cls.zk = ZookeeperFixture.instance() - cls.server = KafkaFixture.instance(0, cls.zk) - - @classmethod - def tearDownClass(cls): # noqa - if env_kafka_version() < (0, 11) or DISABLED: - return - - cls.server.close() - cls.zk.close() - - def setUp(self): - if env_kafka_version() < (0, 11) or DISABLED: - self.skipTest('Admin ACL Integration test requires KAFKA_VERSION >= 0.11') - super(TestAdminClientIntegration, self).setUp() - - def tearDown(self): - if env_kafka_version() < (0, 11) or DISABLED: - return - super(TestAdminClientIntegration, self).tearDown() - - def test_create_describe_delete_acls(self): - """Tests that we can add, list and remove ACLs - """ - - # Setup - brokers = '%s:%d' % (self.server.host, self.server.port) - admin_client = KafkaAdminClient( - bootstrap_servers=brokers + # Check that we don't have any ACLs in the cluster + acls, error = kafka_admin_client.describe_acls( + ACLFilter( + principal=None, + host="*", + operation=ACLOperation.ANY, + permission_type=ACLPermissionType.ANY, + resource_pattern=ResourcePattern(ResourceType.TOPIC, "topic") ) - - # Check that we don't have any ACLs in the cluster - acls, error = admin_client.describe_acls( + ) + + assert error is NoError + assert len(acls) == 0 + + # Try to add an ACL + acl = ACL( + principal="User:test", + host="*", + operation=ACLOperation.READ, + permission_type=ACLPermissionType.ALLOW, + resource_pattern=ResourcePattern(ResourceType.TOPIC, "topic") + ) + result = kafka_admin_client.create_acls([acl]) + + assert len(result["failed"]) == 0 + assert len(result["succeeded"]) == 1 + + # Check that we can list the ACL we created + acl_filter = ACLFilter( + principal=None, + host="*", + operation=ACLOperation.ANY, + permission_type=ACLPermissionType.ANY, + resource_pattern=ResourcePattern(ResourceType.TOPIC, "topic") + ) + acls, error = kafka_admin_client.describe_acls(acl_filter) + + assert error is NoError + assert len(acls) == 1 + + # Remove the ACL + delete_results = kafka_admin_client.delete_acls( + [ ACLFilter( - principal=None, + principal="User:test", host="*", - operation=ACLOperation.ANY, - permission_type=ACLPermissionType.ANY, + operation=ACLOperation.READ, + permission_type=ACLPermissionType.ALLOW, resource_pattern=ResourcePattern(ResourceType.TOPIC, "topic") ) - ) + ] + ) - self.assertIs(error, NoError) - self.assertEqual(0, len(acls)) + assert len(delete_results) == 1 + assert len(delete_results[0][1]) == 1 # Check number of affected ACLs - # Try to add an ACL - acl = ACL( - principal="User:test", - host="*", - operation=ACLOperation.READ, - permission_type=ACLPermissionType.ALLOW, - resource_pattern=ResourcePattern(ResourceType.TOPIC, "topic") - ) - result = admin_client.create_acls([acl]) - - self.assertFalse(len(result["failed"])) - self.assertEqual(len(result["succeeded"]), 1) - - # Check that we can list the ACL we created - acl_filter = ACLFilter( - principal=None, + # Make sure the ACL does not exist in the cluster anymore + acls, error = kafka_admin_client.describe_acls( + ACLFilter( + principal="*", host="*", operation=ACLOperation.ANY, permission_type=ACLPermissionType.ANY, resource_pattern=ResourcePattern(ResourceType.TOPIC, "topic") ) - acls, error = admin_client.describe_acls(acl_filter) - - self.assertIs(error, NoError) - self.assertEqual(1, len(acls)) - - # Remove the ACL - delete_results = admin_client.delete_acls( - [ - ACLFilter( - principal="User:test", - host="*", - operation=ACLOperation.READ, - permission_type=ACLPermissionType.ALLOW, - resource_pattern=ResourcePattern(ResourceType.TOPIC, "topic") - ) - ] - ) + ) - self.assertEqual(1, len(delete_results)) - self.assertEqual(1, len(delete_results[0][1])) # Check number of affected ACLs - - - # Make sure the ACL does not exist in the cluster anymore - acls, error = admin_client.describe_acls( - ACLFilter( - principal="*", - host="*", - operation=ACLOperation.ANY, - permission_type=ACLPermissionType.ANY, - resource_pattern=ResourcePattern(ResourceType.TOPIC, "topic") - ) - ) - self.assertIs(error, NoError) - self.assertEqual(0, len(acls)) + assert error is NoError + assert len(acls) == 0 |