summaryrefslogtreecommitdiff
path: root/kafka/cluster.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/cluster.py')
-rw-r--r--kafka/cluster.py19
1 files changed, 10 insertions, 9 deletions
diff --git a/kafka/cluster.py b/kafka/cluster.py
index 3d57ed2..17cfd14 100644
--- a/kafka/cluster.py
+++ b/kafka/cluster.py
@@ -70,10 +70,14 @@ class ClusterMetadata(object):
# collect_hosts does not perform DNS, so we should be fine to re-use
bootstrap_hosts = collect_hosts(self.config['bootstrap_servers'])
- while True:
- for host, port, afi in bootstrap_hosts:
- for _, __, ___, ____, sockaddr in dns_lookup(host, port, afi):
- yield BrokerMetadata('bootstrap', sockaddr[0], sockaddr[1], None)
+ brokers = {}
+ for i, (host, port, _) in enumerate(bootstrap_hosts):
+ node_id = 'bootstrap-%s' % i
+ brokers[node_id] = BrokerMetadata(node_id, host, port, None)
+ return brokers
+
+ def is_bootstrap(self, node_id):
+ return node_id in self._bootstrap_brokers
def brokers(self):
"""Get all BrokerMetadata
@@ -81,7 +85,7 @@ class ClusterMetadata(object):
Returns:
set: {BrokerMetadata, ...}
"""
- return set(self._brokers.values())
+ return set(self._brokers.values()) or set(self._bootstrap_brokers.values())
def broker_metadata(self, broker_id):
"""Get BrokerMetadata
@@ -92,10 +96,7 @@ class ClusterMetadata(object):
Returns:
BrokerMetadata or None if not found
"""
- if broker_id == 'bootstrap':
- return next(self._bootstrap_brokers)
-
- return self._brokers.get(broker_id)
+ return self._brokers.get(broker_id) or self._bootstrap_brokers.get(broker_id)
def partitions_for_topic(self, topic):
"""Return set of all partitions for topic (whether available or not)