summaryrefslogtreecommitdiff
path: root/kafka/client_async.py
diff options
context:
space:
mode:
authorDana Powers <dana.powers@rd.io>2016-01-10 21:11:02 -0800
committerDana Powers <dana.powers@rd.io>2016-01-10 21:21:40 -0800
commit5d2886bae36c8336a15e0f58c827556de186350a (patch)
tree49257041620eefb9c2437fcf62c3c7239510bb36 /kafka/client_async.py
parentca08f759df60f4e7debaacc9b674e3191d5106bb (diff)
downloadkafka-python-5d2886bae36c8336a15e0f58c827556de186350a.tar.gz
Fix delayed_task timeout commit 45d26b6
Diffstat (limited to 'kafka/client_async.py')
-rw-r--r--kafka/client_async.py11
1 files changed, 6 insertions, 5 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py
index 1838aed..1c74c6f 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -314,11 +314,12 @@ class KafkaClient(object):
else:
task_future.success(result)
- task_timeout_ms = max(0, 1000 * (
- self._delayed_tasks.next_at() - time.time()))
- timeout = min(timeout_ms, metadata_timeout_ms, task_timeout_ms,
- self.config['request_timeout_ms'])
- timeout /= 1000.0
+ timeout = min(
+ timeout_ms,
+ metadata_timeout_ms,
+ self._delayed_tasks.next_at() * 1000,
+ self.config['request_timeout_ms'])
+ timeout = max(0, timeout / 1000.0)
responses.extend(self._poll(timeout))
if not future or future.is_done: