diff options
author | Dana Powers <dana.powers@rd.io> | 2014-09-01 18:03:52 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@rd.io> | 2014-09-01 18:04:10 -0700 |
commit | 0dabb1fbe8a9f538527a03c2903475ed77a12c10 (patch) | |
tree | b5d7041e1f68097cf6fce5c16d15a64f4c62671d | |
parent | d15a52cab28aa32274a27e4af86acf3f34c2092a (diff) | |
download | kafka-python-0dabb1fbe8a9f538527a03c2903475ed77a12c10.tar.gz |
Add client unit tests for has_metadata_for_topic and ensure_topic_exists
-rw-r--r-- | test/test_client.py | 64 |
1 files changed, 63 insertions, 1 deletions
diff --git a/test/test_client.py b/test/test_client.py index bc11857..fea17a6 100644 --- a/test/test_client.py +++ b/test/test_client.py @@ -8,7 +8,7 @@ from kafka.common import ( BrokerMetadata, TopicMetadata, PartitionMetadata, TopicAndPartition, KafkaUnavailableError, LeaderNotAvailableError, NoError, - UnknownTopicOrPartitionError + UnknownTopicOrPartitionError, KafkaTimeoutError ) from kafka.protocol import create_message @@ -152,6 +152,68 @@ class TestKafkaClient(unittest2.TestCase): @patch('kafka.client.KafkaConnection') @patch('kafka.client.KafkaProtocol') + def test_has_metadata_for_topic(self, protocol, conn): + + conn.recv.return_value = 'response' # anything but None + + brokers = [ + BrokerMetadata(0, 'broker_1', 4567), + BrokerMetadata(1, 'broker_2', 5678) + ] + + topics = [ + TopicMetadata('topic_still_creating', NO_LEADER, []), + TopicMetadata('topic_doesnt_exist', UNKNOWN_TOPIC_OR_PARTITION, []), + TopicMetadata('topic_noleaders', NO_ERROR, [ + PartitionMetadata('topic_noleaders', 0, -1, [], [], NO_LEADER), + PartitionMetadata('topic_noleaders', 1, -1, [], [], NO_LEADER), + ]), + ] + protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics) + + client = KafkaClient(hosts=['broker_1:4567']) + + # Topics with no partitions return False + self.assertFalse(client.has_metadata_for_topic('topic_still_creating')) + self.assertFalse(client.has_metadata_for_topic('topic_doesnt_exist')) + + # Topic with partition metadata, but no leaders return True + self.assertTrue(client.has_metadata_for_topic('topic_noleaders')) + + @patch('kafka.client.KafkaConnection') + @patch('kafka.client.KafkaProtocol') + def test_ensure_topic_exists(self, protocol, conn): + + conn.recv.return_value = 'response' # anything but None + + brokers = [ + BrokerMetadata(0, 'broker_1', 4567), + BrokerMetadata(1, 'broker_2', 5678) + ] + + topics = [ + TopicMetadata('topic_still_creating', NO_LEADER, []), + TopicMetadata('topic_doesnt_exist', UNKNOWN_TOPIC_OR_PARTITION, []), + TopicMetadata('topic_noleaders', NO_ERROR, [ + PartitionMetadata('topic_noleaders', 0, -1, [], [], NO_LEADER), + PartitionMetadata('topic_noleaders', 1, -1, [], [], NO_LEADER), + ]), + ] + protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics) + + client = KafkaClient(hosts=['broker_1:4567']) + + with self.assertRaises(UnknownTopicOrPartitionError): + client.ensure_topic_exists('topic_doesnt_exist', timeout=1) + + with self.assertRaises(KafkaTimeoutError): + client.ensure_topic_exists('topic_still_creating', timeout=1) + + # This should not raise + client.ensure_topic_exists('topic_noleaders', timeout=1) + + @patch('kafka.client.KafkaConnection') + @patch('kafka.client.KafkaProtocol') def test_get_leader_for_partitions_reloads_metadata(self, protocol, conn): "Get leader for partitions reload metadata if it is not available" |