diff options
author | Jeff Widman <jeff@jeffwidman.com> | 2019-08-22 21:14:37 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-08-22 21:14:37 -0700 |
commit | 61fa0b27685c2d4e67d1b6575ca6797f36eb1bfa (patch) | |
tree | 7e9ade5bae3b4b3172b39771e08eb6b7108092c8 /test/conftest.py | |
parent | 6e6d0cca5dbdf0a9ae3a032b6de08f9bbbf9606a (diff) | |
download | kafka-python-61fa0b27685c2d4e67d1b6575ca6797f36eb1bfa.tar.gz |
Convert remaining `KafkaConsumer` tests to `pytest` (#1886)
This makes it so the only remaining use of `unittest` is in the old
tests of the deprecated `Simple*` clients. All `KafkaConsumer` tests are
migrated to `pytest`.
I also had to bump the test iterations up on one of the tests, I think there was a race condition there that was more commonly hit under pytest , planning to cleanup that in a followup PR. See https://github.com/dpkp/kafka-python/pull/1886#discussion_r316860737 for details.
Diffstat (limited to 'test/conftest.py')
-rw-r--r-- | test/conftest.py | 26 |
1 files changed, 26 insertions, 0 deletions
diff --git a/test/conftest.py b/test/conftest.py index 5015cc7..267ac6a 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -1,5 +1,7 @@ from __future__ import absolute_import +import uuid + import pytest from test.testutil import env_kafka_version, random_string @@ -137,3 +139,27 @@ def conn(mocker): conn.connected = lambda: conn.state is ConnectionStates.CONNECTED conn.disconnected = lambda: conn.state is ConnectionStates.DISCONNECTED return conn + + +@pytest.fixture() +def send_messages(topic, kafka_producer, request): + """A factory that returns a send_messages function with a pre-populated + topic topic / producer.""" + + def _send_messages(number_range, partition=0, topic=topic, producer=kafka_producer, request=request): + """ + messages is typically `range(0,100)` + partition is an int + """ + messages_and_futures = [] # [(message, produce_future),] + for i in number_range: + # request.node.name provides the test name (including parametrized values) + encoded_msg = '{}-{}-{}'.format(i, request.node.name, uuid.uuid4()).encode('utf-8') + future = kafka_producer.send(topic, value=encoded_msg, partition=partition) + messages_and_futures.append((encoded_msg, future)) + kafka_producer.flush() + for (msg, f) in messages_and_futures: + assert f.succeeded() + return [msg for (msg, f) in messages_and_futures] + + return _send_messages |