diff options
-rw-r--r-- | kafka/client_async.py | 6 | ||||
-rw-r--r-- | test/test_client_async.py | 30 |
2 files changed, 32 insertions, 4 deletions
diff --git a/kafka/client_async.py b/kafka/client_async.py index 30d4d6f..1838aed 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -302,7 +302,7 @@ class KafkaClient(object): self._finish_connect(node_id) # Send a metadata request if needed - metadata_timeout = self._maybe_refresh_metadata() + metadata_timeout_ms = self._maybe_refresh_metadata() # Send scheduled tasks for task, task_future in self._delayed_tasks.pop_ready(): @@ -314,7 +314,9 @@ class KafkaClient(object): else: task_future.success(result) - timeout = min(timeout_ms, metadata_timeout, + task_timeout_ms = max(0, 1000 * ( + self._delayed_tasks.next_at() - time.time())) + timeout = min(timeout_ms, metadata_timeout_ms, task_timeout_ms, self.config['request_timeout_ms']) timeout /= 1000.0 diff --git a/test/test_client_async.py b/test/test_client_async.py index 447ea49..9191c5e 100644 --- a/test/test_client_async.py +++ b/test/test_client_async.py @@ -1,3 +1,4 @@ +import time import pytest @@ -242,8 +243,33 @@ def test_send(conn): assert conn.send.called_with(request, expect_response=True) -def test_poll(): - pass +def test_poll(mocker): + mocker.patch.object(KafkaClient, '_bootstrap') + metadata = mocker.patch.object(KafkaClient, '_maybe_refresh_metadata') + _poll = mocker.patch.object(KafkaClient, '_poll') + cli = KafkaClient() + tasks = mocker.patch.object(cli._delayed_tasks, 'next_at') + + # metadata timeout wins + metadata.return_value = 1000 + tasks.return_value = time.time() + 2 # 2 seconds from now + cli.poll() + _poll.assert_called_with(1.0) + + # user timeout wins + cli.poll(250) + _poll.assert_called_with(0.25) + + # tasks timeout wins + tasks.return_value = time.time() # next task is now + cli.poll(250) + _poll.assert_called_with(0) + + # default is request_timeout_ms + metadata.return_value = 1000000 + tasks.return_value = time.time() + 10000 + cli.poll() + _poll.assert_called_with(cli.config['request_timeout_ms'] / 1000.0) def test__poll(): |