import pytest import kafka.admin from kafka.errors import IllegalArgumentError def test_config_resource(): with pytest.raises(KeyError): bad_resource = kafka.admin.ConfigResource('something', 'foo') good_resource = kafka.admin.ConfigResource('broker', 'bar') assert(good_resource.resource_type == kafka.admin.ConfigResourceType.BROKER) assert(good_resource.name == 'bar') assert(good_resource.configs is None) good_resource = kafka.admin.ConfigResource(kafka.admin.ConfigResourceType.TOPIC, 'baz', {'frob' : 'nob'}) assert(good_resource.resource_type == kafka.admin.ConfigResourceType.TOPIC) assert(good_resource.name == 'baz') assert(good_resource.configs == {'frob' : 'nob'}) def test_new_partitions(): good_partitions = kafka.admin.NewPartitions(6) assert(good_partitions.total_count == 6) assert(good_partitions.new_assignments is None) good_partitions = kafka.admin.NewPartitions(7, [[1, 2, 3]]) assert(good_partitions.total_count == 7) assert(good_partitions.new_assignments == [[1, 2, 3]]) def test_new_topic(): with pytest.raises(IllegalArgumentError): bad_topic = kafka.admin.NewTopic('foo', -1, -1) with pytest.raises(IllegalArgumentError): bad_topic = kafka.admin.NewTopic('foo', 1, -1) with pytest.raises(IllegalArgumentError): bad_topic = kafka.admin.NewTopic('foo', 1, 1, {1 : [1, 1, 1]}) good_topic = kafka.admin.NewTopic('foo', 1, 2) assert(good_topic.name == 'foo') assert(good_topic.num_partitions == 1) assert(good_topic.replication_factor == 2) assert(good_topic.replica_assignments == {}) assert(good_topic.topic_configs == {}) good_topic = kafka.admin.NewTopic('bar', -1, -1, {1 : [1, 2, 3]}, {'key' : 'value'}) assert(good_topic.name == 'bar') assert(good_topic.num_partitions == -1) assert(good_topic.replication_factor == -1) assert(good_topic.replica_assignments == {1: [1, 2, 3]}) assert(good_topic.topic_configs == {'key' : 'value'})