diff options
Diffstat (limited to 'test')
-rw-r--r-- | test/service.py | 3 | ||||
-rw-r--r-- | test/test_conn.py | 5 | ||||
-rw-r--r-- | test/test_consumer_integration.py | 28 | ||||
-rw-r--r-- | test/test_failover_integration.py | 10 | ||||
-rw-r--r-- | test/testutil.py | 5 |
5 files changed, 35 insertions, 16 deletions
diff --git a/test/service.py b/test/service.py index 9368b85..b986a71 100644 --- a/test/service.py +++ b/test/service.py @@ -59,7 +59,8 @@ class SpawnedService(threading.Thread): self.alive = True def _despawn(self): - self.child.terminate() + if self.child.poll() is None: + self.child.terminate() self.alive = False for _ in range(50): if self.child.poll() is not None: diff --git a/test/test_conn.py b/test/test_conn.py index c4f219b..6e47cc8 100644 --- a/test/test_conn.py +++ b/test/test_conn.py @@ -1,3 +1,4 @@ +import logging import socket import struct from threading import Thread @@ -10,6 +11,10 @@ from kafka.conn import KafkaConnection, collect_hosts, DEFAULT_SOCKET_TIMEOUT_SE class ConnTest(unittest.TestCase): def setUp(self): + + # kafka.conn debug logging is verbose, so only enable in conn tests + logging.getLogger('kafka.conn').setLevel(logging.DEBUG) + self.config = { 'host': 'localhost', 'port': 9090, diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py index c202c5c..8911e3e 100644 --- a/test/test_consumer_integration.py +++ b/test/test_consumer_integration.py @@ -170,11 +170,11 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): def test_simple_consumer_blocking(self): consumer = self.consumer() - # Ask for 5 messages, nothing in queue, block 5 seconds + # Ask for 5 messages, nothing in queue, block 1 second with Timer() as t: - messages = consumer.get_messages(block=True, timeout=5) + messages = consumer.get_messages(block=True, timeout=1) self.assert_message_count(messages, 0) - self.assertGreaterEqual(t.interval, 5) + self.assertGreaterEqual(t.interval, 1) self.send_messages(0, range(0, 10)) @@ -184,11 +184,11 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): self.assert_message_count(messages, 5) self.assertLessEqual(t.interval, 1) - # Ask for 10 messages, get 5 back, block 5 seconds + # Ask for 10 messages, get 5 back, block 1 second with Timer() as t: - messages = consumer.get_messages(count=10, block=True, timeout=5) + messages = consumer.get_messages(count=10, block=True, timeout=1) self.assert_message_count(messages, 5) - self.assertGreaterEqual(t.interval, 5) + self.assertGreaterEqual(t.interval, 1) consumer.stop() @@ -236,12 +236,12 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): def test_multi_process_consumer_blocking(self): consumer = self.consumer(consumer = MultiProcessConsumer) - # Ask for 5 messages, No messages in queue, block 5 seconds + # Ask for 5 messages, No messages in queue, block 1 second with Timer() as t: - messages = consumer.get_messages(block=True, timeout=5) + messages = consumer.get_messages(block=True, timeout=1) self.assert_message_count(messages, 0) - self.assertGreaterEqual(t.interval, 5) + self.assertGreaterEqual(t.interval, 1) # Send 10 messages self.send_messages(0, range(0, 10)) @@ -252,11 +252,11 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): self.assert_message_count(messages, 5) self.assertLessEqual(t.interval, 1) - # Ask for 10 messages, 5 in queue, block 5 seconds + # Ask for 10 messages, 5 in queue, block 1 second with Timer() as t: - messages = consumer.get_messages(count=10, block=True, timeout=5) + messages = consumer.get_messages(count=10, block=True, timeout=1) self.assert_message_count(messages, 5) - self.assertGreaterEqual(t.interval, 4.95) + self.assertGreaterEqual(t.interval, 1) consumer.stop() @@ -450,7 +450,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): consumer = self.kafka_consumer(auto_offset_reset='smallest', consumer_timeout_ms=TIMEOUT_MS) - # Ask for 5 messages, nothing in queue, block 5 seconds + # Ask for 5 messages, nothing in queue, block 500ms with Timer() as t: with self.assertRaises(ConsumerTimeout): msg = consumer.next() @@ -467,7 +467,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase): self.assertEqual(len(messages), 5) self.assertLess(t.interval, TIMEOUT_MS / 1000.0 ) - # Ask for 10 messages, get 5 back, block 5 seconds + # Ask for 10 messages, get 5 back, block 500ms messages = set() with Timer() as t: with self.assertRaises(ConsumerTimeout): diff --git a/test/test_failover_integration.py b/test/test_failover_integration.py index 5082d7c..91e22cf 100644 --- a/test/test_failover_integration.py +++ b/test/test_failover_integration.py @@ -98,10 +98,14 @@ class TestFailover(KafkaIntegrationTestCase): # Test the base class Producer -- send_messages to a specific partition producer = Producer(self.client, async=True, - req_acks=Producer.ACK_AFTER_CLUSTER_COMMIT) + batch_send_every_n=15, + batch_send_every_t=3, + req_acks=Producer.ACK_AFTER_CLUSTER_COMMIT, + async_log_messages_on_error=False) # Send 10 random messages self._send_random_messages(producer, topic, partition, 10) + self._send_random_messages(producer, topic, partition + 1, 10) # kill leader for partition self._kill_leader(topic, partition) @@ -110,9 +114,11 @@ class TestFailover(KafkaIntegrationTestCase): # in async mode, this should return immediately producer.send_messages(topic, partition, b'success') + producer.send_messages(topic, partition + 1, b'success') # send to new leader self._send_random_messages(producer, topic, partition, 10) + self._send_random_messages(producer, topic, partition + 1, 10) # Stop the producer and wait for it to shutdown producer.stop() @@ -129,6 +135,8 @@ class TestFailover(KafkaIntegrationTestCase): # Should be equal to 10 before + 1 recovery + 10 after self.assert_message_count(topic, 21, partitions=(partition,), at_least=True) + self.assert_message_count(topic, 21, partitions=(partition + 1,), + at_least=True) @kafka_versions("all") def test_switch_leader_keyed_producer(self): diff --git a/test/testutil.py b/test/testutil.py index 99d8d01..3a1d2ba 100644 --- a/test/testutil.py +++ b/test/testutil.py @@ -113,3 +113,8 @@ class Timer(object): self.interval = self.end - self.start logging.basicConfig(level=logging.DEBUG) +logging.getLogger('test.fixtures').setLevel(logging.ERROR) +logging.getLogger('test.service').setLevel(logging.ERROR) + +# kafka.conn debug logging is verbose, disable in tests by default +logging.getLogger('kafka.conn').setLevel(logging.INFO) |