summaryrefslogtreecommitdiff
path: root/kafka/client.py
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2016-03-13 00:09:03 -0800
committerDana Powers <dana.powers@gmail.com>2016-03-13 09:35:49 -0700
commitbd5bd62b09425140cf53a7fb61c56b88ce19ab96 (patch)
tree3ea8525aba914b2f55ff421d71c7ea75e9739857 /kafka/client.py
parent5d28581b40c654d80b282e489c7149e72cf1b954 (diff)
downloadkafka-python-bd5bd62b09425140cf53a7fb61c56b88ce19ab96.tar.gz
Add ignore_leadernotavailable kwarg to SimpleClient.load_metadata_for_topics
Diffstat (limited to 'kafka/client.py')
-rw-r--r--kafka/client.py22
1 files changed, 13 insertions, 9 deletions
diff --git a/kafka/client.py b/kafka/client.py
index e76274c..4b5a043 100644
--- a/kafka/client.py
+++ b/kafka/client.py
@@ -450,17 +450,10 @@ class SimpleClient(object):
while not self.has_metadata_for_topic(topic):
if time.time() > start_time + timeout:
raise KafkaTimeoutError('Unable to create topic {0}'.format(topic))
- try:
- self.load_metadata_for_topics(topic)
- except LeaderNotAvailableError:
- pass
- except UnknownTopicOrPartitionError:
- # Server is not configured to auto-create
- # retrying in this case will not help
- raise
+ self.load_metadata_for_topics(topic, ignore_leadernotavailable=True)
time.sleep(.5)
- def load_metadata_for_topics(self, *topics):
+ def load_metadata_for_topics(self, *topics, **kwargs):
"""Fetch broker and topic-partition metadata from the server.
Updates internal data: broker list, topic/partition list, and
@@ -476,6 +469,9 @@ class SimpleClient(object):
*topics (optional): If a list of topics is provided,
the metadata refresh will be limited to the specified topics
only.
+ ignore_leadernotavailable (bool): suppress LeaderNotAvailableError
+ so that metadata is loaded correctly during auto-create.
+ Default: False.
Raises:
UnknownTopicOrPartitionError: Raised for topics that do not exist,
@@ -484,6 +480,11 @@ class SimpleClient(object):
when the broker is configured to auto-create topics. Retry
after a short backoff (topics/partitions are initializing).
"""
+ if 'ignore_leadernotavailable' in kwargs:
+ ignore_leadernotavailable = kwargs['ignore_leadernotavailable']
+ else:
+ ignore_leadernotavailable = False
+
if topics:
self.reset_topic_metadata(*topics)
else:
@@ -506,6 +507,9 @@ class SimpleClient(object):
topic, error_type, error)
if topic not in topics:
continue
+ elif (error_type is LeaderNotAvailableError and
+ ignore_leadernotavailable):
+ continue
raise error_type(topic)
self.topic_partitions[topic] = {}