From a2191e5be5d5fcd212582580c163f4533cca6c73 Mon Sep 17 00:00:00 2001
From: mrtheb
Date: Sun, 9 Feb 2014 13:44:47 -0500
Subject: Support list (or comma-separated) of hosts (replaces host and port
 arguments)
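
A usage sketch distilled from the unit tests below (an editor's
illustration, not wording from the original commit; the broker names are
placeholders, and metadata loading is patched out exactly as the tests do
so that no real broker is contacted):

    from mock import patch
    from kafka import KafkaClient

    # Patch metadata loading so no broker connection is attempted,
    # the same trick the unit tests below use.
    with patch.object(KafkaClient, 'load_metadata_for_topics'):
        # hosts may be a list of "host:port" strings...
        client = KafkaClient(hosts=['kafka01:9092', 'kafka02:9092'])
        # ...or a single comma-separated string; either form is parsed
        # into (host, port) tuples exposed as client.hosts.
        client = KafkaClient(hosts='kafka01:9092,kafka02:9092')
        assert ('kafka02', 9092) in client.hosts

Related behaviour change covered below: when no broker in the list accepts
the request, _send_broker_unaware_request raises KafkaUnavailableError
instead of returning None.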
---
 test/test_integration.py | 26 +++++++++++++-------------
 test/test_unit.py        | 40 ++++++++++++++++++++++++++++++++--------
 2 files changed, 45 insertions(+), 21 deletions(-)

diff --git a/test/test_integration.py b/test/test_integration.py
index 000f44a..3d6ccf6 100644
--- a/test/test_integration.py
+++ b/test/test_integration.py
@@ -33,7 +33,7 @@ def ensure_topic_creation(client, topic_name):
 
 class KafkaTestCase(unittest.TestCase):
     def setUp(self):
-        self.topic = "%s-%s" % (self.id()[self.id().rindex(".")+1:], random_string(10))
+        self.topic = "%s-%s" % (self.id()[self.id().rindex(".") + 1:], random_string(10))
         ensure_topic_creation(self.client, self.topic)
 
 
@@ -578,7 +578,7 @@ class TestConsumer(KafkaTestCase):
         cls.zk = ZookeeperFixture.instance()
         cls.server1 = KafkaFixture.instance(0, cls.zk.host, cls.zk.port)
         cls.server2 = KafkaFixture.instance(1, cls.zk.host, cls.zk.port)
-        cls.client = KafkaClient('%s:%d' % (cls.server2.host, cls.server2.port), bufsize=8192)
+        cls.client = KafkaClient('%s:%d' % (cls.server2.host, cls.server2.port))
 
     @classmethod
     def tearDownClass(cls):  # noqa
@@ -800,7 +800,6 @@ class TestConsumer(KafkaTestCase):
             self.assertEquals(all_messages[i], message.message)
         self.assertEquals(i, 19)
 
-
         # Produce 1 message that is too large (bigger than max fetch size)
         big_message_size = MAX_FETCH_BUFFER_SIZE_BYTES + 10
         big_message = create_message(random_string(big_message_size))
@@ -827,25 +826,26 @@ class TestConsumer(KafkaTestCase):
 
 class TestFailover(KafkaTestCase):
 
-    def setUp(self):
+    @classmethod
+    def setUpClass(cls):  # noqa
        zk_chroot = random_string(10)
        replicas = 2
        partitions = 2
 
         # mini zookeeper, 2 kafka brokers
-        self.zk = ZookeeperFixture.instance()
-        kk_args = [self.zk.host, self.zk.port, zk_chroot, replicas, partitions]
-        self.brokers = [KafkaFixture.instance(i, *kk_args) for i in range(replicas)]
+        cls.zk = ZookeeperFixture.instance()
+        kk_args = [cls.zk.host, cls.zk.port, zk_chroot, replicas, partitions]
+        cls.brokers = [KafkaFixture.instance(i, *kk_args) for i in range(replicas)]
 
-        hosts = ','.join(['%s:%d' % (b.host, b.port) for b in cls.brokers])
+        hosts = ['%s:%d' % (b.host, b.port) for b in cls.brokers]
         cls.client = KafkaClient(hosts)
-        super(TestFailover, self).setUp()
 
-    def tearDown(self):
-        self.client.close()
-        for broker in self.brokers:
+    @classmethod
+    def tearDownClass(cls):
+        cls.client.close()
+        for broker in cls.brokers:
             broker.close()
-        self.zk.close()
+        cls.zk.close()
 
     def test_switch_leader(self):
         key, topic, partition = random_string(5), self.topic, 0
diff --git a/test/test_unit.py b/test/test_unit.py
index 4c78c1b..624fe39 100644
--- a/test/test_unit.py
+++ b/test/test_unit.py
@@ -5,11 +5,13 @@ import unittest
 
 from mock import patch
 
+from kafka import KafkaClient
 from kafka.common import (
     ProduceRequest, FetchRequest, Message, ChecksumError,
     ConsumerFetchSizeTooSmall, ProduceResponse, FetchResponse,
     OffsetAndMessage, BrokerMetadata, PartitionMetadata
 )
+from kafka.common import KafkaUnavailableError
 from kafka.codec import (
     has_gzip, has_snappy, gzip_encode, gzip_decode,
     snappy_encode, snappy_decode
@@ -384,6 +386,26 @@ class TestProtocol(unittest.TestCase):
 
 
 class TestKafkaClient(unittest.TestCase):
 
+    def test_init_with_list(self):
+
+        with patch.object(KafkaClient, 'load_metadata_for_topics'):
+            client = KafkaClient(
+                hosts=['kafka01:9092', 'kafka02:9092', 'kafka03:9092'])
+
+        self.assertItemsEqual(
+            [('kafka01', 9092), ('kafka02', 9092), ('kafka03', 9092)],
+            client.hosts)
+
+    def test_init_with_csv(self):
+
+        with patch.object(KafkaClient, 'load_metadata_for_topics'):
+            client = KafkaClient(
+                hosts='kafka01:9092,kafka02:9092,kafka03:9092')
+
+        self.assertItemsEqual(
+            [('kafka01', 9092), ('kafka02', 9092), ('kafka03', 9092)],
+            client.hosts)
+
     def test_send_broker_unaware_request_fail(self):
         'Tests that call fails when all hosts are unavailable'
@@ -402,14 +424,16 @@ class TestKafkaClient(unittest.TestCase):
 
             return mocked_conns[(host, port)]
 
         # patch to avoid making requests before we want it
-        with patch.object(KafkaClient, '_load_metadata_for_topics'), \
+        with patch.object(KafkaClient, 'load_metadata_for_topics'), \
                 patch.object(KafkaClient, '_get_conn', side_effect=mock_get_conn):
 
-            client = KafkaClient(hosts='kafka01:9092,kafka02:9092')
+            client = KafkaClient(hosts=['kafka01:9092','kafka02:9092'])
 
-            resp = client._send_broker_unaware_request(1, 'fake request')
-            self.assertIsNone(resp)
+            self.assertRaises(
+                KafkaUnavailableError,
+                client._send_broker_unaware_request,
+                1, 'fake request')
 
         for key, conn in mocked_conns.iteritems():
             conn.send.assert_called_with(1, 'fake request')
@@ -434,7 +458,7 @@ class TestKafkaClient(unittest.TestCase):
             return mocked_conns[(host, port)]
 
         # patch to avoid making requests before we want it
-        with patch.object(KafkaClient, '_load_metadata_for_topics'), \
+        with patch.object(KafkaClient, 'load_metadata_for_topics'), \
                 patch.object(KafkaClient, '_get_conn', side_effect=mock_get_conn):
             client = KafkaClient(hosts='kafka01:9092,kafka02:9092')
@@ -444,7 +468,7 @@ class TestKafkaClient(unittest.TestCase):
             resp = client._send_broker_unaware_request(1, 'fake request')
 
             self.assertEqual('valid response', resp)
             mocked_conns[('kafka02', 9092)].recv.assert_called_with(1)
 
-    @unittest.skip('requires disabling recursion on _load_metadata_for_topics')
+    @unittest.skip('requires disabling recursion on load_metadata_for_topics')
     @patch('kafka.client.KafkaConnection')
     @patch('kafka.client.KafkaProtocol')
     def test_client_load_metadata(self, protocol, conn):
@@ -474,7 +498,7 @@ class TestKafkaClient(unittest.TestCase):
         }, client.topics_to_brokers)
 
-    @unittest.skip('requires disabling recursion on _load_metadata_for_topics')
+    @unittest.skip('requires disabling recursion on load_metadata_for_topics')
     @patch('kafka.client.KafkaConnection')
     @patch('kafka.client.KafkaProtocol')
     def test_client_load_metadata_unassigned_partitions(self, protocol, conn):
@@ -513,7 +537,7 @@ class TestKafkaClient(unittest.TestCase):
         }, client.topics_to_brokers)
 
-    @unittest.skip('requires disabling recursion on _load_metadata_for_topics')
+    @unittest.skip('requires disabling recursion on load_metadata_for_topics')
     @patch('kafka.client.KafkaConnection')
     @patch('kafka.client.KafkaProtocol')
     def test_client_load_metadata_noleader_partitions(self, protocol, conn):
-- 
cgit v1.2.1