summaryrefslogtreecommitdiff
path: root/kafka
diff options
context:
space:
mode:
Diffstat (limited to 'kafka')
-rw-r--r--kafka/client_async.py35
-rw-r--r--kafka/cluster.py19
2 files changed, 20 insertions, 34 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py
index fdf5454..f8a6054 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -245,11 +245,7 @@ class KafkaClient(object):
def _can_connect(self, node_id):
if node_id not in self._conns:
- # cluster.broker_metadata() is stateful when called w/ 'bootstrap'
- # (it cycles through all of the bootstrap servers)
- # so we short-circuit here and assume that we should always have
- # some bootstrap_servers config to power bootstrap broker_metadata
- if node_id == 'bootstrap' or self.cluster.broker_metadata(node_id):
+ if self.cluster.broker_metadata(node_id):
return True
return False
conn = self._conns[node_id]
@@ -266,7 +262,7 @@ class KafkaClient(object):
except KeyError:
self._selector.modify(conn._sock, selectors.EVENT_WRITE)
- if node_id == 'bootstrap':
+ if self.cluster.is_bootstrap(node_id):
self._last_bootstrap = time.time()
elif conn.connected():
@@ -284,12 +280,13 @@ class KafkaClient(object):
self._idle_expiry_manager.update(node_id)
- if node_id == 'bootstrap':
+ if self.cluster.is_bootstrap(node_id):
self._bootstrap_fails = 0
- elif 'bootstrap' in self._conns:
- bootstrap = self._conns.pop('bootstrap')
- bootstrap.close()
+ else:
+ for node_id in list(self._conns.keys()):
+ if self.cluster.is_bootstrap(node_id):
+ self._conns.pop(node_id).close()
# Connection failures imply that our metadata is stale, so let's refresh
elif conn.state is ConnectionStates.DISCONNECTING:
@@ -308,7 +305,7 @@ class KafkaClient(object):
idle_disconnect = True
self._idle_expiry_manager.remove(node_id)
- if node_id == 'bootstrap':
+ if self.cluster.is_bootstrap(node_id):
self._bootstrap_fails += 1
elif self._refresh_on_disconnects and not self._closed and not idle_disconnect:
@@ -331,10 +328,6 @@ class KafkaClient(object):
if not conn.disconnected():
return False
- # Always recycled disconnected bootstraps
- elif conn.node_id == 'bootstrap':
- return True
-
# Otherwise, only recycle when broker metadata has changed
broker = self.cluster.broker_metadata(conn.node_id)
if broker is None:
@@ -355,10 +348,6 @@ class KafkaClient(object):
conn = self._conns.get(node_id)
if conn is None:
- # Note that when bootstrapping, each call to broker_metadata may
- # return a different host/port. So we need to be careful to only
- # call when necessary to avoid skipping some possible bootstrap
- # source.
broker = self.cluster.broker_metadata(node_id)
assert broker, 'Broker id %s not in current metadata' % (node_id,)
@@ -697,7 +686,7 @@ class KafkaClient(object):
in-flight-requests. If no such node is found, a node will be chosen
randomly from disconnected nodes that are not "blacked out" (i.e.,
are not subject to a reconnect backoff). If no node metadata has been
- obtained, will return 'bootstrap' (subject to exponential backoff).
+ obtained, will return a bootstrap node (subject to exponential backoff).
Returns:
node_id or None if no suitable node was found
@@ -724,10 +713,6 @@ class KafkaClient(object):
if found is not None:
return found
- elif not nodes and self._can_bootstrap():
- self._last_bootstrap = time.time()
- return 'bootstrap'
-
return None
def set_topics(self, topics):
@@ -785,7 +770,7 @@ class KafkaClient(object):
if self._can_send_request(node_id):
topics = list(self._topics)
- if not topics and node_id == 'bootstrap':
+ if not topics and self.cluster.is_bootstrap(node_id):
topics = list(self.config['bootstrap_topics_filter'])
if self.cluster.need_all_topic_metadata or not topics:
diff --git a/kafka/cluster.py b/kafka/cluster.py
index 3d57ed2..17cfd14 100644
--- a/kafka/cluster.py
+++ b/kafka/cluster.py
@@ -70,10 +70,14 @@ class ClusterMetadata(object):
# collect_hosts does not perform DNS, so we should be fine to re-use
bootstrap_hosts = collect_hosts(self.config['bootstrap_servers'])
- while True:
- for host, port, afi in bootstrap_hosts:
- for _, __, ___, ____, sockaddr in dns_lookup(host, port, afi):
- yield BrokerMetadata('bootstrap', sockaddr[0], sockaddr[1], None)
+ brokers = {}
+ for i, (host, port, _) in enumerate(bootstrap_hosts):
+ node_id = 'bootstrap-%s' % i
+ brokers[node_id] = BrokerMetadata(node_id, host, port, None)
+ return brokers
+
+ def is_bootstrap(self, node_id):
+ return node_id in self._bootstrap_brokers
def brokers(self):
"""Get all BrokerMetadata
@@ -81,7 +85,7 @@ class ClusterMetadata(object):
Returns:
set: {BrokerMetadata, ...}
"""
- return set(self._brokers.values())
+ return set(self._brokers.values()) or set(self._bootstrap_brokers.values())
def broker_metadata(self, broker_id):
"""Get BrokerMetadata
@@ -92,10 +96,7 @@ class ClusterMetadata(object):
Returns:
BrokerMetadata or None if not found
"""
- if broker_id == 'bootstrap':
- return next(self._bootstrap_brokers)
-
- return self._brokers.get(broker_id)
+ return self._brokers.get(broker_id) or self._bootstrap_brokers.get(broker_id)
def partitions_for_topic(self, topic):
"""Return set of all partitions for topic (whether available or not)