summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@rd.io>2014-09-01 18:03:52 -0700
committerDana Powers <dana.powers@rd.io>2014-09-01 18:04:10 -0700
commit0dabb1fbe8a9f538527a03c2903475ed77a12c10 (patch)
treeb5d7041e1f68097cf6fce5c16d15a64f4c62671d
parentd15a52cab28aa32274a27e4af86acf3f34c2092a (diff)
downloadkafka-python-0dabb1fbe8a9f538527a03c2903475ed77a12c10.tar.gz
Add client unit tests for has_metadata_for_topic and ensure_topic_exists
-rw-r--r--test/test_client.py64
1 files changed, 63 insertions, 1 deletions
diff --git a/test/test_client.py b/test/test_client.py
index bc11857..fea17a6 100644
--- a/test/test_client.py
+++ b/test/test_client.py
@@ -8,7 +8,7 @@ from kafka.common import (
BrokerMetadata, TopicMetadata, PartitionMetadata,
TopicAndPartition, KafkaUnavailableError,
LeaderNotAvailableError, NoError,
- UnknownTopicOrPartitionError
+ UnknownTopicOrPartitionError, KafkaTimeoutError
)
from kafka.protocol import create_message
@@ -152,6 +152,68 @@ class TestKafkaClient(unittest2.TestCase):
@patch('kafka.client.KafkaConnection')
@patch('kafka.client.KafkaProtocol')
+ def test_has_metadata_for_topic(self, protocol, conn):
+
+ conn.recv.return_value = 'response' # anything but None
+
+ brokers = [
+ BrokerMetadata(0, 'broker_1', 4567),
+ BrokerMetadata(1, 'broker_2', 5678)
+ ]
+
+ topics = [
+ TopicMetadata('topic_still_creating', NO_LEADER, []),
+ TopicMetadata('topic_doesnt_exist', UNKNOWN_TOPIC_OR_PARTITION, []),
+ TopicMetadata('topic_noleaders', NO_ERROR, [
+ PartitionMetadata('topic_noleaders', 0, -1, [], [], NO_LEADER),
+ PartitionMetadata('topic_noleaders', 1, -1, [], [], NO_LEADER),
+ ]),
+ ]
+ protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics)
+
+ client = KafkaClient(hosts=['broker_1:4567'])
+
+ # Topics with no partitions return False
+ self.assertFalse(client.has_metadata_for_topic('topic_still_creating'))
+ self.assertFalse(client.has_metadata_for_topic('topic_doesnt_exist'))
+
+ # Topic with partition metadata, but no leaders return True
+ self.assertTrue(client.has_metadata_for_topic('topic_noleaders'))
+
+ @patch('kafka.client.KafkaConnection')
+ @patch('kafka.client.KafkaProtocol')
+ def test_ensure_topic_exists(self, protocol, conn):
+
+ conn.recv.return_value = 'response' # anything but None
+
+ brokers = [
+ BrokerMetadata(0, 'broker_1', 4567),
+ BrokerMetadata(1, 'broker_2', 5678)
+ ]
+
+ topics = [
+ TopicMetadata('topic_still_creating', NO_LEADER, []),
+ TopicMetadata('topic_doesnt_exist', UNKNOWN_TOPIC_OR_PARTITION, []),
+ TopicMetadata('topic_noleaders', NO_ERROR, [
+ PartitionMetadata('topic_noleaders', 0, -1, [], [], NO_LEADER),
+ PartitionMetadata('topic_noleaders', 1, -1, [], [], NO_LEADER),
+ ]),
+ ]
+ protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics)
+
+ client = KafkaClient(hosts=['broker_1:4567'])
+
+ with self.assertRaises(UnknownTopicOrPartitionError):
+ client.ensure_topic_exists('topic_doesnt_exist', timeout=1)
+
+ with self.assertRaises(KafkaTimeoutError):
+ client.ensure_topic_exists('topic_still_creating', timeout=1)
+
+ # This should not raise
+ client.ensure_topic_exists('topic_noleaders', timeout=1)
+
+ @patch('kafka.client.KafkaConnection')
+ @patch('kafka.client.KafkaProtocol')
def test_get_leader_for_partitions_reloads_metadata(self, protocol, conn):
"Get leader for partitions reload metadata if it is not available"