diff options
author | Dana Powers <dana.powers@rd.io> | 2016-01-23 15:07:23 -0800 |
---|---|---|
committer | Dana Powers <dana.powers@rd.io> | 2016-01-23 15:07:23 -0800 |
commit | 318b10744cdec9bab268ed8e532fd76246cbb0df (patch) | |
tree | 233ebeed073c7c690d58d948912ab75160963794 /kafka/cluster.py | |
parent | d2012e067c953c80406c94f98d7a69d56a543f6c (diff) | |
download | kafka-python-318b10744cdec9bab268ed8e532fd76246cbb0df.tar.gz |
Add available_partitions_for_topic() and partitions_for_broker()
Diffstat (limited to 'kafka/cluster.py')
-rw-r--r-- | kafka/cluster.py | 25 |
1 files changed, 23 insertions, 2 deletions
diff --git a/kafka/cluster.py b/kafka/cluster.py index 1cdc8dd..863b0c2 100644 --- a/kafka/cluster.py +++ b/kafka/cluster.py @@ -1,12 +1,15 @@ from __future__ import absolute_import +import collections import copy import logging import random import time +import six + import kafka.common as Errors -from kafka.common import BrokerMetadata +from kafka.common import BrokerMetadata, TopicPartition from .future import Future log = logging.getLogger(__name__) @@ -21,6 +24,7 @@ class ClusterMetadata(object): def __init__(self, **configs): self._brokers = {} self._partitions = {} + self._broker_partitions = collections.defaultdict(set) self._groups = {} self._version = 0 self._last_refresh_ms = 0 @@ -41,15 +45,29 @@ class ClusterMetadata(object): return self._brokers.get(broker_id) def partitions_for_topic(self, topic): + """Return set of all partitions for topic (whether available or not)""" if topic not in self._partitions: return None return set(self._partitions[topic].keys()) + def available_partitions_for_topic(self, topic): + """Return set of partitions with known leaders""" + if topic not in self._partitions: + return None + return set([partition for partition, leader + in six.iteritems(self._partitions[topic]) + if leader != -1]) + def leader_for_partition(self, partition): + """Return node_id of leader, -1 unavailable, None if unknown.""" if partition.topic not in self._partitions: return None return self._partitions[partition.topic].get(partition.partition) + def partitions_for_broker(self, broker_id): + """Return TopicPartitions for which the broker is a leader""" + return self._broker_partitions.get(broker_id) + def coordinator_for_group(self, group): return self._groups.get(group) @@ -106,7 +124,8 @@ class ClusterMetadata(object): # Drop any UnknownTopic, InvalidTopic, and TopicAuthorizationFailed # but retain LeaderNotAvailable because it means topic is initializing - self._partitions = {} + self._partitions.clear() + self._broker_partitions.clear() for error_code, topic, partitions in metadata.topics: error_type = Errors.for_code(error_code) @@ -114,6 +133,8 @@ class ClusterMetadata(object): self._partitions[topic] = {} for _, partition, leader, _, _ in partitions: self._partitions[topic][partition] = leader + if leader != -1: + self._broker_partitions[leader].add(TopicPartition(topic, partition)) elif error_type is Errors.LeaderNotAvailableError: log.error("Topic %s is not available during auto-create" " initialization", topic) |