diff options
Diffstat (limited to 'kafka/cluster.py')
-rw-r--r-- | kafka/cluster.py | 24 |
1 files changed, 23 insertions, 1 deletions
diff --git a/kafka/cluster.py b/kafka/cluster.py index 8078eb7..3d57ed2 100644 --- a/kafka/cluster.py +++ b/kafka/cluster.py @@ -9,6 +9,7 @@ import time from kafka.vendor import six from kafka import errors as Errors +from kafka.conn import collect_hosts, dns_lookup from kafka.future import Future from kafka.structs import BrokerMetadata, PartitionMetadata, TopicPartition @@ -29,10 +30,17 @@ class ClusterMetadata(object): which we force a refresh of metadata even if we haven't seen any partition leadership changes to proactively discover any new brokers or partitions. Default: 300000 + bootstrap_servers: 'host[:port]' string (or list of 'host[:port]' + strings) that the client should contact to bootstrap initial + cluster metadata. This does not have to be the full node list. + It just needs to have at least one broker that will respond to a + Metadata API Request. Default port is 9092. If no servers are + specified, will default to localhost:9092. """ DEFAULT_CONFIG = { 'retry_backoff_ms': 100, 'metadata_max_age_ms': 300000, + 'bootstrap_servers': 'localhost', } def __init__(self, **configs): @@ -42,7 +50,7 @@ class ClusterMetadata(object): self._groups = {} # group_name -> node_id self._last_refresh_ms = 0 self._last_successful_refresh_ms = 0 - self._need_update = False + self._need_update = True self._future = None self._listeners = set() self._lock = threading.Lock() @@ -56,6 +64,17 @@ class ClusterMetadata(object): if key in configs: self.config[key] = configs[key] + self._bootstrap_brokers = self._generate_bootstrap_brokers() + + def _generate_bootstrap_brokers(self): + # collect_hosts does not perform DNS, so we should be fine to re-use + bootstrap_hosts = collect_hosts(self.config['bootstrap_servers']) + + while True: + for host, port, afi in bootstrap_hosts: + for _, __, ___, ____, sockaddr in dns_lookup(host, port, afi): + yield BrokerMetadata('bootstrap', sockaddr[0], sockaddr[1], None) + def brokers(self): """Get all BrokerMetadata @@ -73,6 +92,9 @@ class ClusterMetadata(object): Returns: BrokerMetadata or None if not found """ + if broker_id == 'bootstrap': + return next(self._bootstrap_brokers) + return self._brokers.get(broker_id) def partitions_for_topic(self, topic): |