diff options
author | Dana Powers <dana.powers@gmail.com> | 2019-03-21 08:19:23 -0700 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2019-03-21 08:19:23 -0700 |
commit | 119378391405cbacdede48364bb50dc91ad1544b (patch) | |
tree | 63fa01aaca9c261b8bc6856b29314b5c91c992b3 /kafka/client_async.py | |
parent | ee4a53e9e5ae93231d6f7010f263b30a9924dabb (diff) | |
download | kafka-python-bootstrap_cluster.tar.gz |
Maintain shadow bootstrap cluster metadatabootstrap_cluster
Diffstat (limited to 'kafka/client_async.py')
-rw-r--r-- | kafka/client_async.py | 35 |
1 files changed, 10 insertions, 25 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py index fdf5454..f8a6054 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -245,11 +245,7 @@ class KafkaClient(object): def _can_connect(self, node_id): if node_id not in self._conns: - # cluster.broker_metadata() is stateful when called w/ 'bootstrap' - # (it cycles through all of the bootstrap servers) - # so we short-circuit here and assume that we should always have - # some bootstrap_servers config to power bootstrap broker_metadata - if node_id == 'bootstrap' or self.cluster.broker_metadata(node_id): + if self.cluster.broker_metadata(node_id): return True return False conn = self._conns[node_id] @@ -266,7 +262,7 @@ class KafkaClient(object): except KeyError: self._selector.modify(conn._sock, selectors.EVENT_WRITE) - if node_id == 'bootstrap': + if self.cluster.is_bootstrap(node_id): self._last_bootstrap = time.time() elif conn.connected(): @@ -284,12 +280,13 @@ class KafkaClient(object): self._idle_expiry_manager.update(node_id) - if node_id == 'bootstrap': + if self.cluster.is_bootstrap(node_id): self._bootstrap_fails = 0 - elif 'bootstrap' in self._conns: - bootstrap = self._conns.pop('bootstrap') - bootstrap.close() + else: + for node_id in list(self._conns.keys()): + if self.cluster.is_bootstrap(node_id): + self._conns.pop(node_id).close() # Connection failures imply that our metadata is stale, so let's refresh elif conn.state is ConnectionStates.DISCONNECTING: @@ -308,7 +305,7 @@ class KafkaClient(object): idle_disconnect = True self._idle_expiry_manager.remove(node_id) - if node_id == 'bootstrap': + if self.cluster.is_bootstrap(node_id): self._bootstrap_fails += 1 elif self._refresh_on_disconnects and not self._closed and not idle_disconnect: @@ -331,10 +328,6 @@ class KafkaClient(object): if not conn.disconnected(): return False - # Always recycled disconnected bootstraps - elif conn.node_id == 'bootstrap': - return True - # Otherwise, only recycle when broker metadata has changed broker = self.cluster.broker_metadata(conn.node_id) if broker is None: @@ -355,10 +348,6 @@ class KafkaClient(object): conn = self._conns.get(node_id) if conn is None: - # Note that when bootstrapping, each call to broker_metadata may - # return a different host/port. So we need to be careful to only - # call when necessary to avoid skipping some possible bootstrap - # source. broker = self.cluster.broker_metadata(node_id) assert broker, 'Broker id %s not in current metadata' % (node_id,) @@ -697,7 +686,7 @@ class KafkaClient(object): in-flight-requests. If no such node is found, a node will be chosen randomly from disconnected nodes that are not "blacked out" (i.e., are not subject to a reconnect backoff). If no node metadata has been - obtained, will return 'bootstrap' (subject to exponential backoff). + obtained, will return a bootstrap node (subject to exponential backoff). Returns: node_id or None if no suitable node was found @@ -724,10 +713,6 @@ class KafkaClient(object): if found is not None: return found - elif not nodes and self._can_bootstrap(): - self._last_bootstrap = time.time() - return 'bootstrap' - return None def set_topics(self, topics): @@ -785,7 +770,7 @@ class KafkaClient(object): if self._can_send_request(node_id): topics = list(self._topics) - if not topics and node_id == 'bootstrap': + if not topics and self.cluster.is_bootstrap(node_id): topics = list(self.config['bootstrap_topics_filter']) if self.cluster.need_all_topic_metadata or not topics: |