diff options
author | Dana Powers <dana.powers@gmail.com> | 2016-02-15 16:15:08 -0800 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2016-02-15 16:15:08 -0800 |
commit | ca7cd0dd1172c337dda1d719b3b5c49a02da97d9 (patch) | |
tree | a2b51f06f7e15ae4ef675961119a5e6c0df7bfe8 /kafka/cluster.py | |
parent | 494800cd004b3547d29bee2d7dc0a7ccf2c3dbe0 (diff) | |
download | kafka-python-ca7cd0dd1172c337dda1d719b3b5c49a02da97d9.tar.gz |
Keep full PartitionMetadata from MetadataResponses
Diffstat (limited to 'kafka/cluster.py')
-rw-r--r-- | kafka/cluster.py | 16 |
1 files changed, 10 insertions, 6 deletions
diff --git a/kafka/cluster.py b/kafka/cluster.py index 69cc02e..d766fa3 100644 --- a/kafka/cluster.py +++ b/kafka/cluster.py @@ -9,7 +9,7 @@ import time import six import kafka.common as Errors -from kafka.common import BrokerMetadata, TopicPartition +from kafka.common import BrokerMetadata, PartitionMetadata, TopicPartition from .future import Future log = logging.getLogger(__name__) @@ -55,15 +55,17 @@ class ClusterMetadata(object): """Return set of partitions with known leaders""" if topic not in self._partitions: return None - return set([partition for partition, leader + return set([partition for partition, metadata in six.iteritems(self._partitions[topic]) - if leader != -1]) + if metadata.leader != -1]) def leader_for_partition(self, partition): """Return node_id of leader, -1 unavailable, None if unknown.""" if partition.topic not in self._partitions: return None - return self._partitions[partition.topic].get(partition.partition) + elif partition.partition not in self._partitions[partition.topic]: + return None + return self._partitions[partition.topic][partition.partition].leader def partitions_for_broker(self, broker_id): """Return TopicPartitions for which the broker is a leader""" @@ -133,8 +135,10 @@ class ClusterMetadata(object): error_type = Errors.for_code(error_code) if error_type is Errors.NoError: self._partitions[topic] = {} - for _, partition, leader, _, _ in partitions: - self._partitions[topic][partition] = leader + for p_error, partition, leader, replicas, isr in partitions: + self._partitions[topic][partition] = PartitionMetadata( + topic=topic, partition=partition, leader=leader, + replicas=replicas, isr=isr, error=p_error) if leader != -1: self._broker_partitions[leader].add(TopicPartition(topic, partition)) elif error_type is Errors.LeaderNotAvailableError: |