summaryrefslogtreecommitdiff
path: root/kafka/client_async.py
diff options
context:
space:
mode:
Diffstat (limited to 'kafka/client_async.py')
-rw-r--r--kafka/client_async.py19
1 files changed, 13 insertions, 6 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py
index fa498e9..3a1922e 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -314,14 +314,21 @@ class KafkaClient(object):
else:
task_future.success(result)
- timeout = min(
- timeout_ms,
- metadata_timeout_ms,
- self._delayed_tasks.next_at() * 1000,
- self.config['request_timeout_ms'])
- timeout = max(0, timeout / 1000.0)
+ # If we got a future that is already done, dont block in _poll
+ if future and future.is_done:
+ timeout = 0
+ else:
+ timeout = min(
+ timeout_ms,
+ metadata_timeout_ms,
+ self._delayed_tasks.next_at() * 1000,
+ self.config['request_timeout_ms'])
+ timeout = max(0, timeout / 1000.0) # avoid negative timeouts
responses.extend(self._poll(timeout))
+
+ # If all we had was a timeout (future is None) - only do one poll
+ # If we do have a future, we keep looping until it is done
if not future or future.is_done:
break