summaryrefslogtreecommitdiff
path: root/test/conftest.py
diff options
context:
space:
mode:
authorJeff Widman <jeff@jeffwidman.com>2019-08-22 21:14:37 -0700
committerGitHub <noreply@github.com>2019-08-22 21:14:37 -0700
commit61fa0b27685c2d4e67d1b6575ca6797f36eb1bfa (patch)
tree7e9ade5bae3b4b3172b39771e08eb6b7108092c8 /test/conftest.py
parent6e6d0cca5dbdf0a9ae3a032b6de08f9bbbf9606a (diff)
downloadkafka-python-61fa0b27685c2d4e67d1b6575ca6797f36eb1bfa.tar.gz
Convert remaining `KafkaConsumer` tests to `pytest` (#1886)
This makes it so the only remaining use of `unittest` is in the old tests of the deprecated `Simple*` clients. All `KafkaConsumer` tests are migrated to `pytest`. I also had to bump the test iterations up on one of the tests, I think there was a race condition there that was more commonly hit under pytest , planning to cleanup that in a followup PR. See https://github.com/dpkp/kafka-python/pull/1886#discussion_r316860737 for details.
Diffstat (limited to 'test/conftest.py')
-rw-r--r--test/conftest.py26
1 files changed, 26 insertions, 0 deletions
diff --git a/test/conftest.py b/test/conftest.py
index 5015cc7..267ac6a 100644
--- a/test/conftest.py
+++ b/test/conftest.py
@@ -1,5 +1,7 @@
from __future__ import absolute_import
+import uuid
+
import pytest
from test.testutil import env_kafka_version, random_string
@@ -137,3 +139,27 @@ def conn(mocker):
conn.connected = lambda: conn.state is ConnectionStates.CONNECTED
conn.disconnected = lambda: conn.state is ConnectionStates.DISCONNECTED
return conn
+
+
+@pytest.fixture()
+def send_messages(topic, kafka_producer, request):
+ """A factory that returns a send_messages function with a pre-populated
+ topic topic / producer."""
+
+ def _send_messages(number_range, partition=0, topic=topic, producer=kafka_producer, request=request):
+ """
+ messages is typically `range(0,100)`
+ partition is an int
+ """
+ messages_and_futures = [] # [(message, produce_future),]
+ for i in number_range:
+ # request.node.name provides the test name (including parametrized values)
+ encoded_msg = '{}-{}-{}'.format(i, request.node.name, uuid.uuid4()).encode('utf-8')
+ future = kafka_producer.send(topic, value=encoded_msg, partition=partition)
+ messages_and_futures.append((encoded_msg, future))
+ kafka_producer.flush()
+ for (msg, f) in messages_and_futures:
+ assert f.succeeded()
+ return [msg for (msg, f) in messages_and_futures]
+
+ return _send_messages