diff options
Diffstat (limited to 'kafka/client.py')
-rw-r--r-- | kafka/client.py | 27 |
1 files changed, 14 insertions, 13 deletions
diff --git a/kafka/client.py b/kafka/client.py index 9eb8a0d..6ef9d83 100644 --- a/kafka/client.py +++ b/kafka/client.py @@ -267,15 +267,11 @@ class KafkaClient(object): def reset_topic_metadata(self, *topics): for topic in topics: - try: - partitions = self.topic_partitions[topic] - except KeyError: - continue - - for partition in partitions: - self.topics_to_brokers.pop(TopicAndPartition(topic, partition), None) - - del self.topic_partitions[topic] + for topic_partition in list(self.topics_to_brokers.keys()): + if topic_partition.topic == topic: + del self.topics_to_brokers[topic_partition] + if topic in self.topic_partitions: + del self.topic_partitions[topic] def reset_all_metadata(self): self.topics_to_brokers.clear() @@ -339,10 +335,17 @@ class KafkaClient(object): (a single partition w/o a leader, for example) """ topics = [kafka_bytestring(t) for t in topics] + + if topics: + for topic in topics: + self.reset_topic_metadata(topic) + else: + self.reset_all_metadata() + resp = self.send_metadata_request(topics) - log.debug("Broker metadata: %s", resp.brokers) - log.debug("Topic metadata: %s", resp.topics) + log.debug("Received new broker metadata: %s", resp.brokers) + log.debug("Received new topic metadata: %s", resp.topics) self.brokers = dict([(broker.nodeId, broker) for broker in resp.brokers]) @@ -351,8 +354,6 @@ class KafkaClient(object): topic = topic_metadata.topic partitions = topic_metadata.partitions - self.reset_topic_metadata(topic) - # Errors expected for new topics try: kafka.common.check_error(topic_metadata) |