summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDana Powers <dana.powers@rd.io>2015-12-30 11:49:22 -0800
committerDana Powers <dana.powers@rd.io>2015-12-30 11:49:22 -0800
commit9bc01657ed9402b502f7156ae95764029436eab3 (patch)
tree864ad406aae44393bb426d4507363cb22964a475
parent88cf1b5e4551cd96322aa812fa482bf0f978060a (diff)
downloadkafka-python-9bc01657ed9402b502f7156ae95764029436eab3.tar.gz
Resolve delayed task futures in KafkaClient.poll
-rw-r--r--kafka/client_async.py9
1 files changed, 6 insertions, 3 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py
index 386b0cb..ca81214 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -243,11 +243,14 @@ class KafkaClient(object):
metadata_timeout = self._maybe_refresh_metadata()
# Send scheduled tasks
- for task in self._delayed_tasks.pop_ready():
+ for task, future in self._delayed_tasks.pop_ready():
try:
- task()
+ result = task()
except Exception as e:
log.error("Task %s failed: %s", task, e)
+ future.failure(e)
+ else:
+ future.success(result)
timeout = min(timeout_ms, metadata_timeout,
self.config['request_timeout_ms'])
@@ -450,7 +453,7 @@ class DelayedTaskQueue(object):
else:
task, future = maybe_task
del self._task_map[task]
- return task
+ return (task, future)
def next_at(self):
"""Number of seconds until next task is ready"""