diff options
-rw-r--r-- | kafka/client_async.py | 11 | ||||
-rw-r--r-- | kafka/consumer/group.py | 2 | ||||
-rw-r--r-- | test/test_client_async.py | 6 |
3 files changed, 10 insertions, 9 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py index 1838aed..1c74c6f 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -314,11 +314,12 @@ class KafkaClient(object): else: task_future.success(result) - task_timeout_ms = max(0, 1000 * ( - self._delayed_tasks.next_at() - time.time())) - timeout = min(timeout_ms, metadata_timeout_ms, task_timeout_ms, - self.config['request_timeout_ms']) - timeout /= 1000.0 + timeout = min( + timeout_ms, + metadata_timeout_ms, + self._delayed_tasks.next_at() * 1000, + self.config['request_timeout_ms']) + timeout = max(0, timeout / 1000.0) responses.extend(self._poll(timeout)) if not future or future.is_done: diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index 75fe3ee..3fb9c8e 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -631,7 +631,7 @@ class KafkaConsumer(six.Iterator): self._client.poll() timeout_at = min(self._consumer_timeout, - self._client._delayed_tasks.next_at(), + self._client._delayed_tasks.next_at() + time.time(), self._client.cluster.ttl() / 1000.0 + time.time()) for msg in self._fetcher: yield msg diff --git a/test/test_client_async.py b/test/test_client_async.py index 9191c5e..b6bf0f6 100644 --- a/test/test_client_async.py +++ b/test/test_client_async.py @@ -252,7 +252,7 @@ def test_poll(mocker): # metadata timeout wins metadata.return_value = 1000 - tasks.return_value = time.time() + 2 # 2 seconds from now + tasks.return_value = 2 cli.poll() _poll.assert_called_with(1.0) @@ -261,13 +261,13 @@ def test_poll(mocker): _poll.assert_called_with(0.25) # tasks timeout wins - tasks.return_value = time.time() # next task is now + tasks.return_value = 0 cli.poll(250) _poll.assert_called_with(0) # default is request_timeout_ms metadata.return_value = 1000000 - tasks.return_value = time.time() + 10000 + tasks.return_value = 10000 cli.poll() _poll.assert_called_with(cli.config['request_timeout_ms'] / 1000.0) |