summaryrefslogtreecommitdiff
path: root/kafka/client_async.py
diff options
context:
space:
mode:
authorDana Powers <dana.powers@rd.io>2016-01-10 19:52:16 -0800
committerDana Powers <dana.powers@rd.io>2016-01-10 21:24:33 -0800
commit5fa8c88d6f369b3eceae7f34296b56cfd92d1f90 (patch)
treed6e5bbbaed8cb31cfc9040733d5b0d29c71a4cc0 /kafka/client_async.py
parent1fd596062fba5ce4236623249ffafcf0be985282 (diff)
downloadkafka-python-5fa8c88d6f369b3eceae7f34296b56cfd92d1f90.tar.gz
If a completed future is polled, do not block
Diffstat (limited to 'kafka/client_async.py')
-rw-r--r--kafka/client_async.py19
1 files changed, 13 insertions, 6 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py
index fa498e9..3a1922e 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -314,14 +314,21 @@ class KafkaClient(object):
else:
task_future.success(result)
- timeout = min(
- timeout_ms,
- metadata_timeout_ms,
- self._delayed_tasks.next_at() * 1000,
- self.config['request_timeout_ms'])
- timeout = max(0, timeout / 1000.0)
+ # If we got a future that is already done, dont block in _poll
+ if future and future.is_done:
+ timeout = 0
+ else:
+ timeout = min(
+ timeout_ms,
+ metadata_timeout_ms,
+ self._delayed_tasks.next_at() * 1000,
+ self.config['request_timeout_ms'])
+ timeout = max(0, timeout / 1000.0) # avoid negative timeouts
responses.extend(self._poll(timeout))
+
+ # If all we had was a timeout (future is None) - only do one poll
+ # If we do have a future, we keep looping until it is done
if not future or future.is_done:
break