diff options
Diffstat (limited to 'kafka/cluster.py')
-rw-r--r-- | kafka/cluster.py | 14 |
1 files changed, 7 insertions, 7 deletions
diff --git a/kafka/cluster.py b/kafka/cluster.py index 4e0b94e..1a4d5ab 100644 --- a/kafka/cluster.py +++ b/kafka/cluster.py @@ -22,10 +22,10 @@ class ClusterMetadata(object): } def __init__(self, **configs): - self._brokers = {} - self._partitions = {} - self._broker_partitions = collections.defaultdict(set) - self._groups = {} + self._brokers = {} # node_id -> BrokerMetadata + self._partitions = {} # topic -> partition -> PartitionMetadata + self._broker_partitions = collections.defaultdict(set) # node_id -> {TopicPartition...} + self._groups = {} # group_name -> node_id self._last_refresh_ms = 0 self._last_successful_refresh_ms = 0 self._need_update = False @@ -126,8 +126,6 @@ class ClusterMetadata(object): node_id: BrokerMetadata(node_id, host, port) }) - # Drop any UnknownTopic, InvalidTopic, and TopicAuthorizationFailed - # but retain LeaderNotAvailable because it means topic is initializing self._partitions.clear() self._broker_partitions.clear() self.unauthorized_topics.clear() @@ -141,7 +139,9 @@ class ClusterMetadata(object): topic=topic, partition=partition, leader=leader, replicas=replicas, isr=isr, error=p_error) if leader != -1: - self._broker_partitions[leader].add(TopicPartition(topic, partition)) + self._broker_partitions[leader].add( + TopicPartition(topic, partition)) + elif error_type is Errors.LeaderNotAvailableError: log.error("Topic %s is not available during auto-create" " initialization", topic) |