author     Dana Powers <dana.powers@gmail.com>	2019-03-21 08:19:23 -0700
committer  Dana Powers <dana.powers@gmail.com>	2019-03-21 08:19:23 -0700
commit     119378391405cbacdede48364bb50dc91ad1544b
tree       63fa01aaca9c261b8bc6856b29314b5c91c992b3
parent     ee4a53e9e5ae93231d6f7010f263b30a9924dabb
Maintain shadow bootstrap cluster metadata (bootstrap_cluster)
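In short: instead of a single magic 'bootstrap' node id whose metadata generator cycles through the configured servers, the cluster now keeps a static dict of "shadow" bootstrap brokers, one per configured server, keyed 'bootstrap-0', 'bootstrap-1', and so on. A minimal stand-alone sketch of that scheme is below; it mirrors the new ClusterMetadata._generate_bootstrap_brokers() in the diff, but the helper name and the 'host:port' parsing here are illustrative only (the library uses collect_hosts and its own BrokerMetadata struct).

# Illustrative sketch only; not the library code. It mirrors the new
# _generate_bootstrap_brokers() logic from the diff below, but parses
# 'host:port' strings directly instead of using kafka.conn.collect_hosts.
from collections import namedtuple

BrokerMetadata = namedtuple('BrokerMetadata', ['nodeId', 'host', 'port', 'rack'])

def generate_bootstrap_brokers(bootstrap_servers):
    """Build shadow bootstrap brokers keyed 'bootstrap-0', 'bootstrap-1', ..."""
    brokers = {}
    for i, hostport in enumerate(bootstrap_servers):
        host, _, port = hostport.partition(':')
        node_id = 'bootstrap-%s' % i
        brokers[node_id] = BrokerMetadata(node_id, host, int(port or 9092), None)
    return brokers

print(generate_bootstrap_brokers(['kafka1:9092', 'kafka2:9092']))
# {'bootstrap-0': BrokerMetadata(nodeId='bootstrap-0', host='kafka1', port=9092, rack=None),
#  'bootstrap-1': BrokerMetadata(nodeId='bootstrap-1', host='kafka2', port=9092, rack=None)}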
-rw-r--r--	kafka/client_async.py	35
-rw-r--r--	kafka/cluster.py	19
2 files changed, 20 insertions, 34 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py
index fdf5454..f8a6054 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -245,11 +245,7 @@ class KafkaClient(object):
 
     def _can_connect(self, node_id):
         if node_id not in self._conns:
-            # cluster.broker_metadata() is stateful when called w/ 'bootstrap'
-            # (it cycles through all of the bootstrap servers)
-            # so we short-circuit here and assume that we should always have
-            # some bootstrap_servers config to power bootstrap broker_metadata
-            if node_id == 'bootstrap' or self.cluster.broker_metadata(node_id):
+            if self.cluster.broker_metadata(node_id):
                 return True
             return False
         conn = self._conns[node_id]
@@ -266,7 +262,7 @@
             except KeyError:
                 self._selector.modify(conn._sock, selectors.EVENT_WRITE)
 
-            if node_id == 'bootstrap':
+            if self.cluster.is_bootstrap(node_id):
                 self._last_bootstrap = time.time()
 
         elif conn.connected():
@@ -284,12 +280,13 @@
 
             self._idle_expiry_manager.update(node_id)
 
-            if node_id == 'bootstrap':
+            if self.cluster.is_bootstrap(node_id):
                 self._bootstrap_fails = 0
-            elif 'bootstrap' in self._conns:
-                bootstrap = self._conns.pop('bootstrap')
-                bootstrap.close()
+            else:
+                for node_id in list(self._conns.keys()):
+                    if self.cluster.is_bootstrap(node_id):
+                        self._conns.pop(node_id).close()
 
         # Connection failures imply that our metadata is stale, so let's refresh
         elif conn.state is ConnectionStates.DISCONNECTING:
@@ -308,7 +305,7 @@
                 idle_disconnect = True
             self._idle_expiry_manager.remove(node_id)
 
-            if node_id == 'bootstrap':
+            if self.cluster.is_bootstrap(node_id):
                 self._bootstrap_fails += 1
 
             elif self._refresh_on_disconnects and not self._closed and not idle_disconnect:
@@ -331,10 +328,6 @@
         if not conn.disconnected():
             return False
 
-        # Always recycled disconnected bootstraps
-        elif conn.node_id == 'bootstrap':
-            return True
-
         # Otherwise, only recycle when broker metadata has changed
         broker = self.cluster.broker_metadata(conn.node_id)
         if broker is None:
@@ -355,10 +348,6 @@
         conn = self._conns.get(node_id)
 
         if conn is None:
-            # Note that when bootstrapping, each call to broker_metadata may
-            # return a different host/port. So we need to be careful to only
-            # call when necessary to avoid skipping some possible bootstrap
-            # source.
             broker = self.cluster.broker_metadata(node_id)
             assert broker, 'Broker id %s not in current metadata' % (node_id,)
@@ -697,7 +686,7 @@
         in-flight-requests. If no such node is found, a node will be chosen
         randomly from disconnected nodes that are not "blacked out" (i.e.,
         are not subject to a reconnect backoff). If no node metadata has been
-        obtained, will return 'bootstrap' (subject to exponential backoff).
+        obtained, will return a bootstrap node (subject to exponential backoff).
 
         Returns:
             node_id or None if no suitable node was found
@@ -724,10 +713,6 @@
         if found is not None:
             return found
 
-        elif not nodes and self._can_bootstrap():
-            self._last_bootstrap = time.time()
-            return 'bootstrap'
-
         return None
 
     def set_topics(self, topics):
@@ -785,7 +770,7 @@
 
         if self._can_send_request(node_id):
             topics = list(self._topics)
-            if not topics and node_id == 'bootstrap':
+            if not topics and self.cluster.is_bootstrap(node_id):
                 topics = list(self.config['bootstrap_topics_filter'])
 
             if self.cluster.need_all_topic_metadata or not topics:
diff --git a/kafka/cluster.py b/kafka/cluster.py
index 3d57ed2..17cfd14 100644
--- a/kafka/cluster.py
+++ b/kafka/cluster.py
@@ -70,10 +70,14 @@ class ClusterMetadata(object):
         # collect_hosts does not perform DNS, so we should be fine to re-use
         bootstrap_hosts = collect_hosts(self.config['bootstrap_servers'])
 
-        while True:
-            for host, port, afi in bootstrap_hosts:
-                for _, __, ___, ____, sockaddr in dns_lookup(host, port, afi):
-                    yield BrokerMetadata('bootstrap', sockaddr[0], sockaddr[1], None)
+        brokers = {}
+        for i, (host, port, _) in enumerate(bootstrap_hosts):
+            node_id = 'bootstrap-%s' % i
+            brokers[node_id] = BrokerMetadata(node_id, host, port, None)
+        return brokers
+
+    def is_bootstrap(self, node_id):
+        return node_id in self._bootstrap_brokers
 
     def brokers(self):
         """Get all BrokerMetadata
@@ -81,7 +85,7 @@
         Returns:
             set: {BrokerMetadata, ...}
         """
-        return set(self._brokers.values())
+        return set(self._brokers.values()) or set(self._bootstrap_brokers.values())
 
     def broker_metadata(self, broker_id):
         """Get BrokerMetadata
@@ -92,10 +96,7 @@
         Returns:
             BrokerMetadata or None if not found
         """
-        if broker_id == 'bootstrap':
-            return next(self._bootstrap_brokers)
-
-        return self._brokers.get(broker_id)
+        return self._brokers.get(broker_id) or self._bootstrap_brokers.get(broker_id)
 
     def partitions_for_topic(self, topic):
         """Return set of all partitions for topic (whether available or not)
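With stable per-server node ids in place, the client no longer needs the node_id == 'bootstrap' string comparison: a membership test against the shadow dict answers "is this a bootstrap connection?", and real broker metadata simply takes precedence once it arrives. A hedged sketch of those two lookups follows; the stand-alone function signatures are illustrative, only the dict-based logic matches ClusterMetadata.is_bootstrap and ClusterMetadata.broker_metadata from the diff above.

# Sketch of the lookup rules introduced above; the free-function signatures
# are illustrative, only the dict-based logic matches the diff.
def is_bootstrap(node_id, bootstrap_brokers):
    # Replaces the old string check: node_id == 'bootstrap'
    return node_id in bootstrap_brokers

def broker_metadata(broker_id, brokers, bootstrap_brokers):
    # Real cluster metadata shadows the bootstrap entries once it is known
    return brokers.get(broker_id) or bootstrap_brokers.get(broker_id)

shadow = {'bootstrap-0': ('kafka1', 9092), 'bootstrap-1': ('kafka2', 9092)}
real = {}                      # empty until the first metadata response arrives
assert is_bootstrap('bootstrap-1', shadow)
assert not is_bootstrap(0, shadow)
assert broker_metadata('bootstrap-0', real, shadow) == ('kafka1', 9092)

This also explains the client_async.py hunk that pops and closes any remaining bootstrap connections after a real broker connection succeeds: the shadow entries are only a fallback, so their sockets are recycled as soon as genuine metadata is available.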