diff options
author | Dana Powers <dana.powers@rd.io> | 2014-09-01 02:29:43 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@rd.io> | 2014-09-01 18:04:10 -0700 |
commit | 3bfe593e2fc47c4ab4b90edb07d205ed07489322 (patch) | |
tree | 17c220e74d6017ab2850bc6839bb10a89eac9449 /kafka/client.py | |
parent | bebe7b663894c96d407b3b65725c8779c3b3af4d (diff) | |
download | kafka-python-3bfe593e2fc47c4ab4b90edb07d205ed07489322.tar.gz |
Refactor internal metadata dicts in KafkaClient
- use helper methods not direct access
- add get_partition_ids_for_topic
- check for topic and partition errors during load_metadata_for_topics
- raise LeaderNotAvailableError when topic is being auto-created
or UnknownTopicOrPartitionError if auto-creation off
Diffstat (limited to 'kafka/client.py')
-rw-r--r-- | kafka/client.py | 76 |
1 files changed, 56 insertions, 20 deletions
diff --git a/kafka/client.py b/kafka/client.py index 2eab1e3..46cd7ce 100644 --- a/kafka/client.py +++ b/kafka/client.py @@ -36,8 +36,9 @@ class KafkaClient(object): # create connections only when we need them self.conns = {} self.brokers = {} # broker_id -> BrokerMetadata - self.topics_to_brokers = {} # topic_id -> broker_id - self.topic_partitions = {} # topic_id -> [0, 1, 2, ...] + self.topics_to_brokers = {} # TopicAndPartition -> BrokerMetadata + self.topic_partitions = {} # topic -> partition -> PartitionMetadata + self.load_metadata_for_topics() # bootstrap with all metadata @@ -235,50 +236,85 @@ class KafkaClient(object): self.topic_partitions.clear() def has_metadata_for_topic(self, topic): - return topic in self.topic_partitions + return ( + topic in self.topic_partitions + and len(self.topic_partitions[topic]) > 0 + ) + + def get_partition_ids_for_topic(self, topic): + if topic not in self.topic_partitions: + return None + + return self.topic_partitions[topic].keys() def ensure_topic_exists(self, topic, timeout = 30): start_time = time.time() - self.load_metadata_for_topics(topic) while not self.has_metadata_for_topic(topic): if time.time() > start_time + timeout: raise KafkaTimeoutError("Unable to create topic {0}".format(topic)) - self.load_metadata_for_topics(topic) + try: + self.load_metadata_for_topics(topic) + except LeaderNotAvailableError: + pass time.sleep(.5) def load_metadata_for_topics(self, *topics): """ Discover brokers and metadata for a set of topics. This function is called lazily whenever metadata is unavailable. - """ + If broker does not auto-create topics, expect + UnknownTopicOrPartitionError for new topics + + If broker auto-creates topics, expect + LeaderNotAvailableError for new topics + until partitions have been initialized. + Retry. + """ resp = self.send_metadata_request(topics) - brokers = dict([(broker.nodeId, broker) for broker in resp.brokers]) - topics = dict([(t.topic, dict([(p.partition, p) for p in t.partitions]) ) for t in resp.topics]) + log.debug("Broker metadata: %s", resp.brokers) + log.debug("Topic metadata: %s", resp.topics) - log.debug("Broker metadata: %s", brokers) - log.debug("Topic metadata: %s", topics) + self.brokers = dict([(broker.nodeId, broker) + for broker in resp.brokers]) - self.brokers = brokers + for topic_metadata in resp.topics: + topic = topic_metadata.topic + partitions = topic_metadata.partitions - for topic, partitions in topics.items(): self.reset_topic_metadata(topic) - if not partitions: - log.warning('No partitions for %s', topic) - continue + # Errors expected for new topics + # 3 if topic doesn't exist, or 5 if server is auto-creating + kafka.common.check_error(topic_metadata) + + self.topic_partitions[topic] = {} + for partition_metadata in partitions: + partition = partition_metadata.partition + leader = partition_metadata.leader + + self.topic_partitions[topic][partition] = partition_metadata - self.topic_partitions[topic] = [] - for partition, meta in partitions.items(): - self.topic_partitions[topic].append(partition) + # Populate topics_to_brokers dict topic_part = TopicAndPartition(topic, partition) - if meta.leader == -1: + + # If No Leader, topics_to_brokers topic_partition -> None + if leader == -1: log.warning('No leader for topic %s partition %s', topic, partition) self.topics_to_brokers[topic_part] = None + + # If Known Broker, topic_partition -> BrokerMetadata + elif leader in self.brokers: + self.topics_to_brokers[topic_part] = self.brokers[leader] + + # If Unknown Broker, fake BrokerMetadata so we dont lose the id + # (not sure how this could happen. server could be in bad state) else: - self.topics_to_brokers[topic_part] = brokers[meta.leader] + self.topics_to_brokers[topic_part] = BrokerMetadata( + leader, None, None + ) def send_metadata_request(self, payloads=[], fail_on_error=True, callback=None): |