summaryrefslogtreecommitdiff
path: root/kafka/client.py
diff options
context:
space:
mode:
authorDana Powers <dana.powers@rd.io>2014-09-01 02:29:43 -0700
committerDana Powers <dana.powers@rd.io>2014-09-01 18:04:10 -0700
commit3bfe593e2fc47c4ab4b90edb07d205ed07489322 (patch)
tree17c220e74d6017ab2850bc6839bb10a89eac9449 /kafka/client.py
parentbebe7b663894c96d407b3b65725c8779c3b3af4d (diff)
downloadkafka-python-3bfe593e2fc47c4ab4b90edb07d205ed07489322.tar.gz
Refactor internal metadata dicts in KafkaClient
- use helper methods not direct access - add get_partition_ids_for_topic - check for topic and partition errors during load_metadata_for_topics - raise LeaderNotAvailableError when topic is being auto-created or UnknownTopicOrPartitionError if auto-creation off
Diffstat (limited to 'kafka/client.py')
-rw-r--r--kafka/client.py76
1 files changed, 56 insertions, 20 deletions
diff --git a/kafka/client.py b/kafka/client.py
index 2eab1e3..46cd7ce 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -36,8 +36,9 @@ class KafkaClient(object):
# create connections only when we need them
self.conns = {}
self.brokers = {} # broker_id -> BrokerMetadata
- self.topics_to_brokers = {} # topic_id -> broker_id
- self.topic_partitions = {} # topic_id -> [0, 1, 2, ...]
+ self.topics_to_brokers = {} # TopicAndPartition -> BrokerMetadata
+ self.topic_partitions = {} # topic -> partition -> PartitionMetadata
+
self.load_metadata_for_topics() # bootstrap with all metadata
@@ -235,50 +236,85 @@ class KafkaClient(object):
self.topic_partitions.clear()
def has_metadata_for_topic(self, topic):
- return topic in self.topic_partitions
+ return (
+ topic in self.topic_partitions
+ and len(self.topic_partitions[topic]) > 0
+ )
+
+ def get_partition_ids_for_topic(self, topic):
+ if topic not in self.topic_partitions:
+ return None
+
+ return self.topic_partitions[topic].keys()
def ensure_topic_exists(self, topic, timeout = 30):
start_time = time.time()
- self.load_metadata_for_topics(topic)
while not self.has_metadata_for_topic(topic):
if time.time() > start_time + timeout:
raise KafkaTimeoutError("Unable to create topic {0}".format(topic))
- self.load_metadata_for_topics(topic)
+ try:
+ self.load_metadata_for_topics(topic)
+ except LeaderNotAvailableError:
+ pass
time.sleep(.5)
def load_metadata_for_topics(self, *topics):
"""
Discover brokers and metadata for a set of topics. This function is called
lazily whenever metadata is unavailable.
- """
+ If broker does not auto-create topics, expect
+ UnknownTopicOrPartitionError for new topics
+
+ If broker auto-creates topics, expect
+ LeaderNotAvailableError for new topics
+ until partitions have been initialized.
+ Retry.
+ """
resp = self.send_metadata_request(topics)
- brokers = dict([(broker.nodeId, broker) for broker in resp.brokers])
- topics = dict([(t.topic, dict([(p.partition, p) for p in t.partitions]) ) for t in resp.topics])
+ log.debug("Broker metadata: %s", resp.brokers)
+ log.debug("Topic metadata: %s", resp.topics)
- log.debug("Broker metadata: %s", brokers)
- log.debug("Topic metadata: %s", topics)
+ self.brokers = dict([(broker.nodeId, broker)
+ for broker in resp.brokers])
- self.brokers = brokers
+ for topic_metadata in resp.topics:
+ topic = topic_metadata.topic
+ partitions = topic_metadata.partitions
- for topic, partitions in topics.items():
self.reset_topic_metadata(topic)
- if not partitions:
- log.warning('No partitions for %s', topic)
- continue
+ # Errors expected for new topics
+ # 3 if topic doesn't exist, or 5 if server is auto-creating
+ kafka.common.check_error(topic_metadata)
+
+ self.topic_partitions[topic] = {}
+ for partition_metadata in partitions:
+ partition = partition_metadata.partition
+ leader = partition_metadata.leader
+
+ self.topic_partitions[topic][partition] = partition_metadata
- self.topic_partitions[topic] = []
- for partition, meta in partitions.items():
- self.topic_partitions[topic].append(partition)
+ # Populate topics_to_brokers dict
topic_part = TopicAndPartition(topic, partition)
- if meta.leader == -1:
+
+ # If No Leader, topics_to_brokers topic_partition -> None
+ if leader == -1:
log.warning('No leader for topic %s partition %s', topic, partition)
self.topics_to_brokers[topic_part] = None
+
+ # If Known Broker, topic_partition -> BrokerMetadata
+ elif leader in self.brokers:
+ self.topics_to_brokers[topic_part] = self.brokers[leader]
+
+ # If Unknown Broker, fake BrokerMetadata so we dont lose the id
+ # (not sure how this could happen. server could be in bad state)
else:
- self.topics_to_brokers[topic_part] = brokers[meta.leader]
+ self.topics_to_brokers[topic_part] = BrokerMetadata(
+ leader, None, None
+ )
def send_metadata_request(self, payloads=[], fail_on_error=True,
callback=None):