diff options
author | Dana Powers <dana.powers@rd.io> | 2015-11-29 11:24:35 -0800 |
---|---|---|
committer | Zack Dever <zack.dever@rd.io> | 2015-12-04 11:25:40 -0800 |
commit | c94cb620292f93a4cd3cfc0bb57c5fa38d95a717 (patch) | |
tree | 706729d0a333b43633a53295993d5562ba981825 | |
parent | ec323bcd0af675a6bd4acc61718a089321abd116 (diff) | |
download | kafka-python-c94cb620292f93a4cd3cfc0bb57c5fa38d95a717.tar.gz |
Add simple Cluster class to manage broker metadata
-rw-r--r-- | kafka/cluster.py | 91 |
1 files changed, 91 insertions, 0 deletions
diff --git a/kafka/cluster.py b/kafka/cluster.py new file mode 100644 index 0000000..3cd0a3c --- /dev/null +++ b/kafka/cluster.py @@ -0,0 +1,91 @@ +import logging +import random + +from .conn import BrokerConnection, collect_hosts +from .protocol.metadata import MetadataRequest + +logger = logging.getLogger(__name__) + + +class Cluster(object): + def __init__(self, **kwargs): + if 'bootstrap_servers' not in kwargs: + kargs['bootstrap_servers'] = 'localhost' + + self._brokers = {} + self._topics = {} + self._groups = {} + + self._bootstrap(collect_hosts(kwargs['bootstrap_servers']), + timeout=kwargs.get('bootstrap_timeout', 2)) + + def brokers(self): + brokers = list(self._brokers.values()) + return random.sample(brokers, len(brokers)) + + def random_broker(self): + for broker in self.brokers(): + if broker.connected() or broker.connect(): + return broker + return None + + def broker_by_id(self, broker_id): + return self._brokers.get(broker_id) + + def topics(self): + return list(self._topics.keys()) + + def partitions_for_topic(self, topic): + if topic not in self._topics: + return None + return list(self._topics[topic].keys()) + + def broker_for_partition(self, topic, partition): + if topic not in self._topics or partition not in self._topics[topic]: + return None + broker_id = self._topics[topic][partition] + return self.broker_by_id(broker_id) + + def refresh_metadata(self): + broker = self.random_broker() + if not broker.send(MetadataRequest([])): + return None + metadata = broker.recv() + if not metadata: + return None + self._update_metadata(metadata) + return metadata + + def _update_metadata(self, metadata): + self._brokers.update({ + node_id: BrokerConnection(host, port) + for node_id, host, port in metadata.brokers + if node_id not in self._brokers + }) + + self._topics = { + topic: { + partition: leader + for _, partition, leader, _, _ in partitions + } + for _, topic, partitions in metadata.topics + } + + def _bootstrap(self, hosts, timeout=2): + for host, port in hosts: + conn = BrokerConnection(host, port, timeout) + if not conn.connect(): + continue + self._brokers['bootstrap'] = conn + if self.refresh_metadata(): + break + else: + raise ValueError("Could not bootstrap kafka cluster from %s" % hosts) + + if len(self._brokers) > 1: + self._brokers.pop('bootstrap') + conn.close() + + def __str__(self): + return 'Cluster(brokers: %d, topics: %d, groups: %d)' % \ + (len(self._brokers), len(self._topics), len(self._groups)) |