summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--kafka/producer/kafka.py39
1 files changed, 19 insertions, 20 deletions
diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py
index 2443265..e8601c8 100644
--- a/kafka/producer/kafka.py
+++ b/kafka/producer/kafka.py
@@ -439,36 +439,35 @@ class KafkaProducer(object):
"""
# add topic to metadata topic list if it is not there already.
self._sender.add_topic(topic)
- partitions = self._metadata.partitions_for_topic(topic)
- if partitions:
- return partitions
- event = threading.Event()
- def event_set(*args):
- event.set()
- def request_update(self, event):
- event.clear()
+ # Coordinate sleep / wake with a threading.Event
+ def request_update(self, _event):
+ _event.clear()
log.debug("Requesting metadata update for topic %s.", topic)
f = self._metadata.request_update()
- f.add_both(event_set)
+ def _event_set(_event, *args):
+ _event.set()
+ f.add_both(_event_set, _event)
return f
begin = time.time()
elapsed = 0.0
- future = request_update(self, event)
- while elapsed < max_wait:
+ event = threading.Event()
+ while True:
+ partitions = self._metadata.partitions_for_topic(topic)
+ if partitions is not None:
+ return partitions
+
+ log.debug("Requesting metadata update for topic %s", topic)
+ future = request_update(self, event)
self._sender.wakeup()
event.wait(max_wait - elapsed)
- if future.failed():
- future = request_update(self, event)
elapsed = time.time() - begin
-
- partitions = self._metadata.partitions_for_topic(topic)
- if partitions:
- return partitions
- else:
- raise Errors.KafkaTimeoutError(
- "Failed to update metadata after %s secs.", max_wait)
+ if elapsed >= max_wait:
+ raise Errors.KafkaTimeoutError(
+ "Failed to update metadata after %s secs.", max_wait)
+ elif topic in self._metadata.unauthorized_topics:
+ raise Errors.TopicAuthorizationFailedError(topic)
def _serialize(self, topic, key, value):
# pylint: disable-msg=not-callable