diff options
author | Andre Araujo <asdaraujo@gmail.com> | 2017-11-15 06:08:29 -0800 |
---|---|---|
committer | Jeff Widman <jeff@jeffwidman.com> | 2018-02-21 13:30:12 -0800 |
commit | a1869c4be5f47b4f6433610249aaf29af4ec95e5 (patch) | |
tree | c18b155f5a3b812ed69a2f3a7d0499628cd87694 /test/conftest.py | |
parent | 0f5d35fa3489fa36000c05a891d375cc30672e23 (diff) | |
download | kafka-python-a1869c4be5f47b4f6433610249aaf29af4ec95e5.tar.gz |
Introduce new fixtures to prepare for migration to pytest.
This commits adds new pytest fixtures in prepation for the
migration of unittest.TestCases to pytest test cases. The handling
of temporary dir creation was also changed so that we can use
the pytest tmpdir fixture after the migration.
Diffstat (limited to 'test/conftest.py')
-rw-r--r-- | test/conftest.py | 113 |
1 files changed, 96 insertions, 17 deletions
diff --git a/test/conftest.py b/test/conftest.py index e85b977..d53ff23 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -1,38 +1,117 @@ from __future__ import absolute_import -import os +import inspect import pytest +from decorator import decorate from test.fixtures import KafkaFixture, ZookeeperFixture - +from test.testutil import kafka_version, random_string @pytest.fixture(scope="module") def version(): - if 'KAFKA_VERSION' not in os.environ: - return () - return tuple(map(int, os.environ['KAFKA_VERSION'].split('.'))) - + """Return the Kafka version set in the OS environment""" + return kafka_version() @pytest.fixture(scope="module") -def zookeeper(version, request): - assert version - zk = ZookeeperFixture.instance() - yield zk - zk.close() +def zookeeper(): + """Return a Zookeeper fixture""" + zk_instance = ZookeeperFixture.instance() + yield zk_instance + zk_instance.close() +@pytest.fixture(scope="module") +def kafka_broker(kafka_broker_factory): + """Return a Kafka broker fixture""" + return kafka_broker_factory()[0] @pytest.fixture(scope="module") -def kafka_broker(version, zookeeper, request): - assert version - k = KafkaFixture.instance(0, zookeeper.host, zookeeper.port, - partitions=4) - yield k - k.close() +def kafka_broker_factory(version, zookeeper): + """Return a Kafka broker fixture factory""" + assert version, 'KAFKA_VERSION must be specified to run integration tests' + + _brokers = [] + def factory(**broker_params): + params = {} if broker_params is None else broker_params.copy() + params.setdefault('partitions', 4) + num_brokers = params.pop('num_brokers', 1) + brokers = tuple(KafkaFixture.instance(x, zookeeper, **params) + for x in range(num_brokers)) + _brokers.extend(brokers) + return brokers + yield factory + + for broker in _brokers: + broker.close() + +@pytest.fixture +def simple_client(kafka_broker, request, topic): + """Return a SimpleClient fixture""" + client = kafka_broker.get_simple_client(client_id='%s_client' % (request.node.name,)) + client.ensure_topic_exists(topic) + yield client + client.close() + +@pytest.fixture +def kafka_client(kafka_broker, request): + """Return a KafkaClient fixture""" + (client,) = kafka_broker.get_clients(cnt=1, client_id='%s_client' % (request.node.name,)) + yield client + client.close() + +@pytest.fixture +def kafka_consumer(kafka_consumer_factory): + """Return a KafkaConsumer fixture""" + return kafka_consumer_factory() + +@pytest.fixture +def kafka_consumer_factory(kafka_broker, topic, request): + """Return a KafkaConsumer factory fixture""" + _consumer = [None] + + def factory(**kafka_consumer_params): + params = {} if kafka_consumer_params is None else kafka_consumer_params.copy() + params.setdefault('client_id', 'consumer_%s' % (request.node.name,)) + _consumer[0] = next(kafka_broker.get_consumers(cnt=1, topics=[topic], **params)) + return _consumer[0] + + yield factory + + if _consumer[0]: + _consumer[0].close() + +@pytest.fixture +def kafka_producer(kafka_producer_factory): + """Return a KafkaProducer fixture""" + yield kafka_producer_factory() + +@pytest.fixture +def kafka_producer_factory(kafka_broker, request): + """Return a KafkaProduce factory fixture""" + _producer = [None] + + def factory(**kafka_producer_params): + params = {} if kafka_producer_params is None else kafka_producer_params.copy() + params.setdefault('client_id', 'producer_%s' % (request.node.name,)) + _producer[0] = next(kafka_broker.get_producers(cnt=1, **params)) + return _producer[0] + + yield factory + + if _producer[0]: + _producer[0].close() + +@pytest.fixture +def topic(kafka_broker, request): + """Return a topic fixture""" + topic_name = '%s_%s' % (request.node.name, random_string(10)) + kafka_broker.create_topics([topic_name]) + return topic_name @pytest.fixture def conn(mocker): + """Return a connection mocker fixture""" from kafka.conn import ConnectionStates from kafka.future import Future from kafka.protocol.metadata import MetadataResponse |