summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--test/test_producer.py14
1 files changed, 9 insertions, 5 deletions
diff --git a/test/test_producer.py b/test/test_producer.py
index 8ef49b3..f11bb05 100644
--- a/test/test_producer.py
+++ b/test/test_producer.py
@@ -46,17 +46,21 @@ def test_end_to_end(kafka_broker, compression):
topic = random_string(5)
- for i in range(1000):
- producer.send(topic, 'msg %d' % i)
- producer.flush(timeout=30)
+ messages = 100
+ futures = []
+ for i in range(messages):
+ futures.append(producer.send(topic, 'msg %d' % i))
+ ret = [f.get(timeout=30) for f in futures]
+ assert len(ret) == messages
+
producer.close()
consumer.subscribe([topic])
msgs = set()
- for i in range(1000):
+ for i in range(messages):
try:
msgs.add(next(consumer).value)
except StopIteration:
break
- assert msgs == set(['msg %d' % i for i in range(1000)])
+ assert msgs == set(['msg %d' % i for i in range(messages)])