path: root/kafka/cluster.py
author     Dana Powers <dana.powers@gmail.com>  2019-03-14 09:39:28 -0700
committer  Jeff Widman <jeff@jeffwidman.com>    2019-03-14 09:39:28 -0700
commit     812de351f75beefe73bd9bef55847ab61ccc951d (patch)
tree       c0122d099cbe0ff0c2c3a1adf76dc09493ae8bcf /kafka/cluster.py
parent     703f06590be2daa7e4592b3d82df6d719a6829bb (diff)
download   kafka-python-812de351f75beefe73bd9bef55847ab61ccc951d.tar.gz
Retry bootstrapping after backoff when necessary (#1736)
The current client attempts to bootstrap once during initialization, but if that attempt fails there is no retry and the client is left inoperable. This can happen, for example, if an entire cluster is down when a long-running client starts execution. This commit addresses the problem by removing the synchronous bootstrapping from `KafkaClient` init and instead merging bootstrap metadata with the cluster metadata. The Java client uses a similar approach. This allows us to keep falling back to bootstrap data whenever necessary throughout the life of a long-running consumer or producer. Fix #1670
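A minimal usage sketch (not part of the commit) of how ClusterMetadata behaves after this change, assuming only the API shown in the diff below; the address is a placeholder:

    from kafka.cluster import ClusterMetadata

    # 'bootstrap_servers' is the new ClusterMetadata config key added by this
    # commit; 'localhost:9092' is just a placeholder address.
    cluster = ClusterMetadata(bootstrap_servers='localhost:9092')

    # With no metadata response merged yet, the synthetic 'bootstrap' node id
    # resolves to one of the configured bootstrap brokers, so a client can
    # keep retrying metadata requests instead of becoming inoperable.
    broker = cluster.broker_metadata('bootstrap')
    print(broker.host, broker.port)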
Diffstat (limited to 'kafka/cluster.py')
-rw-r--r--  kafka/cluster.py  24
1 file changed, 23 insertions(+), 1 deletion(-)
diff --git a/kafka/cluster.py b/kafka/cluster.py
index 8078eb7..3d57ed2 100644
--- a/kafka/cluster.py
+++ b/kafka/cluster.py
@@ -9,6 +9,7 @@ import time
from kafka.vendor import six
from kafka import errors as Errors
+from kafka.conn import collect_hosts, dns_lookup
from kafka.future import Future
from kafka.structs import BrokerMetadata, PartitionMetadata, TopicPartition
@@ -29,10 +30,17 @@ class ClusterMetadata(object):
which we force a refresh of metadata even if we haven't seen any
partition leadership changes to proactively discover any new
brokers or partitions. Default: 300000
+ bootstrap_servers: 'host[:port]' string (or list of 'host[:port]'
+ strings) that the client should contact to bootstrap initial
+ cluster metadata. This does not have to be the full node list.
+ It just needs to have at least one broker that will respond to a
+ Metadata API Request. Default port is 9092. If no servers are
+ specified, will default to localhost:9092.
"""
DEFAULT_CONFIG = {
'retry_backoff_ms': 100,
'metadata_max_age_ms': 300000,
+ 'bootstrap_servers': 'localhost',
}
def __init__(self, **configs):
@@ -42,7 +50,7 @@ class ClusterMetadata(object):
self._groups = {} # group_name -> node_id
self._last_refresh_ms = 0
self._last_successful_refresh_ms = 0
- self._need_update = False
+ self._need_update = True
self._future = None
self._listeners = set()
self._lock = threading.Lock()
@@ -56,6 +64,17 @@ class ClusterMetadata(object):
if key in configs:
self.config[key] = configs[key]
+ self._bootstrap_brokers = self._generate_bootstrap_brokers()
+
+ def _generate_bootstrap_brokers(self):
+ # collect_hosts does not perform DNS, so we should be fine to re-use
+ bootstrap_hosts = collect_hosts(self.config['bootstrap_servers'])
+
+ while True:
+ for host, port, afi in bootstrap_hosts:
+ for _, __, ___, ____, sockaddr in dns_lookup(host, port, afi):
+ yield BrokerMetadata('bootstrap', sockaddr[0], sockaddr[1], None)
+
def brokers(self):
"""Get all BrokerMetadata
@@ -73,6 +92,9 @@ class ClusterMetadata(object):
Returns:
BrokerMetadata or None if not found
"""
+ if broker_id == 'bootstrap':
+ return next(self._bootstrap_brokers)
+
return self._brokers.get(broker_id)
def partitions_for_topic(self, topic):
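Because _generate_bootstrap_brokers is an infinite generator, repeated lookups of the synthetic 'bootstrap' node id cycle through every resolved bootstrap address (re-running dns_lookup on each pass), which is what lets a long-running client keep retrying after backoff. A small illustrative sketch, assuming two placeholder hosts:

    from kafka.cluster import ClusterMetadata

    # Placeholder hosts; collect_hosts() accepts a list or a comma-separated
    # string and applies the default port 9092 when one is omitted.
    cluster = ClusterMetadata(bootstrap_servers=['localhost:9092', 'localhost:9093'])

    # Each call advances the generator, cycling over the resolved addresses;
    # nodeId is always the synthetic 'bootstrap' id.
    for _ in range(4):
        b = cluster.broker_metadata('bootstrap')
        print(b.nodeId, b.host, b.port)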