summaryrefslogtreecommitdiff
path: root/test/test_integration.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/test_integration.py')
-rw-r--r--test/test_integration.py23
1 files changed, 14 insertions, 9 deletions
diff --git a/test/test_integration.py b/test/test_integration.py
index a10dae2..1f37ebf 100644
--- a/test/test_integration.py
+++ b/test/test_integration.py
@@ -16,7 +16,7 @@ class TestKafkaClient(unittest.TestCase):
def setUpClass(cls): # noqa
cls.zk = ZookeeperFixture.instance()
cls.server = KafkaFixture.instance(0, cls.zk.host, cls.zk.port)
- cls.client = KafkaClient(cls.server.host, cls.server.port)
+ cls.client = KafkaClient('%s:%d' % (cls.server.host, cls.server.port))
@classmethod
def tearDownClass(cls): # noqa
@@ -554,7 +554,7 @@ class TestConsumer(unittest.TestCase):
cls.zk = ZookeeperFixture.instance()
cls.server1 = KafkaFixture.instance(0, cls.zk.host, cls.zk.port)
cls.server2 = KafkaFixture.instance(1, cls.zk.host, cls.zk.port)
- cls.client = KafkaClient(cls.server2.host, cls.server2.port, bufsize=8192)
+ cls.client = KafkaClient('%s:%d' % (cls.server2.host, cls.server2.port), bufsize=8192)
@classmethod
def tearDownClass(cls): # noqa
@@ -770,20 +770,23 @@ class TestConsumer(unittest.TestCase):
self.assertEquals(all_messages[i], message.message)
self.assertEquals(i, 19)
+
class TestFailover(unittest.TestCase):
@classmethod
def setUpClass(cls):
zk_chroot = random_string(10)
- replicas = 2
+ replicas = 2
partitions = 2
# mini zookeeper, 2 kafka brokers
- cls.zk = ZookeeperFixture.instance()
- kk_args = [cls.zk.host, cls.zk.port, zk_chroot, replicas, partitions]
+ cls.zk = ZookeeperFixture.instance()
+ kk_args = [cls.zk.host, cls.zk.port, zk_chroot, replicas, partitions]
cls.brokers = [KafkaFixture.instance(i, *kk_args) for i in range(replicas)]
- cls.client = KafkaClient(cls.brokers[0].host, cls.brokers[0].port)
+
+ hosts = ','.join(['%s:%d' % (b.host, b.port) for b in cls.brokers])
+ cls.client = KafkaClient(hosts)
@classmethod
def tearDownClass(cls):
@@ -858,17 +861,19 @@ class TestFailover(unittest.TestCase):
resp = producer.send_messages(random_string(10))
if len(resp) > 0:
self.assertEquals(resp[0].error, 0)
- time.sleep(1) # give it some time
+ time.sleep(1) # give it some time
def _kill_leader(self, topic, partition):
leader = self.client.topics_to_brokers[TopicAndPartition(topic, partition)]
broker = self.brokers[leader.nodeId]
broker.close()
- time.sleep(1) # give it some time
+ time.sleep(1) # give it some time
return broker
def _count_messages(self, group, topic):
- client = KafkaClient(self.brokers[0].host, self.brokers[0].port)
+
+ hosts = '%s:%d' % (self.brokers[0].host, self.brokers[0].port)
+ client = KafkaClient(hosts)
consumer = SimpleConsumer(client, group, topic, auto_commit=False)
all_messages = []
for message in consumer: