diff options
author | Dana Powers <dana.powers@rd.io> | 2016-01-23 22:47:11 -0800 |
---|---|---|
committer | Dana Powers <dana.powers@rd.io> | 2016-01-24 17:33:09 -0800 |
commit | f51623142dfc089aeb46e986b1d0382f3fab3025 (patch) | |
tree | 7f67c4506235afdecb43c1a779cb3b4ae879995f | |
parent | 4d993bbe12fc20a6469a1e3074a74eabd2aba114 (diff) | |
download | kafka-python-f51623142dfc089aeb46e986b1d0382f3fab3025.tar.gz |
Add simple KafkaProducer -> KafkaConsumer integration test
-rw-r--r-- | test/test_producer.py | 34 |
1 files changed, 34 insertions, 0 deletions
diff --git a/test/test_producer.py b/test/test_producer.py new file mode 100644 index 0000000..b84feb4 --- /dev/null +++ b/test/test_producer.py @@ -0,0 +1,34 @@ +import pytest + +from kafka import KafkaConsumer, KafkaProducer +from test.conftest import version +from test.testutil import random_string + + +@pytest.mark.skipif(not version(), reason="No KAFKA_VERSION set") +def test_end_to_end(kafka_broker): + connect_str = 'localhost:' + str(kafka_broker.port) + producer = KafkaProducer(bootstrap_servers=connect_str, + max_block_ms=10000, + value_serializer=str.encode) + consumer = KafkaConsumer(bootstrap_servers=connect_str, + consumer_timeout_ms=10000, + auto_offset_reset='earliest', + value_deserializer=bytes.decode) + + topic = random_string(5) + + for i in range(1000): + producer.send(topic, 'msg %d' % i) + producer.flush() + producer.close() + + consumer.subscribe([topic]) + msgs = set() + for i in range(1000): + try: + msgs.add(next(consumer).value) + except StopIteration: + break + + assert msgs == set(['msg %d' % i for i in range(1000)]) |