summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@gmail.com>2016-02-15 19:46:48 -0800
committerDana Powers <dana.powers@gmail.com>2016-02-15 19:46:48 -0800
commitd2dbc5b3a3f6aa1b397059125fbb1381b3640c42 (patch)
treecf4f8649fd7a9a134d6a778bf21e71379b7dd848
parent876791430513ac819d37e9877661387958d50fe4 (diff)
downloadkafka-python-d2dbc5b3a3f6aa1b397059125fbb1381b3640c42.tar.gz
Cleaner event handling in _wait_on_metadata
-rw-r--r--kafka/producer/kafka.py24
1 files changed, 9 insertions, 15 deletions
diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py
index 0a40325..578a4cb 100644
--- a/kafka/producer/kafka.py
+++ b/kafka/producer/kafka.py
@@ -438,35 +438,29 @@ class KafkaProducer(object):
"""
# add topic to metadata topic list if it is not there already.
self._sender.add_topic(topic)
-
- # Coordinate sleep / wake with a threading.Event
- def request_update(self, _event):
- _event.clear()
- log.debug("Requesting metadata update for topic %s.", topic)
- f = self._metadata.request_update()
- def _event_set(_event, *args):
- _event.set()
- f.add_both(_event_set, _event)
- return f
-
begin = time.time()
elapsed = 0.0
- event = threading.Event()
+ metadata_event = threading.Event()
while True:
partitions = self._metadata.partitions_for_topic(topic)
if partitions is not None:
return partitions
log.debug("Requesting metadata update for topic %s", topic)
- future = request_update(self, event)
+
+ metadata_event.clear()
+ future = self._metadata.request_update()
+ future.add_both(lambda e, *args: e.set(), metadata_event)
self._sender.wakeup()
- event.wait(max_wait - elapsed)
+ metadata_event.wait(max_wait - elapsed)
elapsed = time.time() - begin
- if elapsed >= max_wait:
+ if not metadata_event.is_set():
raise Errors.KafkaTimeoutError(
"Failed to update metadata after %s secs.", max_wait)
elif topic in self._metadata.unauthorized_topics:
raise Errors.TopicAuthorizationFailedError(topic)
+ else:
+ log.debug("_wait_on_metadata woke after %s secs.", elapsed)
def _serialize(self, topic, key, value):
# pylint: disable-msg=not-callable