summaryrefslogtreecommitdiff
path: root/kafka/cluster.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/cluster.py')
-rw-r--r--kafka/cluster.py24
1 files changed, 23 insertions, 1 deletions
diff --git a/kafka/cluster.py b/kafka/cluster.py
index 8078eb7..3d57ed2 100644
--- a/kafka/cluster.py
+++ b/kafka/cluster.py
@@ -9,6 +9,7 @@ import time
from kafka.vendor import six
from kafka import errors as Errors
+from kafka.conn import collect_hosts, dns_lookup
from kafka.future import Future
from kafka.structs import BrokerMetadata, PartitionMetadata, TopicPartition
@@ -29,10 +30,17 @@ class ClusterMetadata(object):
which we force a refresh of metadata even if we haven't seen any
partition leadership changes to proactively discover any new
brokers or partitions. Default: 300000
+ bootstrap_servers: 'host[:port]' string (or list of 'host[:port]'
+ strings) that the client should contact to bootstrap initial
+ cluster metadata. This does not have to be the full node list.
+ It just needs to have at least one broker that will respond to a
+ Metadata API Request. Default port is 9092. If no servers are
+ specified, will default to localhost:9092.
"""
DEFAULT_CONFIG = {
'retry_backoff_ms': 100,
'metadata_max_age_ms': 300000,
+ 'bootstrap_servers': 'localhost',
}
def __init__(self, **configs):
@@ -42,7 +50,7 @@ class ClusterMetadata(object):
self._groups = {} # group_name -> node_id
self._last_refresh_ms = 0
self._last_successful_refresh_ms = 0
- self._need_update = False
+ self._need_update = True
self._future = None
self._listeners = set()
self._lock = threading.Lock()
@@ -56,6 +64,17 @@ class ClusterMetadata(object):
if key in configs:
self.config[key] = configs[key]
+ self._bootstrap_brokers = self._generate_bootstrap_brokers()
+
+ def _generate_bootstrap_brokers(self):
+ # collect_hosts does not perform DNS, so we should be fine to re-use
+ bootstrap_hosts = collect_hosts(self.config['bootstrap_servers'])
+
+ while True:
+ for host, port, afi in bootstrap_hosts:
+ for _, __, ___, ____, sockaddr in dns_lookup(host, port, afi):
+ yield BrokerMetadata('bootstrap', sockaddr[0], sockaddr[1], None)
+
def brokers(self):
"""Get all BrokerMetadata
@@ -73,6 +92,9 @@ class ClusterMetadata(object):
Returns:
BrokerMetadata or None if not found
"""
+ if broker_id == 'bootstrap':
+ return next(self._bootstrap_brokers)
+
return self._brokers.get(broker_id)
def partitions_for_topic(self, topic):