diff options
author | Dana Powers <dana.powers@rd.io> | 2014-09-01 16:38:40 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@rd.io> | 2014-09-01 18:04:10 -0700 |
commit | 90c6520097b21d6f6bd075c97c93c0adbc5484c8 (patch) | |
tree | e3f4a5800cdf0c35ef2da677f3ca38c10fb6dd5b /test/test_client.py | |
parent | 11fc9bc2e61b34bddbf6d54228709e075b2615a1 (diff) | |
download | kafka-python-90c6520097b21d6f6bd075c97c93c0adbc5484c8.tar.gz |
load_metadata_for_topics should raise exceptions on explicit topic args
Diffstat (limited to 'test/test_client.py')
-rw-r--r-- | test/test_client.py | 71 |
1 files changed, 46 insertions, 25 deletions
diff --git a/test/test_client.py b/test/test_client.py index dbc9883..06eec75 100644 --- a/test/test_client.py +++ b/test/test_client.py @@ -7,10 +7,15 @@ from kafka.common import ( ProduceRequest, MetadataResponse, BrokerMetadata, TopicMetadata, PartitionMetadata, TopicAndPartition, KafkaUnavailableError, - LeaderNotAvailableError, PartitionUnavailableError + LeaderNotAvailableError, PartitionUnavailableError, NoError, + UnknownTopicOrPartitionError ) from kafka.protocol import create_message +NO_ERROR = 0 +UNKNOWN_TOPIC_OR_PARTITION = 3 +NO_LEADER = 5 + class TestKafkaClient(unittest2.TestCase): def test_init_with_list(self): with patch.object(KafkaClient, 'load_metadata_for_topics'): @@ -96,7 +101,6 @@ class TestKafkaClient(unittest2.TestCase): @patch('kafka.client.KafkaConnection') @patch('kafka.client.KafkaProtocol') def test_load_metadata(self, protocol, conn): - "Load metadata for all topics" conn.recv.return_value = 'response' # anything but None @@ -106,18 +110,21 @@ class TestKafkaClient(unittest2.TestCase): ] topics = [ - TopicMetadata('topic_1', 0, [ - PartitionMetadata('topic_1', 0, 1, [1, 2], [1, 2], 0) + TopicMetadata('topic_1', NO_ERROR, [ + PartitionMetadata('topic_1', 0, 1, [1, 2], [1, 2], NO_ERROR) ]), - TopicMetadata('topic_noleader', 0, [ - PartitionMetadata('topic_noleader', 0, -1, [], [], 0), - PartitionMetadata('topic_noleader', 1, -1, [], [], 0) + TopicMetadata('topic_noleader', NO_ERROR, [ + PartitionMetadata('topic_noleader', 0, -1, [], [], + NO_LEADER), + PartitionMetadata('topic_noleader', 1, -1, [], [], + NO_LEADER), ]), - TopicMetadata('topic_no_partitions', 0, []), - TopicMetadata('topic_3', 0, [ - PartitionMetadata('topic_3', 0, 0, [0, 1], [0, 1], 0), - PartitionMetadata('topic_3', 1, 1, [1, 0], [1, 0], 0), - PartitionMetadata('topic_3', 2, 0, [0, 1], [0, 1], 0) + TopicMetadata('topic_no_partitions', NO_LEADER, []), + TopicMetadata('topic_unknown', UNKNOWN_TOPIC_OR_PARTITION, []), + TopicMetadata('topic_3', NO_ERROR, [ + PartitionMetadata('topic_3', 0, 0, [0, 1], [0, 1], NO_ERROR), + PartitionMetadata('topic_3', 1, 1, [1, 0], [1, 0], NO_ERROR), + PartitionMetadata('topic_3', 2, 0, [0, 1], [0, 1], NO_ERROR) ]) ] protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics) @@ -133,6 +140,16 @@ class TestKafkaClient(unittest2.TestCase): TopicAndPartition('topic_3', 2): brokers[0]}, client.topics_to_brokers) + # if we ask for metadata explicitly, it should raise errors + with self.assertRaises(LeaderNotAvailableError): + client.load_metadata_for_topics('topic_no_partitions') + + with self.assertRaises(UnknownTopicOrPartitionError): + client.load_metadata_for_topics('topic_unknown') + + # This should not raise + client.load_metadata_for_topics('topic_no_leader') + @patch('kafka.client.KafkaConnection') @patch('kafka.client.KafkaProtocol') def test_get_leader_for_partitions_reloads_metadata(self, protocol, conn): @@ -146,7 +163,7 @@ class TestKafkaClient(unittest2.TestCase): ] topics = [ - TopicMetadata('topic_no_partitions', 0, []) + TopicMetadata('topic_no_partitions', NO_LEADER, []) ] protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics) @@ -156,8 +173,8 @@ class TestKafkaClient(unittest2.TestCase): self.assertDictEqual({}, client.topics_to_brokers) topics = [ - TopicMetadata('topic_one_partition', 0, [ - PartitionMetadata('topic_no_partition', 0, 0, [0, 1], [0, 1], 0) + TopicMetadata('topic_one_partition', NO_ERROR, [ + PartitionMetadata('topic_no_partition', 0, 0, [0, 1], [0, 1], NO_ERROR) ]) ] protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics) @@ -184,7 +201,7 @@ class TestKafkaClient(unittest2.TestCase): ] topics = [ - TopicMetadata('topic_no_partitions', 0, []) + TopicMetadata('topic_no_partitions', NO_ERROR, []) ] protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics) @@ -208,9 +225,11 @@ class TestKafkaClient(unittest2.TestCase): ] topics = [ - TopicMetadata('topic_noleader', 0, [ - PartitionMetadata('topic_noleader', 0, -1, [], [], 0), - PartitionMetadata('topic_noleader', 1, -1, [], [], 0) + TopicMetadata('topic_noleader', NO_ERROR, [ + PartitionMetadata('topic_noleader', 0, -1, [], [], + NO_LEADER), + PartitionMetadata('topic_noleader', 1, -1, [], [], + NO_LEADER), ]), ] protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics) @@ -226,9 +245,9 @@ class TestKafkaClient(unittest2.TestCase): self.assertIsNone(client._get_leader_for_partition('topic_noleader', 1)) topics = [ - TopicMetadata('topic_noleader', 0, [ - PartitionMetadata('topic_noleader', 0, 0, [0, 1], [0, 1], 0), - PartitionMetadata('topic_noleader', 1, 1, [1, 0], [1, 0], 0) + TopicMetadata('topic_noleader', NO_ERROR, [ + PartitionMetadata('topic_noleader', 0, 0, [0, 1], [0, 1], NO_ERROR), + PartitionMetadata('topic_noleader', 1, 1, [1, 0], [1, 0], NO_ERROR) ]), ] protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics) @@ -248,9 +267,11 @@ class TestKafkaClient(unittest2.TestCase): ] topics = [ - TopicMetadata('topic_noleader', 0, [ - PartitionMetadata('topic_noleader', 0, -1, [], [], 0), - PartitionMetadata('topic_noleader', 1, -1, [], [], 0) + TopicMetadata('topic_noleader', NO_ERROR, [ + PartitionMetadata('topic_noleader', 0, -1, [], [], + NO_LEADER), + PartitionMetadata('topic_noleader', 1, -1, [], [], + NO_LEADER), ]), ] protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics) |