diff options
author | Dana Powers <dana.powers@rd.io> | 2014-09-01 16:38:40 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@rd.io> | 2014-09-01 18:04:10 -0700 |
commit | 90c6520097b21d6f6bd075c97c93c0adbc5484c8 (patch) | |
tree | e3f4a5800cdf0c35ef2da677f3ca38c10fb6dd5b /kafka/client.py | |
parent | 11fc9bc2e61b34bddbf6d54228709e075b2615a1 (diff) | |
download | kafka-python-90c6520097b21d6f6bd075c97c93c0adbc5484c8.tar.gz |
load_metadata_for_topics should raise exceptions on explicit topic args
Diffstat (limited to 'kafka/client.py')
-rw-r--r-- | kafka/client.py | 52 |
1 files changed, 40 insertions, 12 deletions
diff --git a/kafka/client.py b/kafka/client.py index 46cd7ce..3b9aba9 100644 --- a/kafka/client.py +++ b/kafka/client.py @@ -261,16 +261,30 @@ class KafkaClient(object): def load_metadata_for_topics(self, *topics): """ - Discover brokers and metadata for a set of topics. This function is called - lazily whenever metadata is unavailable. + Fetch broker and topic-partition metadata from the server, + and update internal data: + broker list, topic/partition list, and topic/parition -> broker map - If broker does not auto-create topics, expect - UnknownTopicOrPartitionError for new topics + This method should be called after receiving any error - If broker auto-creates topics, expect - LeaderNotAvailableError for new topics + @param: *topics (optional) + If a list of topics is provided, the metadata refresh will be limited + to the specified topics only. + + Exceptions: + ---------- + If the broker is configured to not auto-create topics, + expect UnknownTopicOrPartitionError for topics that don't exist + + If the broker is configured to auto-create topics, + expect LeaderNotAvailableError for new topics until partitions have been initialized. - Retry. + + Exceptions *will not* be raised in a full refresh (i.e. no topic list) + In this case, error codes will be logged as errors + + Partition-level errors will also not be raised here + (a single partition w/o a leader, for example) """ resp = self.send_metadata_request(topics) @@ -287,8 +301,17 @@ class KafkaClient(object): self.reset_topic_metadata(topic) # Errors expected for new topics - # 3 if topic doesn't exist, or 5 if server is auto-creating - kafka.common.check_error(topic_metadata) + try: + kafka.common.check_error(topic_metadata) + except (UnknownTopicOrPartitionError, LeaderNotAvailableError) as e: + + # Raise if the topic was passed in explicitly + if topic in topics: + raise + + # Otherwise, just log a warning + log.error("Error loading topic metadata for %s: %s", topic, type(e)) + continue self.topic_partitions[topic] = {} for partition_metadata in partitions: @@ -300,13 +323,18 @@ class KafkaClient(object): # Populate topics_to_brokers dict topic_part = TopicAndPartition(topic, partition) + # Check for partition errors + try: + kafka.common.check_error(partition_metadata) + # If No Leader, topics_to_brokers topic_partition -> None - if leader == -1: - log.warning('No leader for topic %s partition %s', topic, partition) + except LeaderNotAvailableError: + log.error('No leader for topic %s partition %d', topic, partition) self.topics_to_brokers[topic_part] = None + continue # If Known Broker, topic_partition -> BrokerMetadata - elif leader in self.brokers: + if leader in self.brokers: self.topics_to_brokers[topic_part] = self.brokers[leader] # If Unknown Broker, fake BrokerMetadata so we dont lose the id |