diff options
| author | Ulrik Johansson <ulrik.johansson@gmail.com> | 2019-10-07 20:11:58 +0200 | 
|---|---|---|
| committer | Jeff Widman <jeff@jeffwidman.com> | 2019-10-07 11:11:58 -0700 | 
| commit | 84e37e0f14b53fbf6fdc2ad97ea1625e50a149d1 (patch) | |
| tree | f456e1b611ab667f7a8c8209eee0ab1bc283de51 /test | |
| parent | f1cda98e0b427116d5eb901bce2d697b3f037e78 (diff) | |
| download | kafka-python-84e37e0f14b53fbf6fdc2ad97ea1625e50a149d1.tar.gz | |
convert test_admin_integration to pytest (#1923)
Diffstat (limited to 'test')
| -rw-r--r-- | test/conftest.py | 19 | ||||
| -rw-r--r-- | test/fixtures.py | 10 | ||||
| -rw-r--r-- | test/test_admin_integration.py | 164 | 
3 files changed, 90 insertions, 103 deletions
diff --git a/test/conftest.py b/test/conftest.py index 267ac6a..bbe4048 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -105,6 +105,25 @@ def kafka_producer_factory(kafka_broker, request):      if _producer[0]:          _producer[0].close() +@pytest.fixture +def kafka_admin_client(kafka_admin_client_factory): +    """Return a KafkaAdminClient fixture""" +    yield kafka_admin_client_factory() + +@pytest.fixture +def kafka_admin_client_factory(kafka_broker): +    """Return a KafkaAdminClient factory fixture""" +    _admin_client = [None] + +    def factory(**kafka_admin_client_params): +        params = {} if kafka_admin_client_params is None else kafka_admin_client_params.copy() +        _admin_client[0] = next(kafka_broker.get_admin_clients(cnt=1, **params)) +        return _admin_client[0] + +    yield factory + +    if _admin_client[0]: +        _admin_client[0].close()  @pytest.fixture  def topic(kafka_broker, request): diff --git a/test/fixtures.py b/test/fixtures.py index c7748f1..68572b5 100644 --- a/test/fixtures.py +++ b/test/fixtures.py @@ -13,7 +13,7 @@ import py  from kafka.vendor.six.moves import urllib, range  from kafka.vendor.six.moves.urllib.parse import urlparse  # pylint: disable=E0611,F0401 -from kafka import errors, KafkaConsumer, KafkaProducer, SimpleClient +from kafka import errors, KafkaConsumer, KafkaProducer, SimpleClient, KafkaAdminClient  from kafka.client_async import KafkaClient  from kafka.protocol.admin import CreateTopicsRequest  from kafka.protocol.metadata import MetadataRequest @@ -500,6 +500,14 @@ class KafkaFixture(Fixture):          return tuple(KafkaClient(client_id='%s_%s' % (client_id, random_string(4)),                                   bootstrap_servers=self.bootstrap_server()) for x in range(cnt)) +    def get_admin_clients(self, cnt=1, **params): +        params.setdefault('client_id', 'admin_client') +        params['bootstrap_servers'] = self.bootstrap_server() +        client_id = params['client_id'] +        for x in range(cnt): +            params['client_id'] = '%s_%s' % (client_id, random_string(4)) +            yield KafkaAdminClient(**params) +      def get_consumers(self, cnt, topics, **params):          params.setdefault('client_id', 'consumer')          params.setdefault('heartbeat_interval_ms', 500) diff --git a/test/test_admin_integration.py b/test/test_admin_integration.py index 2672faa..3efa021 100644 --- a/test/test_admin_integration.py +++ b/test/test_admin_integration.py @@ -1,122 +1,82 @@  import pytest -import os -from test.fixtures import ZookeeperFixture, KafkaFixture -from test.testutil import KafkaIntegrationTestCase, env_kafka_version, current_offset +from test.testutil import env_kafka_version  from kafka.errors import NoError -from kafka.admin import KafkaAdminClient, ACLFilter, ACLOperation, ACLPermissionType, ResourcePattern, ResourceType, ACL +from kafka.admin import ACLFilter, ACLOperation, ACLPermissionType, ResourcePattern, ResourceType, ACL -# This test suite passes for me locally, but fails on travis -# Needs investigation -DISABLED = True -# TODO: Convert to pytest / fixtures -# Note that ACL features require broker 0.11, but other admin apis may work on -# earlier broker versions -class TestAdminClientIntegration(KafkaIntegrationTestCase): -    @classmethod -    def setUpClass(cls):  # noqa -        if env_kafka_version() < (0, 11) or DISABLED: -            return +@pytest.mark.skipif(env_kafka_version() < (0, 11), reason="ACL features require broker >=0.11") +def test_create_describe_delete_acls(kafka_admin_client): +    """Tests that we can add, list and remove ACLs +    """ -        cls.zk = ZookeeperFixture.instance() -        cls.server = KafkaFixture.instance(0, cls.zk) - -    @classmethod -    def tearDownClass(cls):  # noqa -        if env_kafka_version() < (0, 11) or DISABLED: -            return - -        cls.server.close() -        cls.zk.close() - -    def setUp(self): -        if env_kafka_version() < (0, 11) or DISABLED: -            self.skipTest('Admin ACL Integration test requires KAFKA_VERSION >= 0.11') -        super(TestAdminClientIntegration, self).setUp() - -    def tearDown(self): -        if env_kafka_version() < (0, 11) or DISABLED: -            return -        super(TestAdminClientIntegration, self).tearDown() - -    def test_create_describe_delete_acls(self): -        """Tests that we can add, list and remove ACLs -        """ - -        # Setup -        brokers = '%s:%d' % (self.server.host, self.server.port) -        admin_client = KafkaAdminClient( -            bootstrap_servers=brokers +    # Check that we don't have any ACLs in the cluster +    acls, error = kafka_admin_client.describe_acls( +        ACLFilter( +            principal=None, +            host="*", +            operation=ACLOperation.ANY, +            permission_type=ACLPermissionType.ANY, +            resource_pattern=ResourcePattern(ResourceType.TOPIC, "topic")          ) - -        # Check that we don't have any ACLs in the cluster -        acls, error = admin_client.describe_acls( +    ) + +    assert error is NoError +    assert len(acls) == 0 + +    # Try to add an ACL +    acl = ACL( +        principal="User:test", +        host="*", +        operation=ACLOperation.READ, +        permission_type=ACLPermissionType.ALLOW, +        resource_pattern=ResourcePattern(ResourceType.TOPIC, "topic") +    ) +    result = kafka_admin_client.create_acls([acl]) + +    assert len(result["failed"]) == 0 +    assert len(result["succeeded"]) == 1 + +    # Check that we can list the ACL we created +    acl_filter = ACLFilter( +        principal=None, +        host="*", +        operation=ACLOperation.ANY, +        permission_type=ACLPermissionType.ANY, +        resource_pattern=ResourcePattern(ResourceType.TOPIC, "topic") +    ) +    acls, error = kafka_admin_client.describe_acls(acl_filter) + +    assert error is NoError +    assert len(acls) == 1 + +    # Remove the ACL +    delete_results = kafka_admin_client.delete_acls( +        [              ACLFilter( -                principal=None, +                principal="User:test",                  host="*", -                operation=ACLOperation.ANY, -                permission_type=ACLPermissionType.ANY, +                operation=ACLOperation.READ, +                permission_type=ACLPermissionType.ALLOW,                  resource_pattern=ResourcePattern(ResourceType.TOPIC, "topic")              ) -        ) +        ] +    ) -        self.assertIs(error, NoError) -        self.assertEqual(0, len(acls)) +    assert len(delete_results) == 1 +    assert len(delete_results[0][1]) == 1  # Check number of affected ACLs -        # Try to add an ACL -        acl = ACL( -            principal="User:test", -            host="*", -            operation=ACLOperation.READ, -            permission_type=ACLPermissionType.ALLOW, -            resource_pattern=ResourcePattern(ResourceType.TOPIC, "topic") -        ) -        result = admin_client.create_acls([acl]) - -        self.assertFalse(len(result["failed"])) -        self.assertEqual(len(result["succeeded"]), 1) - -        # Check that we can list the ACL we created -        acl_filter = ACLFilter( -            principal=None, +    # Make sure the ACL does not exist in the cluster anymore +    acls, error = kafka_admin_client.describe_acls( +        ACLFilter( +            principal="*",              host="*",              operation=ACLOperation.ANY,              permission_type=ACLPermissionType.ANY,              resource_pattern=ResourcePattern(ResourceType.TOPIC, "topic")          ) -        acls, error = admin_client.describe_acls(acl_filter) - -        self.assertIs(error, NoError) -        self.assertEqual(1, len(acls)) - -        # Remove the ACL -        delete_results = admin_client.delete_acls( -            [ -                ACLFilter( -                    principal="User:test", -                    host="*", -                    operation=ACLOperation.READ, -                    permission_type=ACLPermissionType.ALLOW, -                    resource_pattern=ResourcePattern(ResourceType.TOPIC, "topic") -                ) -            ] -        ) +    ) -        self.assertEqual(1, len(delete_results)) -        self.assertEqual(1, len(delete_results[0][1]))  # Check number of affected ACLs - - -        # Make sure the ACL does not exist in the cluster anymore -        acls, error = admin_client.describe_acls( -            ACLFilter( -                principal="*", -                host="*", -                operation=ACLOperation.ANY, -                permission_type=ACLPermissionType.ANY, -                resource_pattern=ResourcePattern(ResourceType.TOPIC, "topic") -            ) -        ) -        self.assertIs(error, NoError) -        self.assertEqual(0, len(acls)) +    assert error is NoError +    assert len(acls) == 0  | 
