summaryrefslogtreecommitdiff
path: root/test/conftest.py
diff options
context:
space:
mode:
authorAndre Araujo <asdaraujo@gmail.com>2017-11-15 06:08:29 -0800
committerJeff Widman <jeff@jeffwidman.com>2018-02-21 13:30:12 -0800
commita1869c4be5f47b4f6433610249aaf29af4ec95e5 (patch)
treec18b155f5a3b812ed69a2f3a7d0499628cd87694 /test/conftest.py
parent0f5d35fa3489fa36000c05a891d375cc30672e23 (diff)
downloadkafka-python-a1869c4be5f47b4f6433610249aaf29af4ec95e5.tar.gz
Introduce new fixtures to prepare for migration to pytest.
This commits adds new pytest fixtures in prepation for the migration of unittest.TestCases to pytest test cases. The handling of temporary dir creation was also changed so that we can use the pytest tmpdir fixture after the migration.
Diffstat (limited to 'test/conftest.py')
-rw-r--r--test/conftest.py113
1 files changed, 96 insertions, 17 deletions
diff --git a/test/conftest.py b/test/conftest.py
index e85b977..d53ff23 100644
--- a/test/conftest.py
+++ b/test/conftest.py
@@ -1,38 +1,117 @@
from __future__ import absolute_import
-import os
+import inspect
import pytest
+from decorator import decorate
from test.fixtures import KafkaFixture, ZookeeperFixture
-
+from test.testutil import kafka_version, random_string
@pytest.fixture(scope="module")
def version():
- if 'KAFKA_VERSION' not in os.environ:
- return ()
- return tuple(map(int, os.environ['KAFKA_VERSION'].split('.')))
-
+ """Return the Kafka version set in the OS environment"""
+ return kafka_version()
@pytest.fixture(scope="module")
-def zookeeper(version, request):
- assert version
- zk = ZookeeperFixture.instance()
- yield zk
- zk.close()
+def zookeeper():
+ """Return a Zookeeper fixture"""
+ zk_instance = ZookeeperFixture.instance()
+ yield zk_instance
+ zk_instance.close()
+@pytest.fixture(scope="module")
+def kafka_broker(kafka_broker_factory):
+ """Return a Kafka broker fixture"""
+ return kafka_broker_factory()[0]
@pytest.fixture(scope="module")
-def kafka_broker(version, zookeeper, request):
- assert version
- k = KafkaFixture.instance(0, zookeeper.host, zookeeper.port,
- partitions=4)
- yield k
- k.close()
+def kafka_broker_factory(version, zookeeper):
+ """Return a Kafka broker fixture factory"""
+ assert version, 'KAFKA_VERSION must be specified to run integration tests'
+
+ _brokers = []
+ def factory(**broker_params):
+ params = {} if broker_params is None else broker_params.copy()
+ params.setdefault('partitions', 4)
+ num_brokers = params.pop('num_brokers', 1)
+ brokers = tuple(KafkaFixture.instance(x, zookeeper, **params)
+ for x in range(num_brokers))
+ _brokers.extend(brokers)
+ return brokers
+ yield factory
+
+ for broker in _brokers:
+ broker.close()
+
+@pytest.fixture
+def simple_client(kafka_broker, request, topic):
+ """Return a SimpleClient fixture"""
+ client = kafka_broker.get_simple_client(client_id='%s_client' % (request.node.name,))
+ client.ensure_topic_exists(topic)
+ yield client
+ client.close()
+
+@pytest.fixture
+def kafka_client(kafka_broker, request):
+ """Return a KafkaClient fixture"""
+ (client,) = kafka_broker.get_clients(cnt=1, client_id='%s_client' % (request.node.name,))
+ yield client
+ client.close()
+
+@pytest.fixture
+def kafka_consumer(kafka_consumer_factory):
+ """Return a KafkaConsumer fixture"""
+ return kafka_consumer_factory()
+
+@pytest.fixture
+def kafka_consumer_factory(kafka_broker, topic, request):
+ """Return a KafkaConsumer factory fixture"""
+ _consumer = [None]
+
+ def factory(**kafka_consumer_params):
+ params = {} if kafka_consumer_params is None else kafka_consumer_params.copy()
+ params.setdefault('client_id', 'consumer_%s' % (request.node.name,))
+ _consumer[0] = next(kafka_broker.get_consumers(cnt=1, topics=[topic], **params))
+ return _consumer[0]
+
+ yield factory
+
+ if _consumer[0]:
+ _consumer[0].close()
+
+@pytest.fixture
+def kafka_producer(kafka_producer_factory):
+ """Return a KafkaProducer fixture"""
+ yield kafka_producer_factory()
+
+@pytest.fixture
+def kafka_producer_factory(kafka_broker, request):
+ """Return a KafkaProduce factory fixture"""
+ _producer = [None]
+
+ def factory(**kafka_producer_params):
+ params = {} if kafka_producer_params is None else kafka_producer_params.copy()
+ params.setdefault('client_id', 'producer_%s' % (request.node.name,))
+ _producer[0] = next(kafka_broker.get_producers(cnt=1, **params))
+ return _producer[0]
+
+ yield factory
+
+ if _producer[0]:
+ _producer[0].close()
+
+@pytest.fixture
+def topic(kafka_broker, request):
+ """Return a topic fixture"""
+ topic_name = '%s_%s' % (request.node.name, random_string(10))
+ kafka_broker.create_topics([topic_name])
+ return topic_name
@pytest.fixture
def conn(mocker):
+ """Return a connection mocker fixture"""
from kafka.conn import ConnectionStates
from kafka.future import Future
from kafka.protocol.metadata import MetadataResponse