diff options
author | Dana Powers <dana.powers@gmail.com> | 2016-01-10 16:40:29 -0800 |
---|---|---|
committer | Dana Powers <dana.powers@gmail.com> | 2016-01-10 16:40:29 -0800 |
commit | ca08f759df60f4e7debaacc9b674e3191d5106bb (patch) | |
tree | cdc73d91abf37cee9771d7e5382a493525a3b694 /kafka/client_async.py | |
parent | 8ae2a3073134ff58f1314bf64165456a8e627b0a (diff) | |
parent | 98e4ab3cb931ee110faa22f6afa6c72523e24db9 (diff) | |
download | kafka-python-ca08f759df60f4e7debaacc9b674e3191d5106bb.tar.gz |
Merge pull request #502 from dpkp/task_poll_timeout
Add delayed task timeouts to _poll calls
Diffstat (limited to 'kafka/client_async.py')
-rw-r--r-- | kafka/client_async.py | 6 |
1 files changed, 4 insertions, 2 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py index 30d4d6f..1838aed 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -302,7 +302,7 @@ class KafkaClient(object): self._finish_connect(node_id) # Send a metadata request if needed - metadata_timeout = self._maybe_refresh_metadata() + metadata_timeout_ms = self._maybe_refresh_metadata() # Send scheduled tasks for task, task_future in self._delayed_tasks.pop_ready(): @@ -314,7 +314,9 @@ class KafkaClient(object): else: task_future.success(result) - timeout = min(timeout_ms, metadata_timeout, + task_timeout_ms = max(0, 1000 * ( + self._delayed_tasks.next_at() - time.time())) + timeout = min(timeout_ms, metadata_timeout_ms, task_timeout_ms, self.config['request_timeout_ms']) timeout /= 1000.0 |