diff options
Diffstat (limited to 'kafka/cluster.py')
-rw-r--r-- | kafka/cluster.py | 16 |
1 files changed, 10 insertions, 6 deletions
diff --git a/kafka/cluster.py b/kafka/cluster.py index 69cc02e..d766fa3 100644 --- a/kafka/cluster.py +++ b/kafka/cluster.py @@ -9,7 +9,7 @@ import time import six import kafka.common as Errors -from kafka.common import BrokerMetadata, TopicPartition +from kafka.common import BrokerMetadata, PartitionMetadata, TopicPartition from .future import Future log = logging.getLogger(__name__) @@ -55,15 +55,17 @@ class ClusterMetadata(object): """Return set of partitions with known leaders""" if topic not in self._partitions: return None - return set([partition for partition, leader + return set([partition for partition, metadata in six.iteritems(self._partitions[topic]) - if leader != -1]) + if metadata.leader != -1]) def leader_for_partition(self, partition): """Return node_id of leader, -1 unavailable, None if unknown.""" if partition.topic not in self._partitions: return None - return self._partitions[partition.topic].get(partition.partition) + elif partition.partition not in self._partitions[partition.topic]: + return None + return self._partitions[partition.topic][partition.partition].leader def partitions_for_broker(self, broker_id): """Return TopicPartitions for which the broker is a leader""" @@ -133,8 +135,10 @@ class ClusterMetadata(object): error_type = Errors.for_code(error_code) if error_type is Errors.NoError: self._partitions[topic] = {} - for _, partition, leader, _, _ in partitions: - self._partitions[topic][partition] = leader + for p_error, partition, leader, replicas, isr in partitions: + self._partitions[topic][partition] = PartitionMetadata( + topic=topic, partition=partition, leader=leader, + replicas=replicas, isr=isr, error=p_error) if leader != -1: self._broker_partitions[leader].add(TopicPartition(topic, partition)) elif error_type is Errors.LeaderNotAvailableError: |