summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--test/test_producer.py34
1 files changed, 34 insertions, 0 deletions
diff --git a/test/test_producer.py b/test/test_producer.py
new file mode 100644
index 0000000..b84feb4
--- /dev/null
+++ b/test/test_producer.py
@@ -0,0 +1,34 @@
+import pytest
+
+from kafka import KafkaConsumer, KafkaProducer
+from test.conftest import version
+from test.testutil import random_string
+
+
+@pytest.mark.skipif(not version(), reason="No KAFKA_VERSION set")
+def test_end_to_end(kafka_broker):
+ connect_str = 'localhost:' + str(kafka_broker.port)
+ producer = KafkaProducer(bootstrap_servers=connect_str,
+ max_block_ms=10000,
+ value_serializer=str.encode)
+ consumer = KafkaConsumer(bootstrap_servers=connect_str,
+ consumer_timeout_ms=10000,
+ auto_offset_reset='earliest',
+ value_deserializer=bytes.decode)
+
+ topic = random_string(5)
+
+ for i in range(1000):
+ producer.send(topic, 'msg %d' % i)
+ producer.flush()
+ producer.close()
+
+ consumer.subscribe([topic])
+ msgs = set()
+ for i in range(1000):
+ try:
+ msgs.add(next(consumer).value)
+ except StopIteration:
+ break
+
+ assert msgs == set(['msg %d' % i for i in range(1000)])