summaryrefslogtreecommitdiff
path: root/kafka/client_async.py
diff options
context:
space:
mode:
authorDana Powers <dana.powers@rd.io>2016-01-10 15:42:26 -0800
committerDana Powers <dana.powers@rd.io>2016-01-10 15:42:47 -0800
commit45d26b6d32d1b4382c2a1ce0194111ac8051e124 (patch)
tree46e0591fe3d03b31516889b854be226f4a30926b /kafka/client_async.py
parent7ee73df4c4bb3c69ac38accc30ff68bc6d64d594 (diff)
downloadkafka-python-45d26b6d32d1b4382c2a1ce0194111ac8051e124.tar.gz
Check delayed task timeout in client.poll()
Diffstat (limited to 'kafka/client_async.py')
-rw-r--r--kafka/client_async.py6
1 files changed, 4 insertions, 2 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py
index 30d4d6f..1838aed 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -302,7 +302,7 @@ class KafkaClient(object):
self._finish_connect(node_id)
# Send a metadata request if needed
- metadata_timeout = self._maybe_refresh_metadata()
+ metadata_timeout_ms = self._maybe_refresh_metadata()
# Send scheduled tasks
for task, task_future in self._delayed_tasks.pop_ready():
@@ -314,7 +314,9 @@ class KafkaClient(object):
else:
task_future.success(result)
- timeout = min(timeout_ms, metadata_timeout,
+ task_timeout_ms = max(0, 1000 * (
+ self._delayed_tasks.next_at() - time.time()))
+ timeout = min(timeout_ms, metadata_timeout_ms, task_timeout_ms,
self.config['request_timeout_ms'])
timeout /= 1000.0