summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--kafka/client_async.py11
-rw-r--r--kafka/consumer/group.py2
-rw-r--r--test/test_client_async.py6
3 files changed, 10 insertions, 9 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py
index 1838aed..1c74c6f 100644
--- a/kafka/client_async.py
+++ b/kafka/client_async.py
@@ -314,11 +314,12 @@ class KafkaClient(object):
else:
task_future.success(result)
- task_timeout_ms = max(0, 1000 * (
- self._delayed_tasks.next_at() - time.time()))
- timeout = min(timeout_ms, metadata_timeout_ms, task_timeout_ms,
- self.config['request_timeout_ms'])
- timeout /= 1000.0
+ timeout = min(
+ timeout_ms,
+ metadata_timeout_ms,
+ self._delayed_tasks.next_at() * 1000,
+ self.config['request_timeout_ms'])
+ timeout = max(0, timeout / 1000.0)
responses.extend(self._poll(timeout))
if not future or future.is_done:
diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py
index 75fe3ee..3fb9c8e 100644
--- a/kafka/consumer/group.py
+++ b/kafka/consumer/group.py
@@ -631,7 +631,7 @@ class KafkaConsumer(six.Iterator):
self._client.poll()
timeout_at = min(self._consumer_timeout,
- self._client._delayed_tasks.next_at(),
+ self._client._delayed_tasks.next_at() + time.time(),
self._client.cluster.ttl() / 1000.0 + time.time())
for msg in self._fetcher:
yield msg
diff --git a/test/test_client_async.py b/test/test_client_async.py
index 9191c5e..b6bf0f6 100644
--- a/test/test_client_async.py
+++ b/test/test_client_async.py
@@ -252,7 +252,7 @@ def test_poll(mocker):
# metadata timeout wins
metadata.return_value = 1000
- tasks.return_value = time.time() + 2 # 2 seconds from now
+ tasks.return_value = 2
cli.poll()
_poll.assert_called_with(1.0)
@@ -261,13 +261,13 @@ def test_poll(mocker):
_poll.assert_called_with(0.25)
# tasks timeout wins
- tasks.return_value = time.time() # next task is now
+ tasks.return_value = 0
cli.poll(250)
_poll.assert_called_with(0)
# default is request_timeout_ms
metadata.return_value = 1000000
- tasks.return_value = time.time() + 10000
+ tasks.return_value = 10000
cli.poll()
_poll.assert_called_with(cli.config['request_timeout_ms'] / 1000.0)