-rw-r--r--  test/service.py                     3
-rw-r--r--  test/test_conn.py                   5
-rw-r--r--  test/test_consumer_integration.py  28
-rw-r--r--  test/test_failover_integration.py  10
-rw-r--r--  test/testutil.py                    5
-rw-r--r--  tox.ini                             2
6 files changed, 36 insertions, 17 deletions
diff --git a/test/service.py b/test/service.py
index 9368b85..b986a71 100644
--- a/test/service.py
+++ b/test/service.py
@@ -59,7 +59,8 @@ class SpawnedService(threading.Thread):
         self.alive = True
 
     def _despawn(self):
-        self.child.terminate()
+        if self.child.poll() is None:
+            self.child.terminate()
         self.alive = False
         for _ in range(50):
             if self.child.poll() is not None:
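
The guard added to _despawn() avoids calling terminate() on a child that has already exited: Popen.poll() returns None only while the process is still running, and its exit code afterwards, so the call becomes a no-op once the service has died on its own. A minimal standalone sketch of the pattern (the 0.1s sleep inside the wait loop is an assumption; the hunk above is cut off before it):

    import subprocess
    import sys
    import time

    def despawn(child):
        # terminate() is only attempted while the child is still running;
        # poll() returns None in that case and the exit code once it has exited.
        if child.poll() is None:
            child.terminate()
        # wait up to ~5 seconds for the child to go away, as the hunk above does
        for _ in range(50):
            if child.poll() is not None:
                return
            time.sleep(0.1)  # assumed interval, not visible in the diff

    # Safe to call twice: the second call skips terminate() entirely.
    child = subprocess.Popen([sys.executable, '-c', 'import time; time.sleep(30)'])
    despawn(child)
    despawn(child)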
diff --git a/test/test_conn.py b/test/test_conn.py
index c4f219b..6e47cc8 100644
--- a/test/test_conn.py
+++ b/test/test_conn.py
@@ -1,3 +1,4 @@
+import logging
 import socket
 import struct
 from threading import Thread
@@ -10,6 +11,10 @@ from kafka.conn import KafkaConnection, collect_hosts, DEFAULT_SOCKET_TIMEOUT_SE
 class ConnTest(unittest.TestCase):
     def setUp(self):
+
+        # kafka.conn debug logging is verbose, so only enable in conn tests
+        logging.getLogger('kafka.conn').setLevel(logging.DEBUG)
+
         self.config = {
             'host': 'localhost',
             'port': 9090,
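
This per-test override works together with the suite-wide default set in test/testutil.py further down, which drops 'kafka.conn' to INFO; raising it back to DEBUG here keeps the verbose connection logs available where they are actually useful. A sketch of the pattern, with a tearDown that restores the previous level (the restore step is an addition of this sketch, not part of the hunk):

    import logging
    import unittest

    class ConnTest(unittest.TestCase):
        def setUp(self):
            # re-enable verbose connection logging for these tests only
            self._prev_level = logging.getLogger('kafka.conn').level
            logging.getLogger('kafka.conn').setLevel(logging.DEBUG)

        def tearDown(self):
            # put back whatever level the rest of the suite configured
            logging.getLogger('kafka.conn').setLevel(self._prev_level)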
diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py
index c202c5c..8911e3e 100644
--- a/test/test_consumer_integration.py
+++ b/test/test_consumer_integration.py
@@ -170,11 +170,11 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
     def test_simple_consumer_blocking(self):
         consumer = self.consumer()
 
-        # Ask for 5 messages, nothing in queue, block 5 seconds
+        # Ask for 5 messages, nothing in queue, block 1 second
         with Timer() as t:
-            messages = consumer.get_messages(block=True, timeout=5)
+            messages = consumer.get_messages(block=True, timeout=1)
             self.assert_message_count(messages, 0)
-        self.assertGreaterEqual(t.interval, 5)
+        self.assertGreaterEqual(t.interval, 1)
 
         self.send_messages(0, range(0, 10))
@@ -184,11 +184,11 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
             self.assert_message_count(messages, 5)
         self.assertLessEqual(t.interval, 1)
 
-        # Ask for 10 messages, get 5 back, block 5 seconds
+        # Ask for 10 messages, get 5 back, block 1 second
         with Timer() as t:
-            messages = consumer.get_messages(count=10, block=True, timeout=5)
+            messages = consumer.get_messages(count=10, block=True, timeout=1)
             self.assert_message_count(messages, 5)
-        self.assertGreaterEqual(t.interval, 5)
+        self.assertGreaterEqual(t.interval, 1)
 
         consumer.stop()
@@ -236,12 +236,12 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
     def test_multi_process_consumer_blocking(self):
         consumer = self.consumer(consumer = MultiProcessConsumer)
 
-        # Ask for 5 messages, No messages in queue, block 5 seconds
+        # Ask for 5 messages, No messages in queue, block 1 second
         with Timer() as t:
-            messages = consumer.get_messages(block=True, timeout=5)
+            messages = consumer.get_messages(block=True, timeout=1)
             self.assert_message_count(messages, 0)
-        self.assertGreaterEqual(t.interval, 5)
+        self.assertGreaterEqual(t.interval, 1)
 
         # Send 10 messages
         self.send_messages(0, range(0, 10))
@@ -252,11 +252,11 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
             self.assert_message_count(messages, 5)
         self.assertLessEqual(t.interval, 1)
 
-        # Ask for 10 messages, 5 in queue, block 5 seconds
+        # Ask for 10 messages, 5 in queue, block 1 second
         with Timer() as t:
-            messages = consumer.get_messages(count=10, block=True, timeout=5)
+            messages = consumer.get_messages(count=10, block=True, timeout=1)
             self.assert_message_count(messages, 5)
-        self.assertGreaterEqual(t.interval, 4.95)
+        self.assertGreaterEqual(t.interval, 1)
 
         consumer.stop()
@@ -450,7 +450,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
         consumer = self.kafka_consumer(auto_offset_reset='smallest',
                                        consumer_timeout_ms=TIMEOUT_MS)
 
-        # Ask for 5 messages, nothing in queue, block 5 seconds
+        # Ask for 5 messages, nothing in queue, block 500ms
         with Timer() as t:
             with self.assertRaises(ConsumerTimeout):
                 msg = consumer.next()
@@ -467,7 +467,7 @@ class TestConsumerIntegration(KafkaIntegrationTestCase):
         self.assertEqual(len(messages), 5)
         self.assertLess(t.interval, TIMEOUT_MS / 1000.0 )
 
-        # Ask for 10 messages, get 5 back, block 5 seconds
+        # Ask for 10 messages, get 5 back, block 500ms
         messages = set()
         with Timer() as t:
             with self.assertRaises(ConsumerTimeout):
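
All of the blocking assertions above lean on the Timer context manager from test/testutil.py (its tail is visible in the hunk further down): the lower bound asserted on t.interval has to stay in sync with the timeout passed to get_messages(), which is why the old 4.95 fudge factor disappears along with the 5-second waits. A minimal sketch of the pattern, assuming a time.time()-based clock (only the interval attribute is shown in the diff):

    import time

    class Timer(object):
        # Sketch of the context manager the tests time their calls with.
        def __enter__(self):
            self.start = time.time()
            return self

        def __exit__(self, *exc):
            self.end = time.time()
            self.interval = self.end - self.start

    # Usage mirroring the updated tests: block for the full timeout,
    # then assert the elapsed time is at least that long.
    with Timer() as t:
        time.sleep(1)  # stand-in for consumer.get_messages(block=True, timeout=1)
    assert t.interval >= 1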
diff --git a/test/test_failover_integration.py b/test/test_failover_integration.py
index 5082d7c..91e22cf 100644
--- a/test/test_failover_integration.py
+++ b/test/test_failover_integration.py
@@ -98,10 +98,14 @@ class TestFailover(KafkaIntegrationTestCase):
         # Test the base class Producer -- send_messages to a specific partition
         producer = Producer(self.client, async=True,
-                            req_acks=Producer.ACK_AFTER_CLUSTER_COMMIT)
+                            batch_send_every_n=15,
+                            batch_send_every_t=3,
+                            req_acks=Producer.ACK_AFTER_CLUSTER_COMMIT,
+                            async_log_messages_on_error=False)
 
         # Send 10 random messages
         self._send_random_messages(producer, topic, partition, 10)
+        self._send_random_messages(producer, topic, partition + 1, 10)
 
         # kill leader for partition
         self._kill_leader(topic, partition)
@@ -110,9 +114,11 @@ class TestFailover(KafkaIntegrationTestCase):
         # in async mode, this should return immediately
         producer.send_messages(topic, partition, b'success')
+        producer.send_messages(topic, partition + 1, b'success')
 
         # send to new leader
         self._send_random_messages(producer, topic, partition, 10)
+        self._send_random_messages(producer, topic, partition + 1, 10)
 
         # Stop the producer and wait for it to shutdown
         producer.stop()
@@ -129,6 +135,8 @@ class TestFailover(KafkaIntegrationTestCase):
         # Should be equal to 10 before + 1 recovery + 10 after
         self.assert_message_count(topic, 21, partitions=(partition,),
                                   at_least=True)
+        self.assert_message_count(topic, 21, partitions=(partition + 1,),
+                                  at_least=True)
 
     @kafka_versions("all")
     def test_switch_leader_keyed_producer(self):
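
The failover test now configures the async producer's batching explicitly and exercises a second partition, so recovery is checked on more than one leader. My reading of the two batch_send_every_* settings, shown as a hypothetical helper (this illustrates the thresholds being configured, not kafka-python's actual internals):

    import time

    def should_flush(pending_count, last_flush_time,
                     batch_every_n=15, batch_every_t=3.0):
        # A batch goes out once either 15 messages are queued or 3 seconds
        # have passed since the last flush, whichever happens first.
        return (pending_count >= batch_every_n
                or time.time() - last_flush_time >= batch_every_t)

With only 10 messages sent per round, the count threshold is never reached, so sends presumably sit in the async queue until the timer fires, keeping messages buffered across the leader kill.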
diff --git a/test/testutil.py b/test/testutil.py
index 99d8d01..3a1d2ba 100644
--- a/test/testutil.py
+++ b/test/testutil.py
@@ -113,3 +113,8 @@ class Timer(object):
         self.interval = self.end - self.start
 
 logging.basicConfig(level=logging.DEBUG)
+logging.getLogger('test.fixtures').setLevel(logging.ERROR)
+logging.getLogger('test.service').setLevel(logging.ERROR)
+
+# kafka.conn debug logging is verbose, disable in tests by default
+logging.getLogger('kafka.conn').setLevel(logging.INFO)
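
For context, a standalone sketch of how these module-level calls interact: basicConfig() installs a root handler that accepts DEBUG records, and the per-logger setLevel() calls then filter individual noisy loggers before their records ever reach that handler, while everything else still inherits DEBUG from the root:

    import logging

    logging.basicConfig(level=logging.DEBUG)             # root handler at DEBUG
    logging.getLogger('kafka.conn').setLevel(logging.INFO)

    logging.getLogger('kafka.conn').debug('dropped: below the logger level')
    logging.getLogger('kafka.conn').info('kept: emitted via the root handler')

    # Loggers with no explicit level (e.g. 'kafka.client') stay at NOTSET
    # and fall back to the root's DEBUG, so only the named loggers go quiet.
    logging.getLogger('kafka.client').debug('kept')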
diff --git a/tox.ini b/tox.ini
index e3e8568..fcb8908 100644
--- a/tox.ini
+++ b/tox.ini
@@ -11,7 +11,7 @@ deps =
     mock
     python-snappy
 commands =
-    nosetests {posargs:-v --with-id --id-file={envdir}/.noseids --with-timer --timer-top-n 10 --with-coverage --cover-erase --cover-package kafka}
+    nosetests {posargs:-v -x --with-id --id-file={envdir}/.noseids --with-timer --timer-top-n 10 --with-coverage --cover-erase --cover-package kafka}
 setenv =
     PROJECT_ROOT = {toxinidir}
 passenv = KAFKA_VERSION