diff options
Diffstat (limited to 'kafka/cluster.py')
-rw-r--r-- | kafka/cluster.py | 187 |
1 files changed, 121 insertions, 66 deletions
diff --git a/kafka/cluster.py b/kafka/cluster.py index 15921dc..2e9e117 100644 --- a/kafka/cluster.py +++ b/kafka/cluster.py @@ -1,91 +1,146 @@ +from __future__ import absolute_import + import logging import random +import time -from .conn import BrokerConnection, collect_hosts -from .protocol.metadata import MetadataRequest +import kafka.common as Errors +from kafka.common import BrokerMetadata +from .future import Future -logger = logging.getLogger(__name__) +log = logging.getLogger(__name__) -class Cluster(object): - def __init__(self, **kwargs): - if 'bootstrap_servers' not in kwargs: - kwargs['bootstrap_servers'] = 'localhost' +class ClusterMetadata(object): + _retry_backoff_ms = 100 + _metadata_max_age_ms = 300000 + def __init__(self, **kwargs): self._brokers = {} - self._topics = {} + self._partitions = {} self._groups = {} + self._version = 0 + self._last_refresh_ms = 0 + self._last_successful_refresh_ms = 0 + self._need_update = False + self._future = None + self._listeners = set() - self._bootstrap(collect_hosts(kwargs['bootstrap_servers']), - timeout=kwargs.get('bootstrap_timeout', 2)) + for config in ('retry_backoff_ms', 'metadata_max_age_ms'): + if config in kwargs: + setattr(self, '_' + config, kwargs.pop(config)) def brokers(self): - brokers = list(self._brokers.values()) - return random.sample(brokers, len(brokers)) + return set(self._brokers.values()) - def random_broker(self): - for broker in self.brokers(): - if broker.connected() or broker.connect(): - return broker - return None - - def broker_by_id(self, broker_id): + def broker_metadata(self, broker_id): return self._brokers.get(broker_id) - def topics(self): - return list(self._topics.keys()) - def partitions_for_topic(self, topic): - if topic not in self._topics: + if topic not in self._partitions: return None - return list(self._topics[topic].keys()) + return set(self._partitions[topic].keys()) - def broker_for_partition(self, topic, partition): - if topic not in self._topics or partition not in self._topics[topic]: + def leader_for_partition(self, partition): + if partition.topic not in self._partitions: return None - broker_id = self._topics[topic][partition] - return self.broker_by_id(broker_id) + return self._partitions[partition.topic].get(partition.partition) - def refresh_metadata(self): - broker = self.random_broker() - if not broker.send(MetadataRequest([])): - return None - metadata = broker.recv() - if not metadata: - return None - self._update_metadata(metadata) - return metadata - - def _update_metadata(self, metadata): - self._brokers.update({ - node_id: BrokerConnection(host, port) - for node_id, host, port in metadata.brokers - if node_id not in self._brokers - }) - - self._topics = { - topic: { - partition: leader - for _, partition, leader, _, _ in partitions - } - for _, topic, partitions in metadata.topics - } - - def _bootstrap(self, hosts, timeout=2): - for host, port in hosts: - conn = BrokerConnection(host, port) - if not conn.connect(): - continue - self._brokers['bootstrap'] = conn - if self.refresh_metadata(): - break + def coordinator_for_group(self, group): + return self._groups.get(group) + + def ttl(self): + """Milliseconds until metadata should be refreshed""" + now = time.time() * 1000 + if self._need_update: + ttl = 0 else: - raise ValueError("Could not bootstrap kafka cluster from %s" % hosts) + ttl = self._last_successful_refresh_ms + self._metadata_max_age_ms - now + retry = self._last_refresh_ms + self._retry_backoff_ms - now + return max(ttl, retry, 0) + + def request_update(self): + """ + Flags metadata for update, return Future() + + Actual update must be handled separately. This method will only + change the reported ttl() + """ + self._need_update = True + if not self._future or self._future.is_done: + self._future = Future() + return self._future - if len(self._brokers) > 1: - self._brokers.pop('bootstrap') - conn.close() + def topics(self): + return set(self._partitions.keys()) + + def failed_update(self, exception): + if self._future: + self._future.failure(exception) + self._future = None + self._last_refresh_ms = time.time() * 1000 + + def update_metadata(self, metadata): + # In the common case where we ask for a single topic and get back an + # error, we should fail the future + if len(metadata.topics) == 1 and metadata.topics[0][0] != 0: + error_code, topic, _ = metadata.topics[0] + error = Errors.for_code(error_code)(topic) + return self.failed_update(error) + + if not metadata.brokers: + log.warning("No broker metadata found in MetadataResponse") + + for node_id, host, port in metadata.brokers: + self._brokers.update({ + node_id: BrokerMetadata(node_id, host, port) + }) + + # Drop any UnknownTopic, InvalidTopic, and TopicAuthorizationFailed + # but retain LeaderNotAvailable because it means topic is initializing + self._partitions = {} + + for error_code, topic, partitions in metadata.topics: + error_type = Errors.for_code(error_code) + if error_type is Errors.NoError: + self._partitions[topic] = {} + for _, partition, leader, _, _ in partitions: + self._partitions[topic][partition] = leader + elif error_type is Errors.LeaderNotAvailableError: + log.error("Topic %s is not available during auto-create" + " initialization", topic) + elif error_type is Errors.UnknownTopicOrPartitionError: + log.error("Topic %s not found in cluster metadata", topic) + elif error_type is Errors.TopicAuthorizationFailedError: + log.error("Topic %s is not authorized for this client", topic) + elif error_type is Errors.InvalidTopicError: + log.error("'%s' is not a valid topic name", topic) + else: + log.error("Error fetching metadata for topic %s: %s", + topic, error_type) + + if self._future: + self._future.success(self) + self._future = None + self._need_update = False + self._version += 1 + now = time.time() * 1000 + self._last_refresh_ms = now + self._last_successful_refresh_ms = now + log.debug("Updated cluster metadata version %d to %s", + self._version, self) + + for listener in self._listeners: + listener(self) + + def add_listener(self, listener): + """Add a callback function to be called on each metadata update""" + self._listeners.add(listener) + + def remove_listener(self, listener): + """Remove a previously added listener callback""" + self._listeners.remove(listener) def __str__(self): return 'Cluster(brokers: %d, topics: %d, groups: %d)' % \ - (len(self._brokers), len(self._topics), len(self._groups)) + (len(self._brokers), len(self._partitions), len(self._groups)) |