diff options
author | Dana Powers <dana.powers@rd.io> | 2015-12-05 03:36:12 -0800 |
---|---|---|
committer | Dana Powers <dana.powers@rd.io> | 2015-12-05 03:36:12 -0800 |
commit | b687b4c5d4788d64efe9b7bcfb776e57d6fbcc8e (patch) | |
tree | d575411a9b1c95088d80012fad6717f2336cf78a | |
parent | 0ab97a99088c2871c3ab30dbc0dbf13f5f414433 (diff) | |
download | kafka-python-b687b4c5d4788d64efe9b7bcfb776e57d6fbcc8e.tar.gz |
Use producer.stop() to flush messages in async producer test
-rw-r--r-- | test/test_producer_integration.py | 6 |
1 files changed, 2 insertions, 4 deletions
diff --git a/test/test_producer_integration.py b/test/test_producer_integration.py index 46b6851..c99ed63 100644 --- a/test/test_producer_integration.py +++ b/test/test_producer_integration.py @@ -204,13 +204,11 @@ class TestKafkaProducerIntegration(KafkaIntegrationTestCase): resp = producer.send_messages(self.topic, self.msg("one")) self.assertEqual(len(resp), 0) - # wait for the server to report a new highwatermark - while self.current_offset(self.topic, partition) == start_offset: - time.sleep(0.1) + # flush messages + producer.stop() self.assert_fetch_offset(partition, start_offset, [ self.msg("one") ]) - producer.stop() @kafka_versions("all") def test_batched_simple_producer__triggers_by_message(self): |