summaryrefslogtreecommitdiff
path: root/kafka/client.py
diff options
context:
space:
mode:
authorDana Powers <dana.powers@rd.io>2014-09-01 16:38:40 -0700
committerDana Powers <dana.powers@rd.io>2014-09-01 18:04:10 -0700
commit90c6520097b21d6f6bd075c97c93c0adbc5484c8 (patch)
treee3f4a5800cdf0c35ef2da677f3ca38c10fb6dd5b /kafka/client.py
parent11fc9bc2e61b34bddbf6d54228709e075b2615a1 (diff)
downloadkafka-python-90c6520097b21d6f6bd075c97c93c0adbc5484c8.tar.gz
load_metadata_for_topics should raise exceptions on explicit topic args
Diffstat (limited to 'kafka/client.py')
-rw-r--r--kafka/client.py52
1 files changed, 40 insertions, 12 deletions
diff --git a/kafka/client.py b/kafka/client.py
index 46cd7ce..3b9aba9 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -261,16 +261,30 @@ class KafkaClient(object):
def load_metadata_for_topics(self, *topics):
"""
- Discover brokers and metadata for a set of topics. This function is called
- lazily whenever metadata is unavailable.
+ Fetch broker and topic-partition metadata from the server,
+ and update internal data:
+ broker list, topic/partition list, and topic/parition -> broker map
- If broker does not auto-create topics, expect
- UnknownTopicOrPartitionError for new topics
+ This method should be called after receiving any error
- If broker auto-creates topics, expect
- LeaderNotAvailableError for new topics
+ @param: *topics (optional)
+ If a list of topics is provided, the metadata refresh will be limited
+ to the specified topics only.
+
+ Exceptions:
+ ----------
+ If the broker is configured to not auto-create topics,
+ expect UnknownTopicOrPartitionError for topics that don't exist
+
+ If the broker is configured to auto-create topics,
+ expect LeaderNotAvailableError for new topics
until partitions have been initialized.
- Retry.
+
+ Exceptions *will not* be raised in a full refresh (i.e. no topic list)
+ In this case, error codes will be logged as errors
+
+ Partition-level errors will also not be raised here
+ (a single partition w/o a leader, for example)
"""
resp = self.send_metadata_request(topics)
@@ -287,8 +301,17 @@ class KafkaClient(object):
self.reset_topic_metadata(topic)
# Errors expected for new topics
- # 3 if topic doesn't exist, or 5 if server is auto-creating
- kafka.common.check_error(topic_metadata)
+ try:
+ kafka.common.check_error(topic_metadata)
+ except (UnknownTopicOrPartitionError, LeaderNotAvailableError) as e:
+
+ # Raise if the topic was passed in explicitly
+ if topic in topics:
+ raise
+
+ # Otherwise, just log a warning
+ log.error("Error loading topic metadata for %s: %s", topic, type(e))
+ continue
self.topic_partitions[topic] = {}
for partition_metadata in partitions:
@@ -300,13 +323,18 @@ class KafkaClient(object):
# Populate topics_to_brokers dict
topic_part = TopicAndPartition(topic, partition)
+ # Check for partition errors
+ try:
+ kafka.common.check_error(partition_metadata)
+
# If No Leader, topics_to_brokers topic_partition -> None
- if leader == -1:
- log.warning('No leader for topic %s partition %s', topic, partition)
+ except LeaderNotAvailableError:
+ log.error('No leader for topic %s partition %d', topic, partition)
self.topics_to_brokers[topic_part] = None
+ continue
# If Known Broker, topic_partition -> BrokerMetadata
- elif leader in self.brokers:
+ if leader in self.brokers:
self.topics_to_brokers[topic_part] = self.brokers[leader]
# If Unknown Broker, fake BrokerMetadata so we dont lose the id