summaryrefslogtreecommitdiff
path: root/kafka/client_async.py
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2019-03-21 08:19:23 -0700
committerDana Powers <dana.powers@gmail.com>2019-03-21 08:19:23 -0700
commit119378391405cbacdede48364bb50dc91ad1544b (patch)
tree63fa01aaca9c261b8bc6856b29314b5c91c992b3 /kafka/client_async.py
parentee4a53e9e5ae93231d6f7010f263b30a9924dabb (diff)
downloadkafka-python-bootstrap_cluster.tar.gz
Maintain shadow bootstrap cluster metadatabootstrap_cluster
Diffstat (limited to 'kafka/client_async.py')
-rw-r--r--kafka/client_async.py35
1 files changed, 10 insertions, 25 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py
index fdf5454..f8a6054 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -245,11 +245,7 @@ class KafkaClient(object):
def _can_connect(self, node_id):
if node_id not in self._conns:
- # cluster.broker_metadata() is stateful when called w/ 'bootstrap'
- # (it cycles through all of the bootstrap servers)
- # so we short-circuit here and assume that we should always have
- # some bootstrap_servers config to power bootstrap broker_metadata
- if node_id == 'bootstrap' or self.cluster.broker_metadata(node_id):
+ if self.cluster.broker_metadata(node_id):
return True
return False
conn = self._conns[node_id]
@@ -266,7 +262,7 @@ class KafkaClient(object):
except KeyError:
self._selector.modify(conn._sock, selectors.EVENT_WRITE)
- if node_id == 'bootstrap':
+ if self.cluster.is_bootstrap(node_id):
self._last_bootstrap = time.time()
elif conn.connected():
@@ -284,12 +280,13 @@ class KafkaClient(object):
self._idle_expiry_manager.update(node_id)
- if node_id == 'bootstrap':
+ if self.cluster.is_bootstrap(node_id):
self._bootstrap_fails = 0
- elif 'bootstrap' in self._conns:
- bootstrap = self._conns.pop('bootstrap')
- bootstrap.close()
+ else:
+ for node_id in list(self._conns.keys()):
+ if self.cluster.is_bootstrap(node_id):
+ self._conns.pop(node_id).close()
# Connection failures imply that our metadata is stale, so let's refresh
elif conn.state is ConnectionStates.DISCONNECTING:
@@ -308,7 +305,7 @@ class KafkaClient(object):
idle_disconnect = True
self._idle_expiry_manager.remove(node_id)
- if node_id == 'bootstrap':
+ if self.cluster.is_bootstrap(node_id):
self._bootstrap_fails += 1
elif self._refresh_on_disconnects and not self._closed and not idle_disconnect:
@@ -331,10 +328,6 @@ class KafkaClient(object):
if not conn.disconnected():
return False
- # Always recycled disconnected bootstraps
- elif conn.node_id == 'bootstrap':
- return True
-
# Otherwise, only recycle when broker metadata has changed
broker = self.cluster.broker_metadata(conn.node_id)
if broker is None:
@@ -355,10 +348,6 @@ class KafkaClient(object):
conn = self._conns.get(node_id)
if conn is None:
- # Note that when bootstrapping, each call to broker_metadata may
- # return a different host/port. So we need to be careful to only
- # call when necessary to avoid skipping some possible bootstrap
- # source.
broker = self.cluster.broker_metadata(node_id)
assert broker, 'Broker id %s not in current metadata' % (node_id,)
@@ -697,7 +686,7 @@ class KafkaClient(object):
in-flight-requests. If no such node is found, a node will be chosen
randomly from disconnected nodes that are not "blacked out" (i.e.,
are not subject to a reconnect backoff). If no node metadata has been
- obtained, will return 'bootstrap' (subject to exponential backoff).
+ obtained, will return a bootstrap node (subject to exponential backoff).
Returns:
node_id or None if no suitable node was found
@@ -724,10 +713,6 @@ class KafkaClient(object):
if found is not None:
return found
- elif not nodes and self._can_bootstrap():
- self._last_bootstrap = time.time()
- return 'bootstrap'
-
return None
def set_topics(self, topics):
@@ -785,7 +770,7 @@ class KafkaClient(object):
if self._can_send_request(node_id):
topics = list(self._topics)
- if not topics and node_id == 'bootstrap':
+ if not topics and self.cluster.is_bootstrap(node_id):
topics = list(self.config['bootstrap_topics_filter'])
if self.cluster.need_all_topic_metadata or not topics: