diff options
author | Dana Powers <dana.powers@gmail.com> | 2019-03-14 09:39:28 -0700 |
---|---|---|
committer | Jeff Widman <jeff@jeffwidman.com> | 2019-03-14 09:39:28 -0700 |
commit | 812de351f75beefe73bd9bef55847ab61ccc951d (patch) | |
tree | c0122d099cbe0ff0c2c3a1adf76dc09493ae8bcf /kafka/cluster.py | |
parent | 703f06590be2daa7e4592b3d82df6d719a6829bb (diff) | |
download | kafka-python-812de351f75beefe73bd9bef55847ab61ccc951d.tar.gz |
Retry bootstrapping after backoff when necessary (#1736)
The current client attempts to bootstrap once during initialization, but if that attempt fails, there is no second attempt and the client is left inoperable. This can happen, for example, if an entire cluster is down at the time a long-running client starts execution.
This commit attempts to fix this by removing the synchronous bootstrapping from `KafkaClient` init and instead merging bootstrap metadata with the cluster metadata, an approach similar to the one used by the Java client. This allows us to continue falling back to bootstrap data when necessary throughout the life of a long-running consumer or producer.
Fix #1670
Diffstat (limited to 'kafka/cluster.py')
-rw-r--r-- | kafka/cluster.py | 24 |
1 files changed, 23 insertions, 1 deletions
diff --git a/kafka/cluster.py b/kafka/cluster.py index 8078eb7..3d57ed2 100644 --- a/kafka/cluster.py +++ b/kafka/cluster.py @@ -9,6 +9,7 @@ import time from kafka.vendor import six from kafka import errors as Errors +from kafka.conn import collect_hosts, dns_lookup from kafka.future import Future from kafka.structs import BrokerMetadata, PartitionMetadata, TopicPartition @@ -29,10 +30,17 @@ class ClusterMetadata(object): which we force a refresh of metadata even if we haven't seen any partition leadership changes to proactively discover any new brokers or partitions. Default: 300000 + bootstrap_servers: 'host[:port]' string (or list of 'host[:port]' + strings) that the client should contact to bootstrap initial + cluster metadata. This does not have to be the full node list. + It just needs to have at least one broker that will respond to a + Metadata API Request. Default port is 9092. If no servers are + specified, will default to localhost:9092. """ DEFAULT_CONFIG = { 'retry_backoff_ms': 100, 'metadata_max_age_ms': 300000, + 'bootstrap_servers': 'localhost', } def __init__(self, **configs): @@ -42,7 +50,7 @@ class ClusterMetadata(object): self._groups = {} # group_name -> node_id self._last_refresh_ms = 0 self._last_successful_refresh_ms = 0 - self._need_update = False + self._need_update = True self._future = None self._listeners = set() self._lock = threading.Lock() @@ -56,6 +64,17 @@ class ClusterMetadata(object): if key in configs: self.config[key] = configs[key] + self._bootstrap_brokers = self._generate_bootstrap_brokers() + + def _generate_bootstrap_brokers(self): + # collect_hosts does not perform DNS, so we should be fine to re-use + bootstrap_hosts = collect_hosts(self.config['bootstrap_servers']) + + while True: + for host, port, afi in bootstrap_hosts: + for _, __, ___, ____, sockaddr in dns_lookup(host, port, afi): + yield BrokerMetadata('bootstrap', sockaddr[0], sockaddr[1], None) + def brokers(self): """Get all BrokerMetadata @@ -73,6 +92,9 @@ 
class ClusterMetadata(object): Returns: BrokerMetadata or None if not found """ + if broker_id == 'bootstrap': + return next(self._bootstrap_brokers) + return self._brokers.get(broker_id) def partitions_for_topic(self, topic): |