summaryrefslogtreecommitdiff
path: root/test
diff options
context:
space:
mode:
Diffstat (limited to 'test')
-rw-r--r--test/test_client_async.py39
1 files changed, 25 insertions, 14 deletions
diff --git a/test/test_client_async.py b/test/test_client_async.py
index 97be827..8f6ac3f 100644
--- a/test/test_client_async.py
+++ b/test/test_client_async.py
@@ -319,7 +319,7 @@ def client(mocker):
mocker.patch.object(KafkaClient, '_bootstrap')
_poll = mocker.patch.object(KafkaClient, '_poll')
- cli = KafkaClient(request_timeout_ms=9999999, retry_backoff_ms=2222, api_version=(0, 9))
+ cli = KafkaClient(request_timeout_ms=9999999, reconnect_backoff_ms=2222, api_version=(0, 9))
tasks = mocker.patch.object(cli._delayed_tasks, 'next_at')
tasks.return_value = 9999999
@@ -332,7 +332,7 @@ def client(mocker):
def test_maybe_refresh_metadata_ttl(mocker, client):
client.cluster.ttl.return_value = 1234
- client.poll(timeout_ms=9999999, sleep=True)
+ client.poll(timeout_ms=12345678, sleep=True)
client._poll.assert_called_with(1.234, sleep=True)
@@ -340,17 +340,16 @@ def test_maybe_refresh_metadata_backoff(mocker, client):
now = time.time()
t = mocker.patch('time.time')
t.return_value = now
- client._last_no_node_available_ms = now * 1000
- client.poll(timeout_ms=9999999, sleep=True)
- client._poll.assert_called_with(2.222, sleep=True)
+ client.poll(timeout_ms=12345678, sleep=True)
+ client._poll.assert_called_with(2.222, sleep=True) # reconnect backoff
def test_maybe_refresh_metadata_in_progress(mocker, client):
client._metadata_refresh_in_progress = True
- client.poll(timeout_ms=9999999, sleep=True)
- client._poll.assert_called_with(9999.999, sleep=True)
+ client.poll(timeout_ms=12345678, sleep=True)
+ client._poll.assert_called_with(9999.999, sleep=True) # request_timeout_ms
def test_maybe_refresh_metadata_update(mocker, client):
@@ -358,23 +357,35 @@ def test_maybe_refresh_metadata_update(mocker, client):
mocker.patch.object(client, '_can_send_request', return_value=True)
send = mocker.patch.object(client, 'send')
- client.poll(timeout_ms=9999999, sleep=True)
- client._poll.assert_called_with(0, sleep=True)
+ client.poll(timeout_ms=12345678, sleep=True)
+ client._poll.assert_called_with(9999.999, sleep=True) # request_timeout_ms
assert client._metadata_refresh_in_progress
request = MetadataRequest[0]([])
- send.assert_called_with('foobar', request)
+ send.assert_called_once_with('foobar', request)
-def test_maybe_refresh_metadata_failure(mocker, client):
+def test_maybe_refresh_metadata_cant_send(mocker, client):
mocker.patch.object(client, 'least_loaded_node', return_value='foobar')
+ mocker.patch.object(client, '_can_connect', return_value=True)
+ mocker.patch.object(client, '_maybe_connect', return_value=True)
now = time.time()
t = mocker.patch('time.time')
t.return_value = now
- client.poll(timeout_ms=9999999, sleep=True)
- client._poll.assert_called_with(0, sleep=True)
- assert client._last_no_node_available_ms == now * 1000
+ # first poll attempts connection
+ client.poll(timeout_ms=12345678, sleep=True)
+ client._poll.assert_called_with(2.222, sleep=True) # reconnect backoff
+ client._can_connect.assert_called_once_with('foobar')
+ client._maybe_connect.assert_called_once_with('foobar')
+
+ # poll while connecting should not attempt a new connection
+ client._connecting.add('foobar')
+ client._can_connect.reset_mock()
+ client.poll(timeout_ms=12345678, sleep=True)
+ client._poll.assert_called_with(9999.999, sleep=True) # connection timeout (request timeout)
+ assert not client._can_connect.called
+
assert not client._metadata_refresh_in_progress