diff options
author | Omar <omar.ghishan@rd.io> | 2014-02-26 16:55:28 -0800 |
---|---|---|
committer | Omar <omar.ghishan@rd.io> | 2014-02-26 16:55:28 -0800 |
commit | ab89a44ecf9c93b116fcc8516cfc21749df74507 (patch) | |
tree | 5e318c4c7541bade1da687d2cf5fd145c594dc2b /test | |
parent | e5fdc1c7b22c8ad2aaa66a486871d0ed65977e3d (diff) | |
parent | 51910f981843dfa967d24659cdb46117210c832d (diff) | |
download | kafka-python-ab89a44ecf9c93b116fcc8516cfc21749df74507.tar.gz |
Merge pull request #122 from mrtheb/multihosts
Support for multiple hosts on KafkaClient boostrap (improves on #70)
Diffstat (limited to 'test')
-rw-r--r-- | test/test_integration.py | 34 | ||||
-rw-r--r-- | test/test_unit.py | 84 |
2 files changed, 102 insertions, 16 deletions
diff --git a/test/test_integration.py b/test/test_integration.py index d0da523..3d6ccf6 100644 --- a/test/test_integration.py +++ b/test/test_integration.py @@ -33,7 +33,7 @@ def ensure_topic_creation(client, topic_name): class KafkaTestCase(unittest.TestCase): def setUp(self): - self.topic = "%s-%s" % (self.id()[self.id().rindex(".")+1:], random_string(10)) + self.topic = "%s-%s" % (self.id()[self.id().rindex(".") + 1:], random_string(10)) ensure_topic_creation(self.client, self.topic) @@ -42,7 +42,7 @@ class TestKafkaClient(KafkaTestCase): def setUpClass(cls): # noqa cls.zk = ZookeeperFixture.instance() cls.server = KafkaFixture.instance(0, cls.zk.host, cls.zk.port) - cls.client = KafkaClient(cls.server.host, cls.server.port) + cls.client = KafkaClient('%s:%d' % (cls.server.host, cls.server.port)) @classmethod def tearDownClass(cls): # noqa @@ -578,7 +578,7 @@ class TestConsumer(KafkaTestCase): cls.zk = ZookeeperFixture.instance() cls.server1 = KafkaFixture.instance(0, cls.zk.host, cls.zk.port) cls.server2 = KafkaFixture.instance(1, cls.zk.host, cls.zk.port) - cls.client = KafkaClient(cls.server2.host, cls.server2.port) + cls.client = KafkaClient('%s:%d' % (cls.server2.host, cls.server2.port)) @classmethod def tearDownClass(cls): # noqa @@ -826,23 +826,26 @@ class TestConsumer(KafkaTestCase): class TestFailover(KafkaTestCase): - def setUp(self): + @classmethod + def setUpClass(cls): # noqa zk_chroot = random_string(10) replicas = 2 partitions = 2 # mini zookeeper, 2 kafka brokers - self.zk = ZookeeperFixture.instance() - kk_args = [self.zk.host, self.zk.port, zk_chroot, replicas, partitions] - self.brokers = [KafkaFixture.instance(i, *kk_args) for i in range(replicas)] - self.client = KafkaClient(self.brokers[0].host, self.brokers[0].port) - super(TestFailover, self).setUp() - - def tearDown(self): - self.client.close() - for broker in self.brokers: + cls.zk = ZookeeperFixture.instance() + kk_args = [cls.zk.host, cls.zk.port, zk_chroot, replicas, partitions] + cls.brokers = [KafkaFixture.instance(i, *kk_args) for i in range(replicas)] + + hosts = ['%s:%d' % (b.host, b.port) for b in cls.brokers] + cls.client = KafkaClient(hosts) + + @classmethod + def tearDownClass(cls): + cls.client.close() + for broker in cls.brokers: broker.close() - self.zk.close() + cls.zk.close() def test_switch_leader(self): key, topic, partition = random_string(5), self.topic, 0 @@ -918,7 +921,8 @@ class TestFailover(KafkaTestCase): return broker def _count_messages(self, group, topic): - client = KafkaClient(self.brokers[0].host, self.brokers[0].port) + hosts = '%s:%d' % (self.brokers[0].host, self.brokers[0].port) + client = KafkaClient(hosts) consumer = SimpleConsumer(client, group, topic, auto_commit=False, iter_timeout=0) all_messages = [] for message in consumer: diff --git a/test/test_unit.py b/test/test_unit.py index b5f0118..aec0a2c 100644 --- a/test/test_unit.py +++ b/test/test_unit.py @@ -3,11 +3,16 @@ import random import struct import unittest +from mock import MagicMock, patch + + +from kafka import KafkaClient from kafka.common import ( ProduceRequest, FetchRequest, Message, ChecksumError, ConsumerFetchSizeTooSmall, ProduceResponse, FetchResponse, OffsetAndMessage, BrokerMetadata, PartitionMetadata ) +from kafka.common import KafkaUnavailableError from kafka.codec import ( has_gzip, has_snappy, gzip_encode, gzip_decode, snappy_encode, snappy_decode @@ -405,7 +410,6 @@ class TestProtocol(unittest.TestCase): def test_decode_offset_response(self): pass - @unittest.skip("Not Implemented") def test_encode_offset_commit_request(self): pass @@ -423,5 +427,83 @@ class TestProtocol(unittest.TestCase): pass +class TestKafkaClient(unittest.TestCase): + + def test_init_with_list(self): + + with patch.object(KafkaClient, 'load_metadata_for_topics'): + client = KafkaClient( + hosts=['kafka01:9092', 'kafka02:9092', 'kafka03:9092']) + + self.assertItemsEqual( + [('kafka01', 9092), ('kafka02', 9092), ('kafka03', 9092)], + client.hosts) + + def test_init_with_csv(self): + + with patch.object(KafkaClient, 'load_metadata_for_topics'): + client = KafkaClient( + hosts='kafka01:9092,kafka02:9092,kafka03:9092') + + self.assertItemsEqual( + [('kafka01', 9092), ('kafka02', 9092), ('kafka03', 9092)], + client.hosts) + + def test_send_broker_unaware_request_fail(self): + 'Tests that call fails when all hosts are unavailable' + + mocked_conns = { + ('kafka01', 9092): MagicMock(), + ('kafka02', 9092): MagicMock() + } + # inject KafkaConnection side effects + mocked_conns[('kafka01', 9092)].send.side_effect = RuntimeError("kafka01 went away (unittest)") + mocked_conns[('kafka02', 9092)].send.side_effect = RuntimeError("Kafka02 went away (unittest)") + + def mock_get_conn(host, port): + return mocked_conns[(host, port)] + + # patch to avoid making requests before we want it + with patch.object(KafkaClient, 'load_metadata_for_topics'), \ + patch.object(KafkaClient, '_get_conn', side_effect=mock_get_conn): + + client = KafkaClient(hosts=['kafka01:9092', 'kafka02:9092']) + + self.assertRaises( + KafkaUnavailableError, + client._send_broker_unaware_request, + 1, 'fake request') + + for key, conn in mocked_conns.iteritems(): + conn.send.assert_called_with(1, 'fake request') + + def test_send_broker_unaware_request(self): + 'Tests that call works when at least one of the host is available' + + mocked_conns = { + ('kafka01', 9092): MagicMock(), + ('kafka02', 9092): MagicMock(), + ('kafka03', 9092): MagicMock() + } + # inject KafkaConnection side effects + mocked_conns[('kafka01', 9092)].send.side_effect = RuntimeError("kafka01 went away (unittest)") + mocked_conns[('kafka02', 9092)].recv.return_value = 'valid response' + mocked_conns[('kafka03', 9092)].send.side_effect = RuntimeError("kafka03 went away (unittest)") + + def mock_get_conn(host, port): + return mocked_conns[(host, port)] + + # patch to avoid making requests before we want it + with patch.object(KafkaClient, 'load_metadata_for_topics'), \ + patch.object(KafkaClient, '_get_conn', side_effect=mock_get_conn): + + client = KafkaClient(hosts='kafka01:9092,kafka02:9092') + + resp = client._send_broker_unaware_request(1, 'fake request') + + self.assertEqual('valid response', resp) + mocked_conns[('kafka02', 9092)].recv.assert_called_with(1) + + if __name__ == '__main__': unittest.main() |