summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@rd.io>2016-01-23 22:47:11 -0800
committerDana Powers <dana.powers@rd.io>2016-01-24 17:33:09 -0800
commitf51623142dfc089aeb46e986b1d0382f3fab3025 (patch)
tree7f67c4506235afdecb43c1a779cb3b4ae879995f
parent4d993bbe12fc20a6469a1e3074a74eabd2aba114 (diff)
downloadkafka-python-f51623142dfc089aeb46e986b1d0382f3fab3025.tar.gz
Add simple KafkaProducer -> KafkaConsumer integration test
-rw-r--r--test/test_producer.py34
1 files changed, 34 insertions, 0 deletions
diff --git a/test/test_producer.py b/test/test_producer.py
new file mode 100644
index 0000000..b84feb4
--- /dev/null
+++ b/test/test_producer.py
@@ -0,0 +1,34 @@
+import pytest
+
+from kafka import KafkaConsumer, KafkaProducer
+from test.conftest import version
+from test.testutil import random_string
+
+
+@pytest.mark.skipif(not version(), reason="No KAFKA_VERSION set")
+def test_end_to_end(kafka_broker):
+ connect_str = 'localhost:' + str(kafka_broker.port)
+ producer = KafkaProducer(bootstrap_servers=connect_str,
+ max_block_ms=10000,
+ value_serializer=str.encode)
+ consumer = KafkaConsumer(bootstrap_servers=connect_str,
+ consumer_timeout_ms=10000,
+ auto_offset_reset='earliest',
+ value_deserializer=bytes.decode)
+
+ topic = random_string(5)
+
+ for i in range(1000):
+ producer.send(topic, 'msg %d' % i)
+ producer.flush()
+ producer.close()
+
+ consumer.subscribe([topic])
+ msgs = set()
+ for i in range(1000):
+ try:
+ msgs.add(next(consumer).value)
+ except StopIteration:
+ break
+
+ assert msgs == set(['msg %d' % i for i in range(1000)])