summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--kafka/client.py52
-rw-r--r--test/test_client.py71
2 files changed, 86 insertions, 37 deletions
diff --git a/kafka/client.py b/kafka/client.py
index 46cd7ce..3b9aba9 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -261,16 +261,30 @@ class KafkaClient(object):
def load_metadata_for_topics(self, *topics):
"""
- Discover brokers and metadata for a set of topics. This function is called
- lazily whenever metadata is unavailable.
+ Fetch broker and topic-partition metadata from the server,
+ and update internal data:
+ broker list, topic/partition list, and topic/parition -> broker map
- If broker does not auto-create topics, expect
- UnknownTopicOrPartitionError for new topics
+ This method should be called after receiving any error
- If broker auto-creates topics, expect
- LeaderNotAvailableError for new topics
+ @param: *topics (optional)
+ If a list of topics is provided, the metadata refresh will be limited
+ to the specified topics only.
+
+ Exceptions:
+ ----------
+ If the broker is configured to not auto-create topics,
+ expect UnknownTopicOrPartitionError for topics that don't exist
+
+ If the broker is configured to auto-create topics,
+ expect LeaderNotAvailableError for new topics
until partitions have been initialized.
- Retry.
+
+ Exceptions *will not* be raised in a full refresh (i.e. no topic list)
+ In this case, error codes will be logged as errors
+
+ Partition-level errors will also not be raised here
+ (a single partition w/o a leader, for example)
"""
resp = self.send_metadata_request(topics)
@@ -287,8 +301,17 @@ class KafkaClient(object):
self.reset_topic_metadata(topic)
# Errors expected for new topics
- # 3 if topic doesn't exist, or 5 if server is auto-creating
- kafka.common.check_error(topic_metadata)
+ try:
+ kafka.common.check_error(topic_metadata)
+ except (UnknownTopicOrPartitionError, LeaderNotAvailableError) as e:
+
+ # Raise if the topic was passed in explicitly
+ if topic in topics:
+ raise
+
+ # Otherwise, just log a warning
+ log.error("Error loading topic metadata for %s: %s", topic, type(e))
+ continue
self.topic_partitions[topic] = {}
for partition_metadata in partitions:
@@ -300,13 +323,18 @@ class KafkaClient(object):
# Populate topics_to_brokers dict
topic_part = TopicAndPartition(topic, partition)
+ # Check for partition errors
+ try:
+ kafka.common.check_error(partition_metadata)
+
# If No Leader, topics_to_brokers topic_partition -> None
- if leader == -1:
- log.warning('No leader for topic %s partition %s', topic, partition)
+ except LeaderNotAvailableError:
+ log.error('No leader for topic %s partition %d', topic, partition)
self.topics_to_brokers[topic_part] = None
+ continue
# If Known Broker, topic_partition -> BrokerMetadata
- elif leader in self.brokers:
+ if leader in self.brokers:
self.topics_to_brokers[topic_part] = self.brokers[leader]
# If Unknown Broker, fake BrokerMetadata so we dont lose the id
diff --git a/test/test_client.py b/test/test_client.py
index dbc9883..06eec75 100644
--- a/test/test_client.py
+++ b/test/test_client.py
@@ -7,10 +7,15 @@ from kafka.common import (
ProduceRequest, MetadataResponse,
BrokerMetadata, TopicMetadata, PartitionMetadata,
TopicAndPartition, KafkaUnavailableError,
- LeaderNotAvailableError, PartitionUnavailableError
+ LeaderNotAvailableError, PartitionUnavailableError, NoError,
+ UnknownTopicOrPartitionError
)
from kafka.protocol import create_message
+NO_ERROR = 0
+UNKNOWN_TOPIC_OR_PARTITION = 3
+NO_LEADER = 5
+
class TestKafkaClient(unittest2.TestCase):
def test_init_with_list(self):
with patch.object(KafkaClient, 'load_metadata_for_topics'):
@@ -96,7 +101,6 @@ class TestKafkaClient(unittest2.TestCase):
@patch('kafka.client.KafkaConnection')
@patch('kafka.client.KafkaProtocol')
def test_load_metadata(self, protocol, conn):
- "Load metadata for all topics"
conn.recv.return_value = 'response' # anything but None
@@ -106,18 +110,21 @@ class TestKafkaClient(unittest2.TestCase):
]
topics = [
- TopicMetadata('topic_1', 0, [
- PartitionMetadata('topic_1', 0, 1, [1, 2], [1, 2], 0)
+ TopicMetadata('topic_1', NO_ERROR, [
+ PartitionMetadata('topic_1', 0, 1, [1, 2], [1, 2], NO_ERROR)
]),
- TopicMetadata('topic_noleader', 0, [
- PartitionMetadata('topic_noleader', 0, -1, [], [], 0),
- PartitionMetadata('topic_noleader', 1, -1, [], [], 0)
+ TopicMetadata('topic_noleader', NO_ERROR, [
+ PartitionMetadata('topic_noleader', 0, -1, [], [],
+ NO_LEADER),
+ PartitionMetadata('topic_noleader', 1, -1, [], [],
+ NO_LEADER),
]),
- TopicMetadata('topic_no_partitions', 0, []),
- TopicMetadata('topic_3', 0, [
- PartitionMetadata('topic_3', 0, 0, [0, 1], [0, 1], 0),
- PartitionMetadata('topic_3', 1, 1, [1, 0], [1, 0], 0),
- PartitionMetadata('topic_3', 2, 0, [0, 1], [0, 1], 0)
+ TopicMetadata('topic_no_partitions', NO_LEADER, []),
+ TopicMetadata('topic_unknown', UNKNOWN_TOPIC_OR_PARTITION, []),
+ TopicMetadata('topic_3', NO_ERROR, [
+ PartitionMetadata('topic_3', 0, 0, [0, 1], [0, 1], NO_ERROR),
+ PartitionMetadata('topic_3', 1, 1, [1, 0], [1, 0], NO_ERROR),
+ PartitionMetadata('topic_3', 2, 0, [0, 1], [0, 1], NO_ERROR)
])
]
protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics)
@@ -133,6 +140,16 @@ class TestKafkaClient(unittest2.TestCase):
TopicAndPartition('topic_3', 2): brokers[0]},
client.topics_to_brokers)
+ # if we ask for metadata explicitly, it should raise errors
+ with self.assertRaises(LeaderNotAvailableError):
+ client.load_metadata_for_topics('topic_no_partitions')
+
+ with self.assertRaises(UnknownTopicOrPartitionError):
+ client.load_metadata_for_topics('topic_unknown')
+
+ # This should not raise
+ client.load_metadata_for_topics('topic_no_leader')
+
@patch('kafka.client.KafkaConnection')
@patch('kafka.client.KafkaProtocol')
def test_get_leader_for_partitions_reloads_metadata(self, protocol, conn):
@@ -146,7 +163,7 @@ class TestKafkaClient(unittest2.TestCase):
]
topics = [
- TopicMetadata('topic_no_partitions', 0, [])
+ TopicMetadata('topic_no_partitions', NO_LEADER, [])
]
protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics)
@@ -156,8 +173,8 @@ class TestKafkaClient(unittest2.TestCase):
self.assertDictEqual({}, client.topics_to_brokers)
topics = [
- TopicMetadata('topic_one_partition', 0, [
- PartitionMetadata('topic_no_partition', 0, 0, [0, 1], [0, 1], 0)
+ TopicMetadata('topic_one_partition', NO_ERROR, [
+ PartitionMetadata('topic_no_partition', 0, 0, [0, 1], [0, 1], NO_ERROR)
])
]
protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics)
@@ -184,7 +201,7 @@ class TestKafkaClient(unittest2.TestCase):
]
topics = [
- TopicMetadata('topic_no_partitions', 0, [])
+ TopicMetadata('topic_no_partitions', NO_ERROR, [])
]
protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics)
@@ -208,9 +225,11 @@ class TestKafkaClient(unittest2.TestCase):
]
topics = [
- TopicMetadata('topic_noleader', 0, [
- PartitionMetadata('topic_noleader', 0, -1, [], [], 0),
- PartitionMetadata('topic_noleader', 1, -1, [], [], 0)
+ TopicMetadata('topic_noleader', NO_ERROR, [
+ PartitionMetadata('topic_noleader', 0, -1, [], [],
+ NO_LEADER),
+ PartitionMetadata('topic_noleader', 1, -1, [], [],
+ NO_LEADER),
]),
]
protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics)
@@ -226,9 +245,9 @@ class TestKafkaClient(unittest2.TestCase):
self.assertIsNone(client._get_leader_for_partition('topic_noleader', 1))
topics = [
- TopicMetadata('topic_noleader', 0, [
- PartitionMetadata('topic_noleader', 0, 0, [0, 1], [0, 1], 0),
- PartitionMetadata('topic_noleader', 1, 1, [1, 0], [1, 0], 0)
+ TopicMetadata('topic_noleader', NO_ERROR, [
+ PartitionMetadata('topic_noleader', 0, 0, [0, 1], [0, 1], NO_ERROR),
+ PartitionMetadata('topic_noleader', 1, 1, [1, 0], [1, 0], NO_ERROR)
]),
]
protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics)
@@ -248,9 +267,11 @@ class TestKafkaClient(unittest2.TestCase):
]
topics = [
- TopicMetadata('topic_noleader', 0, [
- PartitionMetadata('topic_noleader', 0, -1, [], [], 0),
- PartitionMetadata('topic_noleader', 1, -1, [], [], 0)
+ TopicMetadata('topic_noleader', NO_ERROR, [
+ PartitionMetadata('topic_noleader', 0, -1, [], [],
+ NO_LEADER),
+ PartitionMetadata('topic_noleader', 1, -1, [], [],
+ NO_LEADER),
]),
]
protocol.decode_metadata_response.return_value = MetadataResponse(brokers, topics)