diff options
Diffstat (limited to 'test/test_sasl_integration.py')
-rw-r--r-- | test/test_sasl_integration.py | 80 |
1 files changed, 80 insertions, 0 deletions
diff --git a/test/test_sasl_integration.py b/test/test_sasl_integration.py new file mode 100644 index 0000000..e3a4813 --- /dev/null +++ b/test/test_sasl_integration.py @@ -0,0 +1,80 @@ +import logging +import uuid + +import pytest + +from kafka.admin import NewTopic +from kafka.protocol.metadata import MetadataRequest_v1 +from test.testutil import assert_message_count, env_kafka_version, random_string, special_to_underscore + + +@pytest.fixture( + params=[ + pytest.param( + "PLAIN", marks=pytest.mark.skipif(env_kafka_version() < (0, 10), reason="Requires KAFKA_VERSION >= 0.10") + ), + pytest.param( + "SCRAM-SHA-256", + marks=pytest.mark.skipif(env_kafka_version() < (0, 10, 2), reason="Requires KAFKA_VERSION >= 0.10.2"), + ), + pytest.param( + "SCRAM-SHA-512", + marks=pytest.mark.skipif(env_kafka_version() < (0, 10, 2), reason="Requires KAFKA_VERSION >= 0.10.2"), + ), + ] +) +def sasl_kafka(request, kafka_broker_factory): + sasl_kafka = kafka_broker_factory(transport="SASL_PLAINTEXT", sasl_mechanism=request.param)[0] + yield sasl_kafka + sasl_kafka.child.dump_logs() + + +def test_admin(request, sasl_kafka): + topic_name = special_to_underscore(request.node.name + random_string(4)) + admin, = sasl_kafka.get_admin_clients(1) + admin.create_topics([NewTopic(topic_name, 1, 1)]) + assert topic_name in sasl_kafka.get_topic_names() + + +def test_produce_and_consume(request, sasl_kafka): + topic_name = special_to_underscore(request.node.name + random_string(4)) + sasl_kafka.create_topics([topic_name], num_partitions=2) + producer, = sasl_kafka.get_producers(1) + + messages_and_futures = [] # [(message, produce_future),] + for i in range(100): + encoded_msg = "{}-{}-{}".format(i, request.node.name, uuid.uuid4()).encode("utf-8") + future = producer.send(topic_name, value=encoded_msg, partition=i % 2) + messages_and_futures.append((encoded_msg, future)) + producer.flush() + + for (msg, f) in messages_and_futures: + assert f.succeeded() + + consumer, = sasl_kafka.get_consumers(1, [topic_name]) + messages = {0: [], 1: []} + for i, message in enumerate(consumer, 1): + logging.debug("Consumed message %s", repr(message)) + messages[message.partition].append(message) + if i >= 100: + break + + assert_message_count(messages[0], 50) + assert_message_count(messages[1], 50) + + +def test_client(request, sasl_kafka): + topic_name = special_to_underscore(request.node.name + random_string(4)) + sasl_kafka.create_topics([topic_name], num_partitions=1) + + client, = sasl_kafka.get_clients(1) + request = MetadataRequest_v1(None) + client.send(0, request) + for _ in range(10): + result = client.poll(timeout_ms=10000) + if len(result) > 0: + break + else: + raise RuntimeError("Couldn't fetch topic response from Broker.") + result = result[0] + assert topic_name in [t[1] for t in result.topics] |