diff options
Diffstat (limited to 'test/test_integration.py')
-rw-r--r-- | test/test_integration.py | 72 |
1 files changed, 45 insertions, 27 deletions
diff --git a/test/test_integration.py b/test/test_integration.py index eaf432d..56974a5 100644 --- a/test/test_integration.py +++ b/test/test_integration.py @@ -12,7 +12,23 @@ from kafka.consumer import MAX_FETCH_BUFFER_SIZE_BYTES from .fixtures import ZookeeperFixture, KafkaFixture -class TestKafkaClient(unittest.TestCase): +class KafkaTestCase(unittest.TestCase): + def setUp(self): + topic_name = self.id()[self.id().rindex(".")+1:] + times = 0 + while True: + times += 1 + self.client.load_metadata_for_topics(topic_name) + if self.client.has_metadata_for_topic(topic_name): + break + print "Waiting for %s topic to be created" % topic_name + time.sleep(1) + + if times > 30: + raise Exception("Unable to create topic %s" % topic_name) + + +class TestKafkaClient(KafkaTestCase): @classmethod def setUpClass(cls): # noqa cls.zk = ZookeeperFixture.instance() @@ -30,6 +46,7 @@ class TestKafkaClient(unittest.TestCase): ##################### def test_produce_many_simple(self): + produce = ProduceRequest("test_produce_many_simple", 0, messages=[ create_message("Test message %d" % i) for i in range(100) ]) @@ -331,15 +348,15 @@ class TestKafkaClient(unittest.TestCase): producer.stop() def test_hashed_partitioner(self): - producer = KeyedProducer(self.client, "test_hash_partitioner", + producer = KeyedProducer(self.client, "test_hashed_partitioner", partitioner=HashedPartitioner) producer.send(1, "one") producer.send(2, "two") producer.send(3, "three") producer.send(4, "four") - fetch1 = FetchRequest("test_hash_partitioner", 0, 0, 1024) - fetch2 = FetchRequest("test_hash_partitioner", 1, 0, 1024) + fetch1 = FetchRequest("test_hashed_partitioner", 0, 0, 1024) + fetch2 = FetchRequest("test_hashed_partitioner", 1, 0, 1024) fetch_resp1, fetch_resp2 = self.client.send_fetch_request([fetch1, fetch2]) @@ -549,7 +566,7 @@ class TestKafkaClient(unittest.TestCase): producer.stop() -class TestConsumer(unittest.TestCase): +class TestConsumer(KafkaTestCase): @classmethod def setUpClass(cls): cls.zk = ZookeeperFixture.instance() @@ -648,21 +665,21 @@ class TestConsumer(unittest.TestCase): def test_simple_consumer_pending(self): # Produce 10 messages to partition 0 and 1 - produce1 = ProduceRequest("test_simple_pending", 0, messages=[ + produce1 = ProduceRequest("test_simple_consumer_pending", 0, messages=[ create_message("Test message 0 %d" % i) for i in range(10) ]) for resp in self.client.send_produce_request([produce1]): self.assertEquals(resp.error, 0) self.assertEquals(resp.offset, 0) - produce2 = ProduceRequest("test_simple_pending", 1, messages=[ + produce2 = ProduceRequest("test_simple_consumer_pending", 1, messages=[ create_message("Test message 1 %d" % i) for i in range(10) ]) for resp in self.client.send_produce_request([produce2]): self.assertEquals(resp.error, 0) self.assertEquals(resp.offset, 0) - consumer = SimpleConsumer(self.client, "group1", "test_simple_pending", + consumer = SimpleConsumer(self.client, "group1", "test_simple_consumer_pending", auto_commit=False, iter_timeout=0) self.assertEquals(consumer.pending(), 20) self.assertEquals(consumer.pending(partitions=[0]), 10) @@ -671,7 +688,7 @@ class TestConsumer(unittest.TestCase): def test_multi_process_consumer(self): # Produce 100 messages to partition 0 - produce1 = ProduceRequest("test_mpconsumer", 0, messages=[ + produce1 = ProduceRequest("test_multi_process_consumer", 0, messages=[ create_message("Test message 0 %d" % i) for i in range(100) ]) @@ -680,7 +697,7 @@ class TestConsumer(unittest.TestCase): self.assertEquals(resp.offset, 0) # Produce 100 messages to partition 1 - produce2 = ProduceRequest("test_mpconsumer", 1, messages=[ + produce2 = ProduceRequest("test_multi_process_consumer", 1, messages=[ create_message("Test message 1 %d" % i) for i in range(100) ]) @@ -689,7 +706,7 @@ class TestConsumer(unittest.TestCase): self.assertEquals(resp.offset, 0) # Start a consumer - consumer = MultiProcessConsumer(self.client, "grp1", "test_mpconsumer", auto_commit=False) + consumer = MultiProcessConsumer(self.client, "grp1", "test_multi_process_consumer", auto_commit=False) all_messages = [] for message in consumer: all_messages.append(message) @@ -702,11 +719,11 @@ class TestConsumer(unittest.TestCase): start = datetime.now() messages = consumer.get_messages(block=True, timeout=5) diff = (datetime.now() - start).total_seconds() - self.assertGreaterEqual(diff, 5) + self.assertGreaterEqual(diff, 4.9) self.assertEqual(len(messages), 0) # Send 10 messages - produce = ProduceRequest("test_mpconsumer", 0, messages=[ + produce = ProduceRequest("test_multi_process_consumer", 0, messages=[ create_message("Test message 0 %d" % i) for i in range(10) ]) @@ -729,7 +746,7 @@ class TestConsumer(unittest.TestCase): def test_multi_proc_pending(self): # Produce 10 messages to partition 0 and 1 - produce1 = ProduceRequest("test_mppending", 0, messages=[ + produce1 = ProduceRequest("test_multi_proc_pending", 0, messages=[ create_message("Test message 0 %d" % i) for i in range(10) ]) @@ -737,7 +754,7 @@ class TestConsumer(unittest.TestCase): self.assertEquals(resp.error, 0) self.assertEquals(resp.offset, 0) - produce2 = ProduceRequest("test_mppending", 1, messages=[ + produce2 = ProduceRequest("test_multi_proc_pending", 1, messages=[ create_message("Test message 1 %d" % i) for i in range(10) ]) @@ -745,7 +762,7 @@ class TestConsumer(unittest.TestCase): self.assertEquals(resp.error, 0) self.assertEquals(resp.offset, 0) - consumer = MultiProcessConsumer(self.client, "group1", "test_mppending", auto_commit=False) + consumer = MultiProcessConsumer(self.client, "group1", "test_multi_proc_pending", auto_commit=False) self.assertEquals(consumer.pending(), 20) self.assertEquals(consumer.pending(partitions=[0]), 10) self.assertEquals(consumer.pending(partitions=[1]), 10) @@ -800,19 +817,20 @@ class TestConsumer(unittest.TestCase): self.assertIsNotNone(message) self.assertEquals(message.message.value, big_message.value) -class TestFailover(unittest.TestCase): - def setUp(self): +class TestFailover(KafkaTestCase): - zk_chroot = random_string(10) - replicas = 2 + def setUp(self): + zk_chroot = random_string(10) + replicas = 2 partitions = 2 # mini zookeeper, 2 kafka brokers - self.zk = ZookeeperFixture.instance() - kk_args = [self.zk.host, self.zk.port, zk_chroot, replicas, partitions] + self.zk = ZookeeperFixture.instance() + kk_args = [self.zk.host, self.zk.port, zk_chroot, replicas, partitions] self.brokers = [KafkaFixture.instance(i, *kk_args) for i in range(replicas)] - self.client = KafkaClient(self.brokers[0].host, self.brokers[0].port) + self.client = KafkaClient(self.brokers[0].host, self.brokers[0].port) + KafkaTestCase.setUp(self) def tearDown(self): self.client.close() @@ -835,7 +853,7 @@ class TestFailover(unittest.TestCase): broker = self._kill_leader(topic, partition) # expect failure, reload meta data - with self.assertRaises(FailedPayloadsException): + with self.assertRaises(FailedPayloadsError): producer.send_messages('part 1') producer.send_messages('part 2') time.sleep(1) @@ -886,17 +904,17 @@ class TestFailover(unittest.TestCase): resp = producer.send_messages(random_string(10)) if len(resp) > 0: self.assertEquals(resp[0].error, 0) - time.sleep(1) # give it some time + time.sleep(1) # give it some time def _kill_leader(self, topic, partition): leader = self.client.topics_to_brokers[TopicAndPartition(topic, partition)] broker = self.brokers[leader.nodeId] broker.close() - time.sleep(1) # give it some time + time.sleep(1) # give it some time return broker def _count_messages(self, group, topic): - client = KafkaClient(self.brokers[0].host, self.brokers[0].port) + client = KafkaClient(self.brokers[0].host, self.brokers[0].port) consumer = SimpleConsumer(client, group, topic, auto_commit=False, iter_timeout=0) all_messages = [] for message in consumer: |