diff options
Diffstat (limited to 'kafka/client.py')
-rw-r--r-- | kafka/client.py | 22 |
1 files changed, 13 insertions, 9 deletions
diff --git a/kafka/client.py b/kafka/client.py index e76274c..4b5a043 100644 --- a/kafka/client.py +++ b/kafka/client.py @@ -450,17 +450,10 @@ class SimpleClient(object): while not self.has_metadata_for_topic(topic): if time.time() > start_time + timeout: raise KafkaTimeoutError('Unable to create topic {0}'.format(topic)) - try: - self.load_metadata_for_topics(topic) - except LeaderNotAvailableError: - pass - except UnknownTopicOrPartitionError: - # Server is not configured to auto-create - # retrying in this case will not help - raise + self.load_metadata_for_topics(topic, ignore_leadernotavailable=True) time.sleep(.5) - def load_metadata_for_topics(self, *topics): + def load_metadata_for_topics(self, *topics, **kwargs): """Fetch broker and topic-partition metadata from the server. Updates internal data: broker list, topic/partition list, and @@ -476,6 +469,9 @@ class SimpleClient(object): *topics (optional): If a list of topics is provided, the metadata refresh will be limited to the specified topics only. + ignore_leadernotavailable (bool): suppress LeaderNotAvailableError + so that metadata is loaded correctly during auto-create. + Default: False. Raises: UnknownTopicOrPartitionError: Raised for topics that do not exist, @@ -484,6 +480,11 @@ class SimpleClient(object): when the broker is configured to auto-create topics. Retry after a short backoff (topics/partitions are initializing). """ + if 'ignore_leadernotavailable' in kwargs: + ignore_leadernotavailable = kwargs['ignore_leadernotavailable'] + else: + ignore_leadernotavailable = False + if topics: self.reset_topic_metadata(*topics) else: @@ -506,6 +507,9 @@ class SimpleClient(object): topic, error_type, error) if topic not in topics: continue + elif (error_type is LeaderNotAvailableError and + ignore_leadernotavailable): + continue raise error_type(topic) self.topic_partitions[topic] = {} |