summaryrefslogtreecommitdiff
path: root/kafka/cluster.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/cluster.py')
-rw-r--r--kafka/cluster.py25
1 files changed, 23 insertions, 2 deletions
diff --git a/kafka/cluster.py b/kafka/cluster.py
index 1cdc8dd..863b0c2 100644
--- a/kafka/cluster.py
+++ b/kafka/cluster.py
@@ -1,12 +1,15 @@
from __future__ import absolute_import
+import collections
import copy
import logging
import random
import time
+import six
+
import kafka.common as Errors
-from kafka.common import BrokerMetadata
+from kafka.common import BrokerMetadata, TopicPartition
from .future import Future
log = logging.getLogger(__name__)
@@ -21,6 +24,7 @@ class ClusterMetadata(object):
def __init__(self, **configs):
self._brokers = {}
self._partitions = {}
+ self._broker_partitions = collections.defaultdict(set)
self._groups = {}
self._version = 0
self._last_refresh_ms = 0
@@ -41,15 +45,29 @@ class ClusterMetadata(object):
return self._brokers.get(broker_id)
def partitions_for_topic(self, topic):
+ """Return set of all partitions for topic (whether available or not)"""
if topic not in self._partitions:
return None
return set(self._partitions[topic].keys())
+ def available_partitions_for_topic(self, topic):
+ """Return set of partitions with known leaders"""
+ if topic not in self._partitions:
+ return None
+ return set([partition for partition, leader
+ in six.iteritems(self._partitions[topic])
+ if leader != -1])
+
def leader_for_partition(self, partition):
+ """Return node_id of leader, -1 unavailable, None if unknown."""
if partition.topic not in self._partitions:
return None
return self._partitions[partition.topic].get(partition.partition)
+ def partitions_for_broker(self, broker_id):
+ """Return TopicPartitions for which the broker is a leader"""
+ return self._broker_partitions.get(broker_id)
+
def coordinator_for_group(self, group):
return self._groups.get(group)
@@ -106,7 +124,8 @@ class ClusterMetadata(object):
# Drop any UnknownTopic, InvalidTopic, and TopicAuthorizationFailed
# but retain LeaderNotAvailable because it means topic is initializing
- self._partitions = {}
+ self._partitions.clear()
+ self._broker_partitions.clear()
for error_code, topic, partitions in metadata.topics:
error_type = Errors.for_code(error_code)
@@ -114,6 +133,8 @@ class ClusterMetadata(object):
self._partitions[topic] = {}
for _, partition, leader, _, _ in partitions:
self._partitions[topic][partition] = leader
+ if leader != -1:
+ self._broker_partitions[leader].add(TopicPartition(topic, partition))
elif error_type is Errors.LeaderNotAvailableError:
log.error("Topic %s is not available during auto-create"
" initialization", topic)