diff options
-rw-r--r-- | kafka/producer/kafka.py | 24 |
1 files changed, 9 insertions, 15 deletions
diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py index 0a40325..578a4cb 100644 --- a/kafka/producer/kafka.py +++ b/kafka/producer/kafka.py @@ -438,35 +438,29 @@ class KafkaProducer(object): """ # add topic to metadata topic list if it is not there already. self._sender.add_topic(topic) - - # Coordinate sleep / wake with a threading.Event - def request_update(self, _event): - _event.clear() - log.debug("Requesting metadata update for topic %s.", topic) - f = self._metadata.request_update() - def _event_set(_event, *args): - _event.set() - f.add_both(_event_set, _event) - return f - begin = time.time() elapsed = 0.0 - event = threading.Event() + metadata_event = threading.Event() while True: partitions = self._metadata.partitions_for_topic(topic) if partitions is not None: return partitions log.debug("Requesting metadata update for topic %s", topic) - future = request_update(self, event) + + metadata_event.clear() + future = self._metadata.request_update() + future.add_both(lambda e, *args: e.set(), metadata_event) self._sender.wakeup() - event.wait(max_wait - elapsed) + metadata_event.wait(max_wait - elapsed) elapsed = time.time() - begin - if elapsed >= max_wait: + if not metadata_event.is_set(): raise Errors.KafkaTimeoutError( "Failed to update metadata after %s secs.", max_wait) elif topic in self._metadata.unauthorized_topics: raise Errors.TopicAuthorizationFailedError(topic) + else: + log.debug("_wait_on_metadata woke after %s secs.", elapsed) def _serialize(self, topic, key, value): # pylint: disable-msg=not-callable |