author     mrtheb <mrlabbe@gmail.com>  2014-02-09 13:44:47 -0500
committer  mrtheb <mrlabbe@gmail.com>  2014-02-09 13:44:47 -0500
commit     a2191e5be5d5fcd212582580c163f4533cca6c73
tree       6d13a0928f572d99a47299b536cdf209dbd29a5f
parent     84de472a4d5b583ff3ed6cc6d92250a7c9291ceb
download   kafka-python-a2191e5be5d5fcd212582580c163f4533cca6c73.tar.gz
Support list (or comma-separated) of hosts (replaces host and port arguments)
-rw-r--r--  kafka/client.py          | 13
-rw-r--r--  kafka/conn.py            |  5
-rw-r--r--  test/test_integration.py | 26
-rw-r--r--  test/test_unit.py        | 40
4 files changed, 56 insertions, 28 deletions
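
The change below replaces KafkaClient's separate host and port arguments with a single hosts argument that accepts either a list of 'host:port' strings or one comma-separated string. A minimal usage sketch, with placeholder broker addresses mirroring the unit tests further down:

    from kafka import KafkaClient

    # both forms are normalized by collect_hosts() into (host, port) tuples
    client = KafkaClient(hosts=['kafka01:9092', 'kafka02:9092', 'kafka03:9092'])
    client = KafkaClient(hosts='kafka01:9092,kafka02:9092,kafka03:9092')
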
diff --git a/kafka/client.py b/kafka/client.py
index 33c4419..96cc1df 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -10,7 +10,7 @@
 from kafka.common import (ErrorMapping, TopicAndPartition,
                           BrokerResponseError, PartitionUnavailableError,
                           KafkaUnavailableError, KafkaRequestError)
-from kafka.conn import KafkaConnection, DEFAULT_SOCKET_TIMEOUT_SECONDS
+from kafka.conn import collect_hosts, KafkaConnection, DEFAULT_SOCKET_TIMEOUT_SECONDS
 from kafka.protocol import KafkaProtocol
 
 log = logging.getLogger("kafka")
@@ -24,14 +24,15 @@ class KafkaClient(object):
     # NOTE: The timeout given to the client should always be greater than the
     # one passed to SimpleConsumer.get_message(), otherwise you can get a
     # socket timeout.
-    def __init__(self, host, port, client_id=CLIENT_ID,
+    def __init__(self, hosts, client_id=CLIENT_ID,
                  timeout=DEFAULT_SOCKET_TIMEOUT_SECONDS):
         # We need one connection to bootstrap
         self.client_id = client_id
         self.timeout = timeout
-        self.conns = {               # (host, port) -> KafkaConnection
-            (host, port): KafkaConnection(host, port, timeout=timeout)
-        }
+        self.hosts = collect_hosts(hosts)
+
+        # create connections only when we need them
+        self.conns = {}
         self.brokers = {}            # broker_id -> BrokerMetadata
         self.topics_to_brokers = {}  # topic_id -> broker_id
         self.topic_partitions = {}   # topic_id -> [0, 1, 2, ...]
@@ -46,7 +47,7 @@ class KafkaClient(object):
         host_key = (host, port)
         if host_key not in self.conns:
-            self.conns[host_key] = KafkaConnection(host, port, self.bufsize)
+            self.conns[host_key] = KafkaConnection(host, port)
 
         return self.conns[host_key]

diff --git a/kafka/conn.py b/kafka/conn.py
index de2d385..20f22dc 100644
--- a/kafka/conn.py
+++ b/kafka/conn.py
@@ -17,8 +17,11 @@ def collect_hosts(hosts, randomize=True):
     randomize the returned list.
     """
 
+    if isinstance(hosts, str):
+        hosts = hosts.split(',')
+
     result = []
-    for host_port in hosts.split(","):
+    for host_port in hosts:
 
         res = host_port.split(':')
         host = res[0]
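
The conn.py hunk above shows only the start of collect_hosts. A rough sketch of the full helper it implies, assuming a default port of 9092 and random.shuffle for the randomize flag (neither appears in this diff):

    from random import shuffle

    def collect_hosts(hosts, randomize=True):
        # Accept a list of 'host[:port]' strings or one comma-separated string
        if isinstance(hosts, str):
            hosts = hosts.split(',')

        result = []
        for host_port in hosts:
            res = host_port.split(':')
            host = res[0]
            port = int(res[1]) if len(res) > 1 else 9092  # assumed default
            result.append((host.strip(), port))

        if randomize:
            shuffle(result)

        return result
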
""" + if isinstance(hosts, str): + hosts = hosts.split(',') + result = [] - for host_port in hosts.split(","): + for host_port in hosts: res = host_port.split(':') host = res[0] diff --git a/test/test_integration.py b/test/test_integration.py index 000f44a..3d6ccf6 100644 --- a/test/test_integration.py +++ b/test/test_integration.py @@ -33,7 +33,7 @@ def ensure_topic_creation(client, topic_name): class KafkaTestCase(unittest.TestCase): def setUp(self): - self.topic = "%s-%s" % (self.id()[self.id().rindex(".")+1:], random_string(10)) + self.topic = "%s-%s" % (self.id()[self.id().rindex(".") + 1:], random_string(10)) ensure_topic_creation(self.client, self.topic) @@ -578,7 +578,7 @@ class TestConsumer(KafkaTestCase): cls.zk = ZookeeperFixture.instance() cls.server1 = KafkaFixture.instance(0, cls.zk.host, cls.zk.port) cls.server2 = KafkaFixture.instance(1, cls.zk.host, cls.zk.port) - cls.client = KafkaClient('%s:%d' % (cls.server2.host, cls.server2.port), bufsize=8192) + cls.client = KafkaClient('%s:%d' % (cls.server2.host, cls.server2.port)) @classmethod def tearDownClass(cls): # noqa @@ -800,7 +800,6 @@ class TestConsumer(KafkaTestCase): self.assertEquals(all_messages[i], message.message) self.assertEquals(i, 19) - # Produce 1 message that is too large (bigger than max fetch size) big_message_size = MAX_FETCH_BUFFER_SIZE_BYTES + 10 big_message = create_message(random_string(big_message_size)) @@ -827,25 +826,26 @@ class TestConsumer(KafkaTestCase): class TestFailover(KafkaTestCase): - def setUp(self): + @classmethod + def setUpClass(cls): # noqa zk_chroot = random_string(10) replicas = 2 partitions = 2 # mini zookeeper, 2 kafka brokers - self.zk = ZookeeperFixture.instance() - kk_args = [self.zk.host, self.zk.port, zk_chroot, replicas, partitions] - self.brokers = [KafkaFixture.instance(i, *kk_args) for i in range(replicas)] + cls.zk = ZookeeperFixture.instance() + kk_args = [cls.zk.host, cls.zk.port, zk_chroot, replicas, partitions] + cls.brokers = [KafkaFixture.instance(i, *kk_args) for i in range(replicas)] - hosts = ','.join(['%s:%d' % (b.host, b.port) for b in cls.brokers]) + hosts = ['%s:%d' % (b.host, b.port) for b in cls.brokers] cls.client = KafkaClient(hosts) - super(TestFailover, self).setUp() - def tearDown(self): - self.client.close() - for broker in self.brokers: + @classmethod + def tearDownClass(cls): + cls.client.close() + for broker in cls.brokers: broker.close() - self.zk.close() + cls.zk.close() def test_switch_leader(self): key, topic, partition = random_string(5), self.topic, 0 diff --git a/test/test_unit.py b/test/test_unit.py index 4c78c1b..624fe39 100644 --- a/test/test_unit.py +++ b/test/test_unit.py @@ -5,11 +5,13 @@ import unittest from mock import patch +from kafka import KafkaClient from kafka.common import ( ProduceRequest, FetchRequest, Message, ChecksumError, ConsumerFetchSizeTooSmall, ProduceResponse, FetchResponse, OffsetAndMessage, BrokerMetadata, PartitionMetadata ) +from kafka.common import KafkaUnavailableError from kafka.codec import ( has_gzip, has_snappy, gzip_encode, gzip_decode, snappy_encode, snappy_decode @@ -384,6 +386,26 @@ class TestProtocol(unittest.TestCase): class TestKafkaClient(unittest.TestCase): + def test_init_with_list(self): + + with patch.object(KafkaClient, 'load_metadata_for_topics'): + client = KafkaClient( + hosts=['kafka01:9092', 'kafka02:9092', 'kafka03:9092']) + + self.assertItemsEqual( + [('kafka01', 9092), ('kafka02', 9092), ('kafka03', 9092)], + client.hosts) + + def test_init_with_csv(self): + + with 
diff --git a/test/test_unit.py b/test/test_unit.py
index 4c78c1b..624fe39 100644
--- a/test/test_unit.py
+++ b/test/test_unit.py
@@ -5,11 +5,13 @@
 import unittest
 
 from mock import patch
 
+from kafka import KafkaClient
 from kafka.common import (
     ProduceRequest, FetchRequest, Message, ChecksumError,
     ConsumerFetchSizeTooSmall, ProduceResponse, FetchResponse,
     OffsetAndMessage, BrokerMetadata, PartitionMetadata
 )
+from kafka.common import KafkaUnavailableError
 from kafka.codec import (
     has_gzip, has_snappy, gzip_encode, gzip_decode,
     snappy_encode, snappy_decode
 )
@@ -384,6 +386,26 @@ class TestProtocol(unittest.TestCase):
 
 class TestKafkaClient(unittest.TestCase):
 
+    def test_init_with_list(self):
+
+        with patch.object(KafkaClient, 'load_metadata_for_topics'):
+            client = KafkaClient(
+                hosts=['kafka01:9092', 'kafka02:9092', 'kafka03:9092'])
+
+        self.assertItemsEqual(
+            [('kafka01', 9092), ('kafka02', 9092), ('kafka03', 9092)],
+            client.hosts)
+
+    def test_init_with_csv(self):
+
+        with patch.object(KafkaClient, 'load_metadata_for_topics'):
+            client = KafkaClient(
+                hosts='kafka01:9092,kafka02:9092,kafka03:9092')
+
+        self.assertItemsEqual(
+            [('kafka01', 9092), ('kafka02', 9092), ('kafka03', 9092)],
+            client.hosts)
+
     def test_send_broker_unaware_request_fail(self):
         'Tests that call fails when all hosts are unavailable'
@@ -402,14 +424,16 @@ class TestKafkaClient(unittest.TestCase):
             return mocked_conns[(host, port)]
 
         # patch to avoid making requests before we want it
-        with patch.object(KafkaClient, '_load_metadata_for_topics'), \
+        with patch.object(KafkaClient, 'load_metadata_for_topics'), \
                 patch.object(KafkaClient, '_get_conn', side_effect=mock_get_conn):
 
-            client = KafkaClient(hosts='kafka01:9092,kafka02:9092')
+            client = KafkaClient(hosts=['kafka01:9092','kafka02:9092'])
 
-            resp = client._send_broker_unaware_request(1, 'fake request')
-
-            self.assertIsNone(resp)
+            self.assertRaises(
+                KafkaUnavailableError,
+                client._send_broker_unaware_request,
+                1, 'fake request')
 
             for key, conn in mocked_conns.iteritems():
                 conn.send.assert_called_with(1, 'fake request')
@@ -434,7 +458,7 @@ class TestKafkaClient(unittest.TestCase):
             return mocked_conns[(host, port)]
 
         # patch to avoid making requests before we want it
-        with patch.object(KafkaClient, '_load_metadata_for_topics'), \
+        with patch.object(KafkaClient, 'load_metadata_for_topics'), \
                 patch.object(KafkaClient, '_get_conn', side_effect=mock_get_conn):
 
             client = KafkaClient(hosts='kafka01:9092,kafka02:9092')
@@ -444,7 +468,7 @@
             self.assertEqual('valid response', resp)
             mocked_conns[('kafka02', 9092)].recv.assert_called_with(1)
 
-    @unittest.skip('requires disabling recursion on _load_metadata_for_topics')
+    @unittest.skip('requires disabling recursion on load_metadata_for_topics')
     @patch('kafka.client.KafkaConnection')
     @patch('kafka.client.KafkaProtocol')
     def test_client_load_metadata(self, protocol, conn):
@@ -474,7 +498,7 @@
         }, client.topics_to_brokers)
 
-    @unittest.skip('requires disabling recursion on _load_metadata_for_topics')
+    @unittest.skip('requires disabling recursion on load_metadata_for_topics')
     @patch('kafka.client.KafkaConnection')
     @patch('kafka.client.KafkaProtocol')
     def test_client_load_metadata_unassigned_partitions(self, protocol, conn):
@@ -513,7 +537,7 @@
         }, client.topics_to_brokers)
 
-    @unittest.skip('requires disabling recursion on _load_metadata_for_topics')
+    @unittest.skip('requires disabling recursion on load_metadata_for_topics')
     @patch('kafka.client.KafkaConnection')
     @patch('kafka.client.KafkaProtocol')
     def test_client_load_metadata_noleader_partitions(self, protocol, conn):
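
The reworked test_send_broker_unaware_request_fail pins down the new failure behavior: the client tries every bootstrap host and raises KafkaUnavailableError only after all of them fail, rather than returning None. A sketch of that behavior as a free function, under the names the tests use; this is an illustration, not the exact library code:

    import logging

    from kafka.common import KafkaUnavailableError

    log = logging.getLogger("kafka")

    def send_broker_unaware_request(client, request_id, request):
        # Walk the bootstrap hosts; return the first successful response.
        for host, port in client.hosts:
            try:
                conn = client._get_conn(host, port)
                conn.send(request_id, request)
                return conn.recv(request_id)
            except Exception:
                log.warning("Could not send request to %s:%d, trying next host",
                            host, port)
        # Only reached when every host failed, matching the assertRaises above
        raise KafkaUnavailableError("All servers failed to process request")
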