summaryrefslogtreecommitdiff
path: root/kafka/client.py
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2015-04-12 22:22:19 -0700
committerDana Powers <dana.powers@gmail.com>2015-04-12 22:22:19 -0700
commitcd81cf0ec8c1b7e7651374c5d1cbd105d003d352 (patch)
tree0d59747dc46a22b5b14fd958965ff14b6d596e12 /kafka/client.py
parent19643fb008b5b9153873bf6f8a4cdd5b54964cbe (diff)
parent6326e18cd89d55fc8e83d313f365ddafea272601 (diff)
downloadkafka-python-cd81cf0ec8c1b7e7651374c5d1cbd105d003d352.tar.gz
Merge pull request #367 from dpkp/clean_metadata_refresh
Clear local metadata cache before refresh in client.load_metadata_for_topics()
Diffstat (limited to 'kafka/client.py')
-rw-r--r--kafka/client.py27
1 files changed, 14 insertions, 13 deletions
diff --git a/kafka/client.py b/kafka/client.py
index 9eb8a0d..6ef9d83 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -267,15 +267,11 @@ class KafkaClient(object):
def reset_topic_metadata(self, *topics):
for topic in topics:
- try:
- partitions = self.topic_partitions[topic]
- except KeyError:
- continue
-
- for partition in partitions:
- self.topics_to_brokers.pop(TopicAndPartition(topic, partition), None)
-
- del self.topic_partitions[topic]
+ for topic_partition in list(self.topics_to_brokers.keys()):
+ if topic_partition.topic == topic:
+ del self.topics_to_brokers[topic_partition]
+ if topic in self.topic_partitions:
+ del self.topic_partitions[topic]
def reset_all_metadata(self):
self.topics_to_brokers.clear()
@@ -339,10 +335,17 @@ class KafkaClient(object):
(a single partition w/o a leader, for example)
"""
topics = [kafka_bytestring(t) for t in topics]
+
+ if topics:
+ for topic in topics:
+ self.reset_topic_metadata(topic)
+ else:
+ self.reset_all_metadata()
+
resp = self.send_metadata_request(topics)
- log.debug("Broker metadata: %s", resp.brokers)
- log.debug("Topic metadata: %s", resp.topics)
+ log.debug("Received new broker metadata: %s", resp.brokers)
+ log.debug("Received new topic metadata: %s", resp.topics)
self.brokers = dict([(broker.nodeId, broker)
for broker in resp.brokers])
@@ -351,8 +354,6 @@ class KafkaClient(object):
topic = topic_metadata.topic
partitions = topic_metadata.partitions
- self.reset_topic_metadata(topic)
-
# Errors expected for new topics
try:
kafka.common.check_error(topic_metadata)