From 432590550d745eb1eda49ac12d2f8a3dca01111d Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Mon, 8 Jun 2015 23:05:55 -0700 Subject: Reduce log chatter in tests (only debug kafka.conn in test_conn) --- test/test_conn.py | 5 +++++ test/testutil.py | 5 +++++ 2 files changed, 10 insertions(+) (limited to 'test') diff --git a/test/test_conn.py b/test/test_conn.py index c4f219b..6e47cc8 100644 --- a/test/test_conn.py +++ b/test/test_conn.py @@ -1,3 +1,4 @@ +import logging import socket import struct from threading import Thread @@ -10,6 +11,10 @@ from kafka.conn import KafkaConnection, collect_hosts, DEFAULT_SOCKET_TIMEOUT_SE class ConnTest(unittest.TestCase): def setUp(self): + + # kafka.conn debug logging is verbose, so only enable in conn tests + logging.getLogger('kafka.conn').setLevel(logging.DEBUG) + self.config = { 'host': 'localhost', 'port': 9090, diff --git a/test/testutil.py b/test/testutil.py index 99d8d01..3a1d2ba 100644 --- a/test/testutil.py +++ b/test/testutil.py @@ -113,3 +113,8 @@ class Timer(object): self.interval = self.end - self.start logging.basicConfig(level=logging.DEBUG) +logging.getLogger('test.fixtures').setLevel(logging.ERROR) +logging.getLogger('test.service').setLevel(logging.ERROR) + +# kafka.conn debug logging is verbose, disable in tests by default +logging.getLogger('kafka.conn').setLevel(logging.INFO) -- cgit v1.2.1 From ddb536d87e7c6514d33a8b783cd955af05ed9b2f Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Mon, 8 Jun 2015 23:12:32 -0700 Subject: Reduce blocking times in consumer integration tests --- test/test_consumer_integration.py | 28 ++++++++++++++-------------- 1 file changed, 14 insertions(+), 14 deletions(-) (limited to 'test') diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py index c202c5c..8911e3e 100644 --- a/test/test_consumer_integration.py +++ b/test/test_consumer_integration.py @@ -170,11 +170,11 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): def test_simple_consumer_blocking(self): consumer = self.consumer() - # Ask for 5 messages, nothing in queue, block 5 seconds + # Ask for 5 messages, nothing in queue, block 1 second with Timer() as t: - messages = consumer.get_messages(block=True, timeout=5) + messages = consumer.get_messages(block=True, timeout=1) self.assert_message_count(messages, 0) - self.assertGreaterEqual(t.interval, 5) + self.assertGreaterEqual(t.interval, 1) self.send_messages(0, range(0, 10)) @@ -184,11 +184,11 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): self.assert_message_count(messages, 5) self.assertLessEqual(t.interval, 1) - # Ask for 10 messages, get 5 back, block 5 seconds + # Ask for 10 messages, get 5 back, block 1 second with Timer() as t: - messages = consumer.get_messages(count=10, block=True, timeout=5) + messages = consumer.get_messages(count=10, block=True, timeout=1) self.assert_message_count(messages, 5) - self.assertGreaterEqual(t.interval, 5) + self.assertGreaterEqual(t.interval, 1) consumer.stop() @@ -236,12 +236,12 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): def test_multi_process_consumer_blocking(self): consumer = self.consumer(consumer = MultiProcessConsumer) - # Ask for 5 messages, No messages in queue, block 5 seconds + # Ask for 5 messages, No messages in queue, block 1 second with Timer() as t: - messages = consumer.get_messages(block=True, timeout=5) + messages = consumer.get_messages(block=True, timeout=1) self.assert_message_count(messages, 0) - self.assertGreaterEqual(t.interval, 5) + self.assertGreaterEqual(t.interval, 1) # Send 10 messages self.send_messages(0, range(0, 10)) @@ -252,11 +252,11 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): self.assert_message_count(messages, 5) self.assertLessEqual(t.interval, 1) - # Ask for 10 messages, 5 in queue, block 5 seconds + # Ask for 10 messages, 5 in queue, block 1 second with Timer() as t: - messages = consumer.get_messages(count=10, block=True, timeout=5) + messages = consumer.get_messages(count=10, block=True, timeout=1) self.assert_message_count(messages, 5) - self.assertGreaterEqual(t.interval, 4.95) + self.assertGreaterEqual(t.interval, 1) consumer.stop() @@ -450,7 +450,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): consumer = self.kafka_consumer(auto_offset_reset='smallest', consumer_timeout_ms=TIMEOUT_MS) - # Ask for 5 messages, nothing in queue, block 5 seconds + # Ask for 5 messages, nothing in queue, block 500ms with Timer() as t: with self.assertRaises(ConsumerTimeout): msg = consumer.next() @@ -467,7 +467,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): self.assertEqual(len(messages), 5) self.assertLess(t.interval, TIMEOUT_MS / 1000.0 ) - # Ask for 10 messages, get 5 back, block 5 seconds + # Ask for 10 messages, get 5 back, block 500ms messages = set() with Timer() as t: with self.assertRaises(ConsumerTimeout): -- cgit v1.2.1 From 53d8251a18d9c033269e105854a7c4cc9730930a Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Mon, 8 Jun 2015 23:13:46 -0700 Subject: Produce messages to both partitions in async producer leader switch test --- test/test_failover_integration.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) (limited to 'test') diff --git a/test/test_failover_integration.py b/test/test_failover_integration.py index 5082d7c..91e22cf 100644 --- a/test/test_failover_integration.py +++ b/test/test_failover_integration.py @@ -98,10 +98,14 @@ class TestFailover(KafkaIntegrationTestCase): # Test the base class Producer -- send_messages to a specific partition producer = Producer(self.client, async=True, - req_acks=Producer.ACK_AFTER_CLUSTER_COMMIT) + batch_send_every_n=15, + batch_send_every_t=3, + req_acks=Producer.ACK_AFTER_CLUSTER_COMMIT, + async_log_messages_on_error=False) # Send 10 random messages self._send_random_messages(producer, topic, partition, 10) + self._send_random_messages(producer, topic, partition + 1, 10) # kill leader for partition self._kill_leader(topic, partition) @@ -110,9 +114,11 @@ class TestFailover(KafkaIntegrationTestCase): # in async mode, this should return immediately producer.send_messages(topic, partition, b'success') + producer.send_messages(topic, partition + 1, b'success') # send to new leader self._send_random_messages(producer, topic, partition, 10) + self._send_random_messages(producer, topic, partition + 1, 10) # Stop the producer and wait for it to shutdown producer.stop() @@ -129,6 +135,8 @@ class TestFailover(KafkaIntegrationTestCase): # Should be equal to 10 before + 1 recovery + 10 after self.assert_message_count(topic, 21, partitions=(partition,), at_least=True) + self.assert_message_count(topic, 21, partitions=(partition + 1,), + at_least=True) @kafka_versions("all") def test_switch_leader_keyed_producer(self): -- cgit v1.2.1 From 0e416d528767a1246c72ae1efa917e401cedd79f Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Mon, 8 Jun 2015 23:26:51 -0700 Subject: Dont try to terminate a child that has already exited in test/service _despawn --- test/service.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'test') diff --git a/test/service.py b/test/service.py index 9368b85..b986a71 100644 --- a/test/service.py +++ b/test/service.py @@ -59,7 +59,8 @@ class SpawnedService(threading.Thread): self.alive = True def _despawn(self): - self.child.terminate() + if self.child.poll() is None: + self.child.terminate() self.alive = False for _ in range(50): if self.child.poll() is not None: -- cgit v1.2.1