diff options
-rw-r--r-- | test/test_client.py | 64 |
1 files changed, 63 insertions, 1 deletions
diff --git a/test/test_client.py b/test/test_client.py index bc11857..fea17a6 100644 --- a/test/test_client.py +++ b/test/test_client.py @@ -8,7 +8,7 @@ from kafka.common import ( BrokerMetadata, TopicMetadata, PartitionMetadata, TopicAndPartition, KafkaUnavailableError, LeaderNotAvailableError, NoError, - UnknownTopicOrPartitionError + UnknownTopicOrPartitionError, KafkaTimeoutError ) from kafka.protocol import create_message @@ -152,6 +152,68 @@ class TestKafkaClient(unittest2.TestCase): @patch('kafka.client.KafkaConnection') @patch('kafka.client.KafkaProtocol') + def test_has_metadata_for_topic(self, protocol, conn): + + conn.recv.return_value = 'response' # anything but None + + brokers = [ + BrokerMetadata(0, 'broker_1', 4567), + BrokerMetadata(1, 'broker_2', 5678) + ] + + topics = [ + TopicMetadata('topic_still_creating', NO_LEADER, []), + TopicMetadata('topic_doesnt_exist', UNKNOWN_TOPIC_OR_PARTITION, []), + TopicMetadata('topic_noleaders', NO_ERROR, [ + PartitionMetadata('topic_noleaders', 0, -1, [], [], NO_LEADER), + PartitionMetadata('topic_noleaders', 1, -1, [], [], NO_LEADER), + ]), + ] + protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics) + + client = KafkaClient(hosts=['broker_1:4567']) + + # Topics with no partitions return False + self.assertFalse(client.has_metadata_for_topic('topic_still_creating')) + self.assertFalse(client.has_metadata_for_topic('topic_doesnt_exist')) + + # Topic with partition metadata, but no leaders return True + self.assertTrue(client.has_metadata_for_topic('topic_noleaders')) + + @patch('kafka.client.KafkaConnection') + @patch('kafka.client.KafkaProtocol') + def test_ensure_topic_exists(self, protocol, conn): + + conn.recv.return_value = 'response' # anything but None + + brokers = [ + BrokerMetadata(0, 'broker_1', 4567), + BrokerMetadata(1, 'broker_2', 5678) + ] + + topics = [ + TopicMetadata('topic_still_creating', NO_LEADER, []), + TopicMetadata('topic_doesnt_exist', UNKNOWN_TOPIC_OR_PARTITION, []), + TopicMetadata('topic_noleaders', NO_ERROR, [ + PartitionMetadata('topic_noleaders', 0, -1, [], [], NO_LEADER), + PartitionMetadata('topic_noleaders', 1, -1, [], [], NO_LEADER), + ]), + ] + protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics) + + client = KafkaClient(hosts=['broker_1:4567']) + + with self.assertRaises(UnknownTopicOrPartitionError): + client.ensure_topic_exists('topic_doesnt_exist', timeout=1) + + with self.assertRaises(KafkaTimeoutError): + client.ensure_topic_exists('topic_still_creating', timeout=1) + + # This should not raise + client.ensure_topic_exists('topic_noleaders', timeout=1) + + @patch('kafka.client.KafkaConnection') + @patch('kafka.client.KafkaProtocol') def test_get_leader_for_partitions_reloads_metadata(self, protocol, conn): "Get leader for partitions reload metadata if it is not available" |