summaryrefslogtreecommitdiff
path: root/kafka/cluster.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/cluster.py')
-rw-r--r--kafka/cluster.py16
1 files changed, 10 insertions, 6 deletions
diff --git a/kafka/cluster.py b/kafka/cluster.py
index 69cc02e..d766fa3 100644
--- a/kafka/cluster.py
+++ b/kafka/cluster.py
@@ -9,7 +9,7 @@ import time
import six
import kafka.common as Errors
-from kafka.common import BrokerMetadata, TopicPartition
+from kafka.common import BrokerMetadata, PartitionMetadata, TopicPartition
from .future import Future
log = logging.getLogger(__name__)
@@ -55,15 +55,17 @@ class ClusterMetadata(object):
"""Return set of partitions with known leaders"""
if topic not in self._partitions:
return None
- return set([partition for partition, leader
+ return set([partition for partition, metadata
in six.iteritems(self._partitions[topic])
- if leader != -1])
+ if metadata.leader != -1])
def leader_for_partition(self, partition):
"""Return node_id of leader, -1 unavailable, None if unknown."""
if partition.topic not in self._partitions:
return None
- return self._partitions[partition.topic].get(partition.partition)
+ elif partition.partition not in self._partitions[partition.topic]:
+ return None
+ return self._partitions[partition.topic][partition.partition].leader
def partitions_for_broker(self, broker_id):
"""Return TopicPartitions for which the broker is a leader"""
@@ -133,8 +135,10 @@ class ClusterMetadata(object):
error_type = Errors.for_code(error_code)
if error_type is Errors.NoError:
self._partitions[topic] = {}
- for _, partition, leader, _, _ in partitions:
- self._partitions[topic][partition] = leader
+ for p_error, partition, leader, replicas, isr in partitions:
+ self._partitions[topic][partition] = PartitionMetadata(
+ topic=topic, partition=partition, leader=leader,
+ replicas=replicas, isr=isr, error=p_error)
if leader != -1:
self._broker_partitions[leader].add(TopicPartition(topic, partition))
elif error_type is Errors.LeaderNotAvailableError: