summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--test/test_integration.py30
1 files changed, 17 insertions, 13 deletions
diff --git a/test/test_integration.py b/test/test_integration.py
index 56974a5..9ad58db 100644
--- a/test/test_integration.py
+++ b/test/test_integration.py
@@ -12,20 +12,24 @@ from kafka.consumer import MAX_FETCH_BUFFER_SIZE_BYTES
from .fixtures import ZookeeperFixture, KafkaFixture
+def ensure_topic_creation(client, topic_name):
+ times = 0
+ while True:
+ times += 1
+ client.load_metadata_for_topics(topic_name)
+ if client.has_metadata_for_topic(topic_name):
+ break
+ print "Waiting for %s topic to be created" % topic_name
+ time.sleep(1)
+
+ if times > 30:
+ raise Exception("Unable to create topic %s" % topic_name)
+
+
class KafkaTestCase(unittest.TestCase):
def setUp(self):
topic_name = self.id()[self.id().rindex(".")+1:]
- times = 0
- while True:
- times += 1
- self.client.load_metadata_for_topics(topic_name)
- if self.client.has_metadata_for_topic(topic_name):
- break
- print "Waiting for %s topic to be created" % topic_name
- time.sleep(1)
-
- if times > 30:
- raise Exception("Unable to create topic %s" % topic_name)
+ ensure_topic_creation(self.client, topic_name)
class TestKafkaClient(KafkaTestCase):
@@ -719,7 +723,7 @@ class TestConsumer(KafkaTestCase):
start = datetime.now()
messages = consumer.get_messages(block=True, timeout=5)
diff = (datetime.now() - start).total_seconds()
- self.assertGreaterEqual(diff, 4.9)
+ self.assertGreaterEqual(diff, 4.999)
self.assertEqual(len(messages), 0)
# Send 10 messages
@@ -830,7 +834,7 @@ class TestFailover(KafkaTestCase):
kk_args = [self.zk.host, self.zk.port, zk_chroot, replicas, partitions]
self.brokers = [KafkaFixture.instance(i, *kk_args) for i in range(replicas)]
self.client = KafkaClient(self.brokers[0].host, self.brokers[0].port)
- KafkaTestCase.setUp(self)
+ super(TestFailover, self).setUp()
def tearDown(self):
self.client.close()