diff options
-rw-r--r-- | kafka/client.py | 7 | ||||
-rw-r--r-- | setup.py | 3 | ||||
-rw-r--r-- | test/test_unit.py | 135 |
3 files changed, 134 insertions, 11 deletions
diff --git a/kafka/client.py b/kafka/client.py index 155f658..e6b3ca9 100644 --- a/kafka/client.py +++ b/kafka/client.py @@ -243,9 +243,10 @@ class KafkaClient(object): self.topic_partitions[topic] = [] for partition, meta in partitions.items(): - topic_part = TopicAndPartition(topic, partition) - self.topics_to_brokers[topic_part] = brokers[meta.leader] - self.topic_partitions[topic].append(partition) + if meta.leader != -1: + topic_part = TopicAndPartition(topic, partition) + self.topics_to_brokers[topic_part] = brokers[meta.leader] + self.topic_partitions[topic].append(partition) def send_produce_request(self, payloads=[], acks=1, timeout=1000, fail_on_error=True, callback=None): @@ -1,4 +1,3 @@ -import os.path import sys from setuptools import setup, Command @@ -23,7 +22,7 @@ setup( version="0.9.0", install_requires=["distribute"], - tests_require=["tox"], + tests_require=["tox", "mock"], cmdclass={"test": Tox}, packages=["kafka"], diff --git a/test/test_unit.py b/test/test_unit.py index e3fd4bb..0d8f169 100644 --- a/test/test_unit.py +++ b/test/test_unit.py @@ -6,7 +6,8 @@ import unittest from kafka.common import ( ProduceRequest, FetchRequest, Message, ChecksumError, ConsumerFetchSizeTooSmall, ProduceResponse, FetchResponse, - OffsetAndMessage, BrokerMetadata, PartitionMetadata + OffsetAndMessage, BrokerMetadata, PartitionMetadata, + TopicAndPartition ) from kafka.codec import ( has_gzip, has_snappy, gzip_encode, gzip_decode, @@ -16,6 +17,10 @@ from kafka.protocol import ( create_gzip_message, create_message, create_snappy_message, KafkaProtocol ) +from kafka.client import KafkaClient + +from mock import patch + ITERATIONS = 1000 STRLEN = 100 @@ -87,8 +92,9 @@ class TestProtocol(unittest.TestCase): payloads = ["v1", "v2"] msg = create_gzip_message(payloads) self.assertEqual(msg.magic, 0) - self.assertEqual(msg.attributes, KafkaProtocol.ATTRIBUTE_CODEC_MASK & - KafkaProtocol.CODEC_GZIP) + self.assertEqual( + msg.attributes, + KafkaProtocol.ATTRIBUTE_CODEC_MASK & KafkaProtocol.CODEC_GZIP) self.assertEqual(msg.key, None) # Need to decode to check since gzipped payload is non-deterministic decoded = gzip_decode(msg.value) @@ -103,8 +109,9 @@ class TestProtocol(unittest.TestCase): payloads = ["v1", "v2"] msg = create_snappy_message(payloads) self.assertEqual(msg.magic, 0) - self.assertEqual(msg.attributes, KafkaProtocol.ATTRIBUTE_CODEC_MASK & - KafkaProtocol.CODEC_SNAPPY) + self.assertEqual( + msg.attributes, + KafkaProtocol.ATTRIBUTE_CODEC_MASK & KafkaProtocol.CODEC_SNAPPY) self.assertEqual(msg.key, None) expect = ("8\x00\x00\x19\x01@\x10L\x9f[\xc2\x00\x00\xff\xff\xff\xff" "\x00\x00\x00\x02v1\x19\x1bD\x00\x10\xd5\x96\nx\x00\x00\xff" @@ -275,6 +282,7 @@ class TestProtocol(unittest.TestCase): len(ms3), ms3) responses = list(KafkaProtocol.decode_fetch_response(encoded)) + def expand_messages(response): return FetchResponse(response.topic, response.partition, response.error, response.highwaterMark, @@ -362,7 +370,6 @@ class TestProtocol(unittest.TestCase): def test_decode_offset_response(self): pass - @unittest.skip("Not Implemented") def test_encode_offset_commit_request(self): pass @@ -380,5 +387,121 @@ class TestProtocol(unittest.TestCase): pass +class TestClient(unittest.TestCase): + + #@unittest.skip('requires disabling recursion on _load_metadata_for_topics') + @patch('kafka.client.KafkaConnection') + @patch('kafka.client.KafkaProtocol') + def test_client_load_metadata(self, protocol, conn): + + conn.recv.return_value = 'response' # anything but None + + brokers = {} + brokers[0] = BrokerMetadata(1, 'broker_1', 4567) + brokers[1] = BrokerMetadata(2, 'broker_2', 5678) + + topics = {} + topics['topic_1'] = { + 0: PartitionMetadata('topic_1', 0, 1, [1, 2], [1, 2]) + } + topics['topic_2'] = { + 0: PartitionMetadata('topic_2', 0, 0, [0, 1], [0, 1]), + 1: PartitionMetadata('topic_2', 1, 1, [1, 0], [1, 0]) + } + protocol.decode_metadata_response.return_value = (brokers, topics) + + client = KafkaClient(host='broker_1', port=4567) + self.assertItemsEqual({ + TopicAndPartition('topic_1', 0): brokers[0], + TopicAndPartition('topic_2', 0): brokers[0], + TopicAndPartition('topic_2', 1): brokers[1]}, + client.topics_to_brokers) + + # @unittest.skip('requires disabling recursion on _load_metadata_for_topics') + @patch('kafka.client.KafkaConnection') + @patch('kafka.client.KafkaProtocol') + def test_client_load_metadata_unassigned_partitions(self, protocol, conn): + + conn.recv.return_value = 'response' # anything but None + + brokers = {} + brokers[0] = BrokerMetadata(0, 'broker_1', 4567) + brokers[1] = BrokerMetadata(1, 'broker_2', 5678) + + topics = {} + topics['topic_1'] = { + 0: PartitionMetadata('topic_1', 0, -1, [], []) + } + protocol.decode_metadata_response.return_value = (brokers, topics) + + client = KafkaClient(host='broker_1', port=4567) + + self.assertItemsEqual({}, client.topics_to_brokers) + self.assertRaises( + Exception, + client._get_leader_for_partition, + 'topic_1', 0) + + # calling _get_leader_for_partition (from any broker aware request) + # will try loading metadata again for the same topic + topics['topic_1'] = { + 0: PartitionMetadata('topic_1', 0, 0, [0, 1], [0, 1]) + } + leader = client._get_leader_for_partition('topic_1', 0) + + self.assertEqual(brokers[0], leader) + self.assertItemsEqual({ + TopicAndPartition('topic_1', 0): brokers[0]}, + client.topics_to_brokers) + + #@unittest.skip('requires disabling recursion on _load_metadata_for_topics') + @patch('kafka.client.KafkaConnection') + @patch('kafka.client.KafkaProtocol') + def test_client_load_metadata_noleader_partitions(self, protocol, conn): + + conn.recv.return_value = 'response' # anything but None + + brokers = {} + brokers[0] = BrokerMetadata(0, 'broker_1', 4567) + brokers[1] = BrokerMetadata(1, 'broker_2', 5678) + + topics = {} + topics['topic_1'] = { + 0: PartitionMetadata('topic_1', 0, -1, [], []) + } + topics['topic_2'] = { + 0: PartitionMetadata('topic_2', 0, 0, [0, 1], []), + 1: PartitionMetadata('topic_2', 1, 1, [1, 0], [1, 0]) + } + protocol.decode_metadata_response.return_value = (brokers, topics) + + client = KafkaClient(host='broker_1', port=4567) + self.assertItemsEqual( + { + TopicAndPartition('topic_2', 0): brokers[0], + TopicAndPartition('topic_2', 1): brokers[1] + }, + client.topics_to_brokers) + self.assertRaises( + Exception, + client._get_leader_for_partition, + 'topic_1', 0) + + # calling _get_leader_for_partition (from any broker aware request) + # will try loading metadata again for the same topic + topics['topic_1'] = { + 0: PartitionMetadata('topic_1', 0, 0, [0, 1], [0, 1]) + } + leader = client._get_leader_for_partition('topic_1', 0) + + self.assertEqual(brokers[0], leader) + self.assertItemsEqual( + { + TopicAndPartition('topic_1', 0): brokers[0], + TopicAndPartition('topic_2', 0): brokers[0], + TopicAndPartition('topic_2', 1): brokers[1] + }, + client.topics_to_brokers) + if __name__ == '__main__': unittest.main() |