diff options
Diffstat (limited to 'test/test_integration.py')
-rw-r--r-- | test/test_integration.py | 23 |
1 files changed, 14 insertions, 9 deletions
diff --git a/test/test_integration.py b/test/test_integration.py index a10dae2..1f37ebf 100644 --- a/test/test_integration.py +++ b/test/test_integration.py @@ -16,7 +16,7 @@ class TestKafkaClient(unittest.TestCase): def setUpClass(cls): # noqa cls.zk = ZookeeperFixture.instance() cls.server = KafkaFixture.instance(0, cls.zk.host, cls.zk.port) - cls.client = KafkaClient(cls.server.host, cls.server.port) + cls.client = KafkaClient('%s:%d' % (cls.server.host, cls.server.port)) @classmethod def tearDownClass(cls): # noqa @@ -554,7 +554,7 @@ class TestConsumer(unittest.TestCase): cls.zk = ZookeeperFixture.instance() cls.server1 = KafkaFixture.instance(0, cls.zk.host, cls.zk.port) cls.server2 = KafkaFixture.instance(1, cls.zk.host, cls.zk.port) - cls.client = KafkaClient(cls.server2.host, cls.server2.port, bufsize=8192) + cls.client = KafkaClient('%s:%d' % (cls.server2.host, cls.server2.port), bufsize=8192) @classmethod def tearDownClass(cls): # noqa @@ -770,20 +770,23 @@ class TestConsumer(unittest.TestCase): self.assertEquals(all_messages[i], message.message) self.assertEquals(i, 19) + class TestFailover(unittest.TestCase): @classmethod def setUpClass(cls): zk_chroot = random_string(10) - replicas = 2 + replicas = 2 partitions = 2 # mini zookeeper, 2 kafka brokers - cls.zk = ZookeeperFixture.instance() - kk_args = [cls.zk.host, cls.zk.port, zk_chroot, replicas, partitions] + cls.zk = ZookeeperFixture.instance() + kk_args = [cls.zk.host, cls.zk.port, zk_chroot, replicas, partitions] cls.brokers = [KafkaFixture.instance(i, *kk_args) for i in range(replicas)] - cls.client = KafkaClient(cls.brokers[0].host, cls.brokers[0].port) + + hosts = ','.join(['%s:%d' % (b.host, b.port) for b in cls.brokers]) + cls.client = KafkaClient(hosts) @classmethod def tearDownClass(cls): @@ -858,17 +861,19 @@ class TestFailover(unittest.TestCase): resp = producer.send_messages(random_string(10)) if len(resp) > 0: self.assertEquals(resp[0].error, 0) - time.sleep(1) # give it some time + time.sleep(1) # give it some time def _kill_leader(self, topic, partition): leader = self.client.topics_to_brokers[TopicAndPartition(topic, partition)] broker = self.brokers[leader.nodeId] broker.close() - time.sleep(1) # give it some time + time.sleep(1) # give it some time return broker def _count_messages(self, group, topic): - client = KafkaClient(self.brokers[0].host, self.brokers[0].port) + + hosts = '%s:%d' % (self.brokers[0].host, self.brokers[0].port) + client = KafkaClient(hosts) consumer = SimpleConsumer(client, group, topic, auto_commit=False) all_messages = [] for message in consumer: |