summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2016-03-13 00:09:03 -0800
committerDana Powers <dana.powers@gmail.com>2016-03-13 09:35:49 -0700
commitbd5bd62b09425140cf53a7fb61c56b88ce19ab96 (patch)
tree3ea8525aba914b2f55ff421d71c7ea75e9739857
parent5d28581b40c654d80b282e489c7149e72cf1b954 (diff)
downloadkafka-python-bd5bd62b09425140cf53a7fb61c56b88ce19ab96.tar.gz
Add ignore_leadernotavailable kwarg to SimpleClient.load_metadata_for_topics
-rw-r--r--kafka/client.py22
-rw-r--r--kafka/consumer/base.py2
-rw-r--r--kafka/producer/keyed.py2
3 files changed, 15 insertions, 11 deletions
diff --git a/kafka/client.py b/kafka/client.py
index e76274c..4b5a043 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -450,17 +450,10 @@ class SimpleClient(object):
while not self.has_metadata_for_topic(topic):
if time.time() > start_time + timeout:
raise KafkaTimeoutError('Unable to create topic {0}'.format(topic))
- try:
- self.load_metadata_for_topics(topic)
- except LeaderNotAvailableError:
- pass
- except UnknownTopicOrPartitionError:
- # Server is not configured to auto-create
- # retrying in this case will not help
- raise
+ self.load_metadata_for_topics(topic, ignore_leadernotavailable=True)
time.sleep(.5)
- def load_metadata_for_topics(self, *topics):
+ def load_metadata_for_topics(self, *topics, **kwargs):
"""Fetch broker and topic-partition metadata from the server.
Updates internal data: broker list, topic/partition list, and
@@ -476,6 +469,9 @@ class SimpleClient(object):
*topics (optional): If a list of topics is provided,
the metadata refresh will be limited to the specified topics
only.
+ ignore_leadernotavailable (bool): suppress LeaderNotAvailableError
+ so that metadata is loaded correctly during auto-create.
+ Default: False.
Raises:
UnknownTopicOrPartitionError: Raised for topics that do not exist,
@@ -484,6 +480,11 @@ class SimpleClient(object):
when the broker is configured to auto-create topics. Retry
after a short backoff (topics/partitions are initializing).
"""
+ if 'ignore_leadernotavailable' in kwargs:
+ ignore_leadernotavailable = kwargs['ignore_leadernotavailable']
+ else:
+ ignore_leadernotavailable = False
+
if topics:
self.reset_topic_metadata(*topics)
else:
@@ -506,6 +507,9 @@ class SimpleClient(object):
topic, error_type, error)
if topic not in topics:
continue
+ elif (error_type is LeaderNotAvailableError and
+ ignore_leadernotavailable):
+ continue
raise error_type(topic)
self.topic_partitions[topic] = {}
diff --git a/kafka/consumer/base.py b/kafka/consumer/base.py
index 78f376e..75c3ee1 100644
--- a/kafka/consumer/base.py
+++ b/kafka/consumer/base.py
@@ -53,7 +53,7 @@ class Consumer(object):
self.client = client
self.topic = topic
self.group = group
- self.client.load_metadata_for_topics(topic)
+ self.client.load_metadata_for_topics(topic, ignore_leadernotavailable=True)
self.offsets = {}
if partitions is None:
diff --git a/kafka/producer/keyed.py b/kafka/producer/keyed.py
index f35aef0..9fba33b 100644
--- a/kafka/producer/keyed.py
+++ b/kafka/producer/keyed.py
@@ -29,7 +29,7 @@ class KeyedProducer(Producer):
def _next_partition(self, topic, key):
if topic not in self.partitioners:
if not self.client.has_metadata_for_topic(topic):
- self.client.load_metadata_for_topics(topic)
+ self.client.load_metadata_for_topics(topic, ignore_leadernotavailable=True)
self.partitioners[topic] = self.partitioner_class(self.client.get_partition_ids_for_topic(topic))