diff options
author | Dana Powers <dana.powers@rd.io> | 2015-06-08 23:12:32 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@rd.io> | 2015-06-08 23:27:09 -0700 |
commit | ddb536d87e7c6514d33a8b783cd955af05ed9b2f (patch) | |
tree | c6400cb9c13ea070fb2db2030d3a5026c641e6fb /test/test_consumer_integration.py | |
parent | 432590550d745eb1eda49ac12d2f8a3dca01111d (diff) | |
download | kafka-python-ddb536d87e7c6514d33a8b783cd955af05ed9b2f.tar.gz |
Reduce blocking times in consumer integration tests
Diffstat (limited to 'test/test_consumer_integration.py')
-rw-r--r-- | test/test_consumer_integration.py | 28 |
1 files changed, 14 insertions, 14 deletions
diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py index c202c5c..8911e3e 100644 --- a/test/test_consumer_integration.py +++ b/test/test_consumer_integration.py @@ -170,11 +170,11 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): def test_simple_consumer_blocking(self): consumer = self.consumer() - # Ask for 5 messages, nothing in queue, block 5 seconds + # Ask for 5 messages, nothing in queue, block 1 second with Timer() as t: - messages = consumer.get_messages(block=True, timeout=5) + messages = consumer.get_messages(block=True, timeout=1) self.assert_message_count(messages, 0) - self.assertGreaterEqual(t.interval, 5) + self.assertGreaterEqual(t.interval, 1) self.send_messages(0, range(0, 10)) @@ -184,11 +184,11 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): self.assert_message_count(messages, 5) self.assertLessEqual(t.interval, 1) - # Ask for 10 messages, get 5 back, block 5 seconds + # Ask for 10 messages, get 5 back, block 1 second with Timer() as t: - messages = consumer.get_messages(count=10, block=True, timeout=5) + messages = consumer.get_messages(count=10, block=True, timeout=1) self.assert_message_count(messages, 5) - self.assertGreaterEqual(t.interval, 5) + self.assertGreaterEqual(t.interval, 1) consumer.stop() @@ -236,12 +236,12 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): def test_multi_process_consumer_blocking(self): consumer = self.consumer(consumer = MultiProcessConsumer) - # Ask for 5 messages, No messages in queue, block 5 seconds + # Ask for 5 messages, No messages in queue, block 1 second with Timer() as t: - messages = consumer.get_messages(block=True, timeout=5) + messages = consumer.get_messages(block=True, timeout=1) self.assert_message_count(messages, 0) - self.assertGreaterEqual(t.interval, 5) + self.assertGreaterEqual(t.interval, 1) # Send 10 messages self.send_messages(0, range(0, 10)) @@ -252,11 +252,11 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): self.assert_message_count(messages, 5) self.assertLessEqual(t.interval, 1) - # Ask for 10 messages, 5 in queue, block 5 seconds + # Ask for 10 messages, 5 in queue, block 1 second with Timer() as t: - messages = consumer.get_messages(count=10, block=True, timeout=5) + messages = consumer.get_messages(count=10, block=True, timeout=1) self.assert_message_count(messages, 5) - self.assertGreaterEqual(t.interval, 4.95) + self.assertGreaterEqual(t.interval, 1) consumer.stop() @@ -450,7 +450,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): consumer = self.kafka_consumer(auto_offset_reset='smallest', consumer_timeout_ms=TIMEOUT_MS) - # Ask for 5 messages, nothing in queue, block 5 seconds + # Ask for 5 messages, nothing in queue, block 500ms with Timer() as t: with self.assertRaises(ConsumerTimeout): msg = consumer.next() @@ -467,7 +467,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): self.assertEqual(len(messages), 5) self.assertLess(t.interval, TIMEOUT_MS / 1000.0 ) - # Ask for 10 messages, get 5 back, block 5 seconds + # Ask for 10 messages, get 5 back, block 500ms messages = set() with Timer() as t: with self.assertRaises(ConsumerTimeout): |