diff options
author | Dana Powers <dana.powers@gmail.com> | 2016-02-15 16:24:02 -0800 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2016-02-15 16:24:02 -0800 |
commit | bce0cad5d384c527d6f25209cb794017cd050303 (patch) | |
tree | 02f2aab2cbaad7e15457e667bad4d5b4e788ac27 | |
parent | bd0caa76d0ce0e06abaef070f919a4d80d10faf2 (diff) | |
download | kafka-python-bce0cad5d384c527d6f25209cb794017cd050303.tar.gz |
Revisit _wait_on_metadata to address timeout and error handling (Issue 539)
-rw-r--r-- | kafka/producer/kafka.py | 39 |
1 files changed, 19 insertions, 20 deletions
diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py index 2443265..e8601c8 100644 --- a/kafka/producer/kafka.py +++ b/kafka/producer/kafka.py @@ -439,36 +439,35 @@ class KafkaProducer(object): """ # add topic to metadata topic list if it is not there already. self._sender.add_topic(topic) - partitions = self._metadata.partitions_for_topic(topic) - if partitions: - return partitions - event = threading.Event() - def event_set(*args): - event.set() - def request_update(self, event): - event.clear() + # Coordinate sleep / wake with a threading.Event + def request_update(self, _event): + _event.clear() log.debug("Requesting metadata update for topic %s.", topic) f = self._metadata.request_update() - f.add_both(event_set) + def _event_set(_event, *args): + _event.set() + f.add_both(_event_set, _event) return f begin = time.time() elapsed = 0.0 - future = request_update(self, event) - while elapsed < max_wait: + event = threading.Event() + while True: + partitions = self._metadata.partitions_for_topic(topic) + if partitions is not None: + return partitions + + log.debug("Requesting metadata update for topic %s", topic) + future = request_update(self, event) self._sender.wakeup() event.wait(max_wait - elapsed) - if future.failed(): - future = request_update(self, event) elapsed = time.time() - begin - - partitions = self._metadata.partitions_for_topic(topic) - if partitions: - return partitions - else: - raise Errors.KafkaTimeoutError( - "Failed to update metadata after %s secs.", max_wait) + if elapsed >= max_wait: + raise Errors.KafkaTimeoutError( + "Failed to update metadata after %s secs.", max_wait) + elif topic in self._metadata.unauthorized_topics: + raise Errors.TopicAuthorizationFailedError(topic) def _serialize(self, topic, key, value): # pylint: disable-msg=not-callable |