summaryrefslogtreecommitdiff
path: root/test/conftest.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/conftest.py')
-rw-r--r--test/conftest.py26
1 files changed, 26 insertions, 0 deletions
diff --git a/test/conftest.py b/test/conftest.py
index 5015cc7..267ac6a 100644
--- a/test/conftest.py
+++ b/test/conftest.py
@@ -1,5 +1,7 @@
from __future__ import absolute_import
+import uuid
+
import pytest
from test.testutil import env_kafka_version, random_string
@@ -137,3 +139,27 @@ def conn(mocker):
conn.connected = lambda: conn.state is ConnectionStates.CONNECTED
conn.disconnected = lambda: conn.state is ConnectionStates.DISCONNECTED
return conn
+
+
+@pytest.fixture()
+def send_messages(topic, kafka_producer, request):
+ """A factory that returns a send_messages function with a pre-populated
+ topic topic / producer."""
+
+ def _send_messages(number_range, partition=0, topic=topic, producer=kafka_producer, request=request):
+ """
+ messages is typically `range(0,100)`
+ partition is an int
+ """
+ messages_and_futures = [] # [(message, produce_future),]
+ for i in number_range:
+ # request.node.name provides the test name (including parametrized values)
+ encoded_msg = '{}-{}-{}'.format(i, request.node.name, uuid.uuid4()).encode('utf-8')
+ future = kafka_producer.send(topic, value=encoded_msg, partition=partition)
+ messages_and_futures.append((encoded_msg, future))
+ kafka_producer.flush()
+ for (msg, f) in messages_and_futures:
+ assert f.succeeded()
+ return [msg for (msg, f) in messages_and_futures]
+
+ return _send_messages