summaryrefslogtreecommitdiff
path: root/test/test_client.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/test_client.py')
-rw-r--r--test/test_client.py71
1 files changed, 46 insertions, 25 deletions
diff --git a/test/test_client.py b/test/test_client.py
index dbc9883..06eec75 100644
--- a/test/test_client.py
+++ b/test/test_client.py
@@ -7,10 +7,15 @@ from kafka.common import (
ProduceRequest, MetadataResponse,
BrokerMetadata, TopicMetadata, PartitionMetadata,
TopicAndPartition, KafkaUnavailableError,
- LeaderNotAvailableError, PartitionUnavailableError
+ LeaderNotAvailableError, PartitionUnavailableError, NoError,
+ UnknownTopicOrPartitionError
)
from kafka.protocol import create_message
+NO_ERROR = 0
+UNKNOWN_TOPIC_OR_PARTITION = 3
+NO_LEADER = 5
+
class TestKafkaClient(unittest2.TestCase):
def test_init_with_list(self):
with patch.object(KafkaClient, 'load_metadata_for_topics'):
@@ -96,7 +101,6 @@ class TestKafkaClient(unittest2.TestCase):
@patch('kafka.client.KafkaConnection')
@patch('kafka.client.KafkaProtocol')
def test_load_metadata(self, protocol, conn):
- "Load metadata for all topics"
conn.recv.return_value = 'response' # anything but None
@@ -106,18 +110,21 @@ class TestKafkaClient(unittest2.TestCase):
]
topics = [
- TopicMetadata('topic_1', 0, [
- PartitionMetadata('topic_1', 0, 1, [1, 2], [1, 2], 0)
+ TopicMetadata('topic_1', NO_ERROR, [
+ PartitionMetadata('topic_1', 0, 1, [1, 2], [1, 2], NO_ERROR)
]),
- TopicMetadata('topic_noleader', 0, [
- PartitionMetadata('topic_noleader', 0, -1, [], [], 0),
- PartitionMetadata('topic_noleader', 1, -1, [], [], 0)
+ TopicMetadata('topic_noleader', NO_ERROR, [
+ PartitionMetadata('topic_noleader', 0, -1, [], [],
+ NO_LEADER),
+ PartitionMetadata('topic_noleader', 1, -1, [], [],
+ NO_LEADER),
]),
- TopicMetadata('topic_no_partitions', 0, []),
- TopicMetadata('topic_3', 0, [
- PartitionMetadata('topic_3', 0, 0, [0, 1], [0, 1], 0),
- PartitionMetadata('topic_3', 1, 1, [1, 0], [1, 0], 0),
- PartitionMetadata('topic_3', 2, 0, [0, 1], [0, 1], 0)
+ TopicMetadata('topic_no_partitions', NO_LEADER, []),
+ TopicMetadata('topic_unknown', UNKNOWN_TOPIC_OR_PARTITION, []),
+ TopicMetadata('topic_3', NO_ERROR, [
+ PartitionMetadata('topic_3', 0, 0, [0, 1], [0, 1], NO_ERROR),
+ PartitionMetadata('topic_3', 1, 1, [1, 0], [1, 0], NO_ERROR),
+ PartitionMetadata('topic_3', 2, 0, [0, 1], [0, 1], NO_ERROR)
])
]
protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics)
@@ -133,6 +140,16 @@ class TestKafkaClient(unittest2.TestCase):
TopicAndPartition('topic_3', 2): brokers[0]},
client.topics_to_brokers)
+ # if we ask for metadata explicitly, it should raise errors
+ with self.assertRaises(LeaderNotAvailableError):
+ client.load_metadata_for_topics('topic_no_partitions')
+
+ with self.assertRaises(UnknownTopicOrPartitionError):
+ client.load_metadata_for_topics('topic_unknown')
+
+ # This should not raise
+ client.load_metadata_for_topics('topic_no_leader')
+
@patch('kafka.client.KafkaConnection')
@patch('kafka.client.KafkaProtocol')
def test_get_leader_for_partitions_reloads_metadata(self, protocol, conn):
@@ -146,7 +163,7 @@ class TestKafkaClient(unittest2.TestCase):
]
topics = [
- TopicMetadata('topic_no_partitions', 0, [])
+ TopicMetadata('topic_no_partitions', NO_LEADER, [])
]
protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics)
@@ -156,8 +173,8 @@ class TestKafkaClient(unittest2.TestCase):
self.assertDictEqual({}, client.topics_to_brokers)
topics = [
- TopicMetadata('topic_one_partition', 0, [
- PartitionMetadata('topic_no_partition', 0, 0, [0, 1], [0, 1], 0)
+ TopicMetadata('topic_one_partition', NO_ERROR, [
+ PartitionMetadata('topic_no_partition', 0, 0, [0, 1], [0, 1], NO_ERROR)
])
]
protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics)
@@ -184,7 +201,7 @@ class TestKafkaClient(unittest2.TestCase):
]
topics = [
- TopicMetadata('topic_no_partitions', 0, [])
+ TopicMetadata('topic_no_partitions', NO_ERROR, [])
]
protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics)
@@ -208,9 +225,11 @@ class TestKafkaClient(unittest2.TestCase):
]
topics = [
- TopicMetadata('topic_noleader', 0, [
- PartitionMetadata('topic_noleader', 0, -1, [], [], 0),
- PartitionMetadata('topic_noleader', 1, -1, [], [], 0)
+ TopicMetadata('topic_noleader', NO_ERROR, [
+ PartitionMetadata('topic_noleader', 0, -1, [], [],
+ NO_LEADER),
+ PartitionMetadata('topic_noleader', 1, -1, [], [],
+ NO_LEADER),
]),
]
protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics)
@@ -226,9 +245,9 @@ class TestKafkaClient(unittest2.TestCase):
self.assertIsNone(client._get_leader_for_partition('topic_noleader', 1))
topics = [
- TopicMetadata('topic_noleader', 0, [
- PartitionMetadata('topic_noleader', 0, 0, [0, 1], [0, 1], 0),
- PartitionMetadata('topic_noleader', 1, 1, [1, 0], [1, 0], 0)
+ TopicMetadata('topic_noleader', NO_ERROR, [
+ PartitionMetadata('topic_noleader', 0, 0, [0, 1], [0, 1], NO_ERROR),
+ PartitionMetadata('topic_noleader', 1, 1, [1, 0], [1, 0], NO_ERROR)
]),
]
protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics)
@@ -248,9 +267,11 @@ class TestKafkaClient(unittest2.TestCase):
]
topics = [
- TopicMetadata('topic_noleader', 0, [
- PartitionMetadata('topic_noleader', 0, -1, [], [], 0),
- PartitionMetadata('topic_noleader', 1, -1, [], [], 0)
+ TopicMetadata('topic_noleader', NO_ERROR, [
+ PartitionMetadata('topic_noleader', 0, -1, [], [],
+ NO_LEADER),
+ PartitionMetadata('topic_noleader', 1, -1, [], [],
+ NO_LEADER),
]),
]
protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics)